[Java API] Rough edges when partitioning by time types #11899

Open
1 of 3 tasks
ahmedabu98 opened this issue Jan 1, 2025 · 3 comments
Labels
bug Something isn't working

ahmedabu98 commented Jan 1, 2025

Apache Iceberg version

1.7.1 (latest release)

Query engine

Other

Please describe the bug 🐞

We've been developing an Iceberg connector for Apache Beam using the Java API, and I noticed some rough edges around partitioning by time types (i.e. the year, month, day, or hour transforms).

See the following code:

org.apache.iceberg.Schema schema =
    new org.apache.iceberg.Schema(
        Types.NestedField.required(1, "year", Types.TimestampType.withoutZone()),
        Types.NestedField.required(2, "day", Types.TimestampType.withoutZone()));
PartitionSpec spec = PartitionSpec.builderFor(schema)
        .year("year")
        .day("day").build();
Table table = catalog.createTable(TableIdentifier.parse("db.table"), schema, spec);
PartitionKey pk = new PartitionKey(spec, schema);

LocalDateTime val = LocalDateTime.parse("2024-10-08T13:18:20.053");
Record rec = GenericRecord.create(schema).copy(
        ImmutableMap.of(
                "year", val, 
                "day", val));
pk.partition(rec);

I'm applying a simple partition to my original record and would expect it to work normally, but the last line fails with the following error:

java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-08T13:18:20.053
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.StructTransform.wrap(StructTransform.java:78)
	at org.apache.iceberg.PartitionKey.wrap(PartitionKey.java:30)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:64)

We've been able to work around it with this logic, replicated below:

Work-around
private Record getPartitionableRecord(
    Record record, PartitionSpec spec, org.apache.iceberg.Schema schema) {
  if (spec.isUnpartitioned()) {
    return record;
  }
  Record output = GenericRecord.create(schema);
  for (PartitionField partitionField : spec.fields()) {
    Transform<?, ?> transform = partitionField.transform();
    Types.NestedField field = schema.findField(partitionField.sourceId());
    String name = field.name();
    Object value = record.getField(name);
    @Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
    if (literal == null || transform.isVoid() || transform.isIdentity()) {
      output.setField(name, value);
    } else {
      output.setField(name, literal.value());
    }
  }
  return output;
}

So that instead we have this:

Record partitionableRec = getPartitionableRecord(rec, spec, schema);
pk.partition(partitionableRec);

This feels a little hacky and I would expect the Iceberg API to handle this by itself. Let me know if I'm missing something!

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@ahmedabu98 ahmedabu98 added the bug Something isn't working label Jan 1, 2025
Member

RussellSpitzer commented Jan 2, 2025

I think the issue here is that the copy constructor for GenericRecord does not do type checking. The accessor is failing because the GenericRecord has an illegal object in it. We should have failed when the LocalDateTime was inserted, because that is not a valid Java class for a value in an Iceberg struct and doesn't match the representation of the Iceberg timestamp type (Long microseconds from epoch).

See here for the Java Classes used in the reference lib for various Iceberg Types

enum TypeID {
  BOOLEAN(Boolean.class),
  INTEGER(Integer.class),
  LONG(Long.class),
  FLOAT(Float.class),
  DOUBLE(Double.class),
  DATE(Integer.class),
  TIME(Long.class),
  TIMESTAMP(Long.class),
  TIMESTAMP_NANO(Long.class),
  STRING(CharSequence.class),
  UUID(java.util.UUID.class),
  FIXED(ByteBuffer.class),
  BINARY(ByteBuffer.class),
  DECIMAL(BigDecimal.class),
  STRUCT(StructLike.class),
  LIST(List.class),
  MAP(Map.class),
  VARIANT(Object.class);
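
To make the "fail on insert" idea concrete, here is a minimal sketch of eager type checking using only the JDK. It is not Iceberg code: `CheckedRecordSketch` and its methods are hypothetical names, and the expected class per field is supplied by hand rather than derived from an Iceberg schema.

```java
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a record that validates values eagerly.
// None of these names are part of the Iceberg API.
final class CheckedRecordSketch {
  // Expected Java class per field, mirroring entries such as TIMESTAMP(Long.class).
  private final Map<String, Class<?>> expected = new LinkedHashMap<>();
  private final Map<String, Object> values = new LinkedHashMap<>();

  CheckedRecordSketch field(String name, Class<?> javaClass) {
    expected.put(name, javaClass);
    return this;
  }

  // Rejects a mismatched value at insertion time instead of at partitioning time.
  void setField(String name, Object value) {
    Class<?> javaClass = expected.get(name);
    if (javaClass == null) {
      throw new IllegalArgumentException("Unknown field: " + name);
    }
    if (value != null && !javaClass.isInstance(value)) {
      throw new IllegalArgumentException(
          String.format(
              "Field %s expects %s but got %s",
              name, javaClass.getName(), value.getClass().getName()));
    }
    values.put(name, value);
  }

  Object getField(String name) {
    return values.get(name);
  }

  public static void main(String[] args) {
    CheckedRecordSketch rec =
        new CheckedRecordSketch().field("year", Long.class).field("day", Long.class);
    rec.setField("year", 1_728_393_500_053_000L); // accepted: a Long
    try {
      rec.setField("day", LocalDateTime.parse("2024-10-08T13:18:20.053"));
    } catch (IllegalArgumentException e) {
      // The mismatch surfaces here, where the bad value is inserted.
      System.out.println(e.getMessage());
    }
  }
}
```

With a check like this, the error would point at the line that inserts the LocalDateTime rather than at the later `pk.partition(rec)` call.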

So to fix this you need to just convert LocalDateTime into the actual Iceberg Type before putting it in the generic record.

LocalDateTime val = LocalDateTime.parse("2024-10-08T13:18:20.053");
// Iceberg stores timestamps as Long microseconds from epoch
Long epochMicros = DateTimeUtil.microsFromTimestamp(val);
Record rec = GenericRecord.create(schema).copy(
        ImmutableMap.of(
                "year", epochMicros,
                "day", epochMicros));
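
For intuition about the value that ends up in the record, the following JDK-only sketch computes microseconds from epoch for the same LocalDateTime, then derives the year and day ordinals (years and days since 1970-01-01) that those partition transforms are defined to produce. The class and method names are hypothetical, and this is an approximation of the arithmetic, not Iceberg's implementation.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// JDK-only approximation; names are illustrative, not Iceberg API.
final class TimestampMicrosSketch {
  private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);
  private static final long MICROS_PER_DAY = 86_400_000_000L;

  // Microseconds between 1970-01-01T00:00 and the given zone-less timestamp.
  static long microsFromLocalDateTime(LocalDateTime ts) {
    return ChronoUnit.MICROS.between(EPOCH, ts);
  }

  // Years since 1970, computed from the micros value.
  static int yearOrdinal(long micros) {
    return EPOCH.plus(micros, ChronoUnit.MICROS).getYear() - 1970;
  }

  // Days since 1970-01-01; floorDiv keeps pre-epoch values on the correct day.
  static long dayOrdinal(long micros) {
    return Math.floorDiv(micros, MICROS_PER_DAY);
  }

  public static void main(String[] args) {
    long micros = microsFromLocalDateTime(LocalDateTime.parse("2024-10-08T13:18:20.053"));
    System.out.println(micros);              // prints 1728393500053000
    System.out.println(yearOrdinal(micros)); // prints 54
    System.out.println(dayOrdinal(micros));  // prints 20004
  }
}
```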


Gezi-lzq commented Jan 6, 2025

@RussellSpitzer I noticed that in Kafka Connect, the values corresponding to TimestampType are all LocalDateTime. Is there a problem with that approach?

@RussellSpitzer
Member

That code uses an additional conversion here to convert the date-time objects to Long before determining the partitioning or writing:

case TIMESTAMP:
  if (((Types.TimestampType) type).shouldAdjustToUTC()) {
    return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp);
  } else {
    return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp);
  }
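
The same dispatch can be sketched with only the JDK, for readers who want to see how the two branches differ. The `adjustToUtc` flag stands in for `shouldAdjustToUTC()`, and `TimestampConverterSketch` is a hypothetical name. The arithmetic is an approximation of what the two `DateTimeUtil` methods compute, not their actual source.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

// JDK-only approximation of choosing a converter per timestamp flavor.
final class TimestampConverterSketch {
  private static final OffsetDateTime EPOCH =
      OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);

  // adjustToUtc == true models a timestamp with zone, false a timestamp without zone.
  static Function<Object, Long> converter(boolean adjustToUtc) {
    if (adjustToUtc) {
      // The offset is honored, so the instant is normalized to UTC.
      return ts -> ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) ts);
    } else {
      // No zone information: the wall-clock value is read as if it were UTC.
      return ts ->
          ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) ts).atOffset(ZoneOffset.UTC));
    }
  }

  public static void main(String[] args) {
    Object withZone = OffsetDateTime.parse("2024-10-08T15:18:20.053+02:00");
    Object withoutZone = LocalDateTime.parse("2024-10-08T13:18:20.053");
    System.out.println(converter(true).apply(withZone));     // prints 1728393500053000
    System.out.println(converter(false).apply(withoutZone)); // prints 1728393500053000
  }
}
```

Both inputs map to the same Long because 15:18:20.053 at +02:00 is 13:18:20.053 UTC. Either way, the record only ever holds a Long by the time it is partitioned or written.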
