From 74049bb05977266066635497bb3a215a339d296b Mon Sep 17 00:00:00 2001
From: Mike Skells
Date: Thu, 26 Sep 2024 13:29:20 +0100
Subject: [PATCH] Add additional timestamp sources: from a header value,
from a data field, or via a custom extractor
Remove a few simple classes and add a DataExtractor to read things from the `sinkRecord`,
and a few tidy-ups
---
.../common/config/AivenCommonConfig.java | 6 +-
.../common/config/TimestampSource.java | 189 ++++++++++++++----
.../config/extractors/DataExtractor.java | 8 +
.../extractors/HeaderValueExtractor.java | 17 ++
.../config/extractors/SimpleValuePath.java | 92 +++++++++
.../validators/TimestampSourceValidator.java | 4 +-
.../extractors/HeaderValueExtractorTest.java | 61 ++++++
.../extractors/SimpleValuePathTest.java | 135 +++++++++++++
...sedTopicPartitionKeyRecordGrouperTest.java | 2 +-
...aBasedTopicPartitionRecordGrouperTest.java | 2 +-
.../common/grouper/TestTimestampSource.java | 38 ++++
.../TopicPartitionKeyRecordGrouperTest.java | 14 +-
.../TopicPartitionRecordGrouperTest.java | 14 +-
.../kafka/connect/gcs/GcsSinkConfig.java | 4 +-
.../kafka/connect/s3/config/S3SinkConfig.java | 5 +-
15 files changed, 530 insertions(+), 61 deletions(-)
create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java
create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java
create mode 100644 commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java
create mode 100644 commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java
create mode 100644 commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java
create mode 100644 commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java
index 70df3d964..106d39204 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java
@@ -202,8 +202,10 @@ public final ZoneId getFilenameTimezone() {
}
public final TimestampSource getFilenameTimestampSource() {
- return TimestampSource.of(getFilenameTimezone(),
- TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE)));
+ return new TimestampSource.Builder()
+ .configuration(getString(FILE_NAME_TIMESTAMP_SOURCE))
+ .zoneId(getFilenameTimezone())
+ .build();
}
public final int getMaxRecordsPerFile() {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
index 30b38f7c3..99773f29a 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
@@ -20,82 +20,191 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
+import java.util.Locale;
+import java.util.Objects;
+import io.aiven.kafka.connect.common.config.extractors.DataExtractor;
+import io.aiven.kafka.connect.common.config.extractors.HeaderValueExtractor;
+import io.aiven.kafka.connect.common.config.extractors.SimpleValuePath;
import org.apache.kafka.connect.sink.SinkRecord;
public interface TimestampSource {
ZonedDateTime time(SinkRecord record);
- @SuppressWarnings("PMD.ShortMethodName")
- static TimestampSource of(final Type extractorType) {
- return of(ZoneOffset.UTC, extractorType);
- }
-
- @SuppressWarnings("PMD.ShortMethodName")
- static TimestampSource of(final ZoneId zoneId, final Type extractorType) {
- switch (extractorType) {
- case WALLCLOCK :
- return new WallclockTimestampSource(zoneId);
- case EVENT :
- return new EventTimestampSource(zoneId);
- default :
- throw new IllegalArgumentException(
- String.format("Unsupported timestamp extractor type: %s", extractorType));
- }
- }
Type type();
enum Type {
- WALLCLOCK, EVENT;
+ WALLCLOCK,
+ EVENT,
+ HEADER,
+ SIMPLE_DATA,
+ CUSTOM
+
+ }
+ class Builder {
+ private ZoneId zoneId = ZoneOffset.UTC;
+ private Type type;
+ private String additionalParameters;
+
+ /**
+ * Sets the zoneId to be used. If this method isn't called, the default is UTC.
+ * @return this
+ * @throws NullPointerException if zoneId is null
+ */
+ public Builder zoneId(final ZoneId zoneId) {
+ Objects.requireNonNull(zoneId, "zoneId cannot be null");
+ this.zoneId = zoneId;
+ return this;
+ }
- @SuppressWarnings("PMD.ShortMethodName")
- public static Type of(final String name) {
- for (final Type t : Type.values()) {
- if (t.name().equalsIgnoreCase(name)) {
- return t;
- }
+ /**
+ * sets the type of the timestamp source and associated parameters (if needed)
+ * The format of the configuration is <type>[:<data>]
+ * i.e. the type name, optionally followed by data.
+ *
+ * The data is type specific
+ *
+ * For type WALLCLOCK or EVENT, no data is allowed
+ *
+ *
+ * For type SIMPLE_DATA, data is required, and is a '.' separated series of
+ * terms in the path
+ *
+ * If the '.' is something that should be included in the terms, and you
+ * want to use a different separator, then you can specify a '.' as the first character, and the separator as the
+ * second character, and then the path is the rest of the string.
+ *
+ * For example "SIMPLE_DATA:a.b.c" would use a path with
+ * terms "a", "b", "c"
+ *
+ * For example "SIMPLE_DATA:.:a.b:c" would use a path with terms "a.b", "c"
+ *
+ * For type HEADER, data is required, and is the name of the header to extract
+ *
+ * For example "HEADER:foo" would use the "foo" header (or null if it's not available in the SinkRecord)
+ *
+ *
+ * For type CUSTOM, data is required, and is the name of the class to use, and any additional parameters for that custom time source.
+ * The specified class must implement the TimestampSource interface and have a public constructor that takes a String and a ZoneId. For the meaning of the data, see the documentation of the custom class.
+ *
+ * For example "CUSTOM:my.custom.timesource:some more data" would be similar to calling new my.custom.timesource("some more data", zoneId)
+ *
+ *
+
+ * @return this
+ */
+ public Builder configuration(final String configuration) {
+ final String[] parts = configuration.split(":", 2);
+ final String typeName = parts[0];
+ try {
+ this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
+ } catch (final IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unknown timestamp source: "+typeName);
+ }
+
+ this.additionalParameters = parts.length > 1 ? parts[1] : null;
+ return this;
+ }
+
+ public TimestampSource build() {
+ switch (type) {
+ case WALLCLOCK:
+ if (additionalParameters != null) {
+ throw new IllegalArgumentException("Wallclock timestamp source does not support additionalParameters");
+ }
+ return new WallclockTimestampSource(zoneId);
+ case EVENT:
+ if (additionalParameters != null) {
+ throw new IllegalArgumentException("Event timestamp source does not support additionalParameters");
+ }
+ return new EventTimestampSource(zoneId);
+ case SIMPLE_DATA:
+ if (additionalParameters == null) {
+ throw new IllegalArgumentException("Data timestamp source requires additionalParameters");
+ }
+ return new SimpleTimestampSource(zoneId, Type.SIMPLE_DATA, SimpleValuePath.parse(additionalParameters));
+ case HEADER:
+ if (additionalParameters == null) {
+ throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
+ }
+ return new SimpleTimestampSource(zoneId, Type.HEADER, new HeaderValueExtractor(additionalParameters));
+ case CUSTOM:
+ if (additionalParameters == null) {
+ throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
+ }
+ final String[] parts = additionalParameters.split(":", 2);
+ final String className = parts[0];
+ final String params = parts.length > 1 ? parts[1] : null;
+ try {
+ final Class<?> clazz = Class.forName(className);
+ return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException("Failed to create custom timestamp source", e);
+ }
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported timestamp extractor type: %s", type));
}
- throw new IllegalArgumentException(String.format("Unknown timestamp source: %s", name));
}
}
- final class WallclockTimestampSource implements TimestampSource {
- private final ZoneId zoneId;
+ class SimpleTimestampSource implements TimestampSource {
+ protected final ZoneId zoneId;
+ private final Type type;
+ private final DataExtractor dataExtractor;
- protected WallclockTimestampSource(final ZoneId zoneId) {
+ protected SimpleTimestampSource(final ZoneId zoneId, final Type type, DataExtractor dataExtractor) {
this.zoneId = zoneId;
+ this.type = type;
+ this.dataExtractor = dataExtractor;
}
@Override
- public ZonedDateTime time(final SinkRecord record) {
- return ZonedDateTime.now(zoneId);
+ public Type type() {
+ return type;
}
@Override
- public Type type() {
- return Type.WALLCLOCK;
+ public ZonedDateTime time(SinkRecord record) {
+ return fromRawTime(dataExtractor.extractDataFrom(record));
}
- }
- final class EventTimestampSource implements TimestampSource {
- private final ZoneId zoneId;
+ protected ZonedDateTime fromRawTime(final Object rawValue) {
+ if (rawValue == null) {
+ return null;
+ } else if (rawValue instanceof Long) {
+ return withZone((Long) rawValue);
+ } else if (rawValue instanceof ZonedDateTime) {
+ return (ZonedDateTime) rawValue;
+ } else if (rawValue instanceof Instant) {
+ return withZone(((Instant) rawValue).toEpochMilli());
+ }
+ return null;
+ }
- protected EventTimestampSource(final ZoneId zoneId) {
- this.zoneId = zoneId;
+ protected ZonedDateTime withZone(final long timestamp) {
+ return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
+ }
+ }
+
+ final class WallclockTimestampSource extends SimpleTimestampSource {
+ WallclockTimestampSource(final ZoneId zoneId) {
+ super(zoneId, Type.WALLCLOCK, null);
}
@Override
public ZonedDateTime time(final SinkRecord record) {
- return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId);
+ return ZonedDateTime.now(zoneId);
+ }
+ }
+
+ final class EventTimestampSource extends SimpleTimestampSource {
+ EventTimestampSource(final ZoneId zoneId) {
+ super(zoneId, Type.EVENT, null);
}
@Override
- public Type type() {
- return Type.EVENT;
+ public ZonedDateTime time(final SinkRecord record) {
+ return withZone(record.timestamp());
}
}
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java
new file mode 100644
index 000000000..cf6dd37e2
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java
@@ -0,0 +1,8 @@
+package io.aiven.kafka.connect.common.config.extractors;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public interface DataExtractor {
+
+ Object extractDataFrom(final SinkRecord record);
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java
new file mode 100644
index 000000000..9e954fdb9
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java
@@ -0,0 +1,17 @@
+package io.aiven.kafka.connect.common.config.extractors;
+
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class HeaderValueExtractor implements DataExtractor {
+ private final String headerKey;
+
+ public HeaderValueExtractor(final String headerKey) {
+ this.headerKey = headerKey;
+ }
+
+ public Object extractDataFrom(final SinkRecord record) {
+ final Header header = record.headers().lastWithName(headerKey);
+ return header == null ? null : header.value();
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java
new file mode 100644
index 000000000..8ded6ddbe
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.aiven.kafka.connect.common.config.extractors;
+
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public final class SimpleValuePath implements DataExtractor {
+ private final String[] terms;
+
+ private SimpleValuePath(final String[] terms) {
+ this.terms = terms;
+ }
+
+ /**
+ * Parse a path definition string into a SimpleValuePath. The path definition string is a '.' separated series of
+ * strings, which are the terms in the path. If the '.' is something that should be included in the terms, and you
+ * want to use a different separator, then you can specify a '.' as the first character, and the separator as the
+ * second character, and then the path is the rest of the string. For example "a.b.c" would parse into a path with
+ * terms "a", "b", "c". For example ".:a.b:c" would parse into a path with terms "a.b", "c".
+ *
+ * @return a SimpleValuePath that can access a value in a nested structure
+ */
+ public static SimpleValuePath parse(final String pathDefinition) {
+ final String pathDescription;
+ final String pathSeparator;
+ if (pathDefinition.length() > 1 && pathDefinition.charAt(0) == '.' ) {
+ pathDescription = pathDefinition.substring(2);
+ pathSeparator = pathDefinition.substring(1,2);
+ } else {
+ pathDescription = pathDefinition;
+ pathSeparator = ".";
+ }
+ return new SimpleValuePath(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription));
+ }
+
+ public Object extractDataFrom(final SinkRecord record) {
+ Object current = record.value();
+
+ for (final String term : terms) {
+ if (current == null) {
+ return null;
+ }
+ if (current instanceof Struct) {
+ final Struct struct = (Struct) current;
+ final Schema schema = struct.schema();
+ final Field field = schema.field(term);
+ if (field == null) {
+ return null;
+ }
+ current = struct.get(field);
+ } else if (current instanceof Map) {
+ current = ((Map<?, ?>) current).get(term);
+ } else if (current instanceof List) {
+ try {
+ current = ((List<?>) current).get(Integer.parseInt(term));
+ } catch (NumberFormatException|IndexOutOfBoundsException e) {
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+ return current;
+ }
+
+ @Override
+ public String toString() {
+ return "Path[terms=" + Arrays.toString( terms) +"]";
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java
index b7535538e..7b8f52843 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java
@@ -26,7 +26,9 @@ public class TimestampSourceValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
try {
- TimestampSource.Type.of(value.toString());
+ new TimestampSource.Builder()
+ .configuration(value.toString())
+ .build();
} catch (final Exception e) { // NOPMD AvoidCatchingGenericException
throw new ConfigException(name, value, e.getMessage());
}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java
new file mode 100644
index 000000000..9db03e749
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java
@@ -0,0 +1,61 @@
+package io.aiven.kafka.connect.common.config.extractors;
+
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class HeaderValueExtractorTest {
+
+
+ static SinkRecord record1 = new SinkRecord("topic", 0, null, null, null, null, 0,
+ 0L, TimestampType.CREATE_TIME,
+ new ConnectHeaders()
+ .add("h1", "value1", Schema.STRING_SCHEMA)
+ .add("h2", "v2", Schema.STRING_SCHEMA)
+ .add("b1", true, Schema.BOOLEAN_SCHEMA)
+ .add("b2", false, Schema.BOOLEAN_SCHEMA)
+ .add("i1", null, Schema.OPTIONAL_INT32_SCHEMA)
+ .add("i2", 17, Schema.OPTIONAL_INT32_SCHEMA)
+ .add("i3", 99, Schema.INT32_SCHEMA)
+ .add("i1", null, Schema.OPTIONAL_INT64_SCHEMA)
+ .add("l2", 17L, Schema.OPTIONAL_INT64_SCHEMA)
+ .add("l3", 99L, Schema.INT64_SCHEMA)
+ .add("dup", "one", Schema.STRING_SCHEMA)
+ .add("dup", "two", Schema.STRING_SCHEMA)
+ );
+
+ public static Stream testData() {
+ return Stream.of(
+ Arguments.of(record1, "h1", "value1"),
+ Arguments.of(record1, "h2", "v2"),
+ Arguments.of(record1, "b1", true),
+ Arguments.of(record1, "b2", false),
+ Arguments.of(record1, "i1", null),
+ Arguments.of(record1, "i2", 17),
+ Arguments.of(record1, "i3", 99),
+ Arguments.of(record1, "i1", null),
+ Arguments.of(record1, "l2", 17L),
+ Arguments.of(record1, "l3", 99L),
+ Arguments.of(record1, "dup", "two"),
+ Arguments.of(record1, "xxxxx", null)
+ );
+
+
+ }
+
+ @ParameterizedTest
+ @MethodSource("testData")
+ void test(SinkRecord record, String headerKey, Object expected) {
+ var headerValueExtractor = new HeaderValueExtractor(headerKey);
+ assertEquals(expected, headerValueExtractor.extractDataFrom(record));
+ }
+
+}
\ No newline at end of file
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java
new file mode 100644
index 000000000..9d70a0a45
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.aiven.kafka.connect.common.config.extractors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonDeserializer;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SimpleValuePathTest {
+
+ public static Stream ParseDataProvider() {
+ return Stream.of(
+ Arguments.of("Path[terms=[a, b, c]]", "a.b.c"),
+ Arguments.of("Path[terms=[a:b:c]]", "a:b:c"),
+ Arguments.of("Path[terms=[.b.c]]", ".a.b.c"),
+ Arguments.of("Path[terms=[a.b, c]]", ".:a.b:c"),
+ //with some regex special characters
+ Arguments.of("Path[terms=[\\a, b, c]]", "\\a.b.c"),
+ Arguments.of("Path[terms=[a.b.c]]", ".\\a.b.c"),
+ Arguments.of("Path[terms=[a, b, c]]", ".\\a\\b\\c"),
+
+ Arguments.of("Path[terms=[ [a, b, c]]", " [a.b.c"),
+ Arguments.of("Path[terms=[[a.b.c]]", ". [a.b.c"),
+ Arguments.of("Path[terms=[a, b, c]]", ".[a[b[c"),
+
+ Arguments.of("Path[terms=[]]", "."),
+ Arguments.of("Path[terms=[]]", ""),
+ Arguments.of("Path[terms=[]]", ".."),
+ Arguments.of("Path[terms=[a]]", "..a")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("ParseDataProvider")
+ void parse(String expected, String toParse) {
+ assertEquals(expected, SimpleValuePath.parse(toParse).toString());
+ }
+
+ static Schema flatSchema = SchemaBuilder.struct()
+ .field("s1", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("i1", Schema.OPTIONAL_INT32_SCHEMA)
+ .build();
+ static Schema mapSchema = SchemaBuilder.struct()
+ .field("m1", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.struct()
+ .field("s2", Schema.OPTIONAL_STRING_SCHEMA)
+ .field("i2", Schema.OPTIONAL_INT32_SCHEMA)
+ .optional()
+ .build())
+ .optional()
+ .build())
+ .field("m2", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)
+ .optional()
+ .build())
+ .build();
+ static Schema arraySchema = SchemaBuilder.struct()
+ .field("a1", SchemaBuilder.array(Schema.FLOAT32_SCHEMA)
+ .optional()
+ .build())
+ .field("a2", SchemaBuilder.array(Schema.FLOAT64_SCHEMA)
+ .optional()
+ .build())
+ .field("a3", SchemaBuilder.array(Schema.STRING_SCHEMA)
+ .optional()
+ .build())
+ .build();
+
+ static SinkRecord toRecord(Schema schema, String json) throws Exception {
+ try (JsonDeserializer ds = new JsonDeserializer()) {
+ JsonNode jsonValue = ds.deserialize("xx", json.replace('\'', '"').getBytes(StandardCharsets.UTF_8));
+ Method m = JsonConverter.class.getDeclaredMethod("convertToConnect", Schema.class, JsonNode.class);
+ m.setAccessible(true);
+ Object value = m.invoke(null, schema, jsonValue);
+ return new SinkRecord("topic", 0, null, null, schema, value, 0);
+ }
+ }
+
+ static Stream extractDataFromDataProvider() throws Exception {
+ return Stream.of(
+ Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "s1", "hi"),
+ Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "i1", 42),
+ Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "xx", null),
+
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m1.k1.i2", 42),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m1.k1.s2", "Hi"),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m1.k1.xx", null),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "mx.k1.i2", null),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m1.k2.s2", "Bi"),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m2.two", 2),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), "m1.one.xx", null),
+ Arguments.of(toRecord(mapSchema, "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'with.dot':1}}"), ".:m2:with.dot", 1),
+
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a1.0", 1F),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a1.3", 9.9f),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a2.0", 9.0),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a2.1", -1.0),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a3.0", "zero"),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a3.-1", null),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a3.10", null),
+ Arguments.of(toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), "a3.2", "two")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("extractDataFromDataProvider")
+ void extractDataFrom(SinkRecord record, String path, Object expected) {
+ final SimpleValuePath underTest = SimpleValuePath.parse(path);
+ assertEquals(expected, underTest.extractDataFrom(record));
+
+ }
+}
\ No newline at end of file
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java
index 4fc2abf2b..3fbfc6cef 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java
@@ -64,7 +64,7 @@ final class SchemaBasedTopicPartitionKeyRecordGrouperTest {
static final SinkRecord KRT1P1R3 = new SinkRecord("topic1", 0, SchemaBuilder.string().optional().version(2).build(),
"some_key", SchemaBuilder.string().optional().version(2).build(), null, 1003);
- static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK);
+ static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK);
@Test
void rotateOnKeySchemaChanged() {
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java
index 0bf4c4449..b840b17cd 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java
@@ -64,7 +64,7 @@ final class SchemaBasedTopicPartitionRecordGrouperTest {
static final SinkRecord KRT1P1R3 = new SinkRecord("topic1", 0, SchemaBuilder.string().optional().version(2).build(),
"some_key", SchemaBuilder.string().optional().version(2).build(), null, 1003);
- static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK);
+ static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK);
@Test
void rotateOnKeySchemaChanged() {
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java
new file mode 100644
index 000000000..a9450ba26
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.aiven.kafka.connect.common.grouper;
+
+import io.aiven.kafka.connect.common.config.TimestampSource;
+
+import java.time.ZoneOffset;
+
+public final class TestTimestampSource {
+ private TestTimestampSource() {
+ }
+ @SuppressWarnings("PMD.ShortMethodName")
+ public static TimestampSource of(final TimestampSource.Type type) {
+ return of(type, ZoneOffset.UTC);
+ }
+
+ @SuppressWarnings("PMD.ShortMethodName")
+ public static TimestampSource of(final TimestampSource.Type type, final ZoneOffset timeZone) {
+ return new TimestampSource.Builder()
+ .configuration(type.toString())
+ .zoneId(timeZone)
+ .build();
+ }
+}
+
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java
index a4462ba11..c98c6f74f 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java
@@ -84,7 +84,7 @@ final class TopicPartitionKeyRecordGrouperTest {
private static final SinkRecord T2P1R3 = new SinkRecord("topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null,
null, 2003, 1_635_547_906_000L, TimestampType.CREATE_TIME);
- private static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK);
+ private static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK);
@Test
void withoutNecessaryParameters() {
@@ -240,13 +240,13 @@ void setZeroPaddingForKafkaPartition() {
void addTimeUnitsToTheFileNameUsingWallclockTimestampSource() {
final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{key}}-" + "{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}");
- final ZonedDateTime timestamp = TimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null);
+ final ZonedDateTime timestamp = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null);
final String expectedTs = timestamp.format(DateTimeFormatter.ofPattern("yyyy"))
+ timestamp.format(DateTimeFormatter.ofPattern("MM"))
+ timestamp.format(DateTimeFormatter.ofPattern("dd"));
final TopicPartitionKeyRecordGrouper grouper = new TopicPartitionKeyRecordGrouper(filenameTemplate, null,
- TimestampSource.of(TimestampSource.Type.WALLCLOCK));
+ TestTimestampSource.of(TimestampSource.Type.WALLCLOCK));
grouper.put(T1P1R0);
grouper.put(T1P1R1);
@@ -401,9 +401,9 @@ void rotateKeysYearly() {
void rotateDailyWithEventTimestampSource() {
final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{key}}-" + "{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}");
- final ZonedDateTime timestamp0 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0);
- final ZonedDateTime timestamp1 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1);
- final ZonedDateTime timestamp2 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2);
+ final ZonedDateTime timestamp0 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0);
+ final ZonedDateTime timestamp1 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1);
+ final ZonedDateTime timestamp2 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2);
final String expectedTs0 = timestamp0.format(DateTimeFormatter.ofPattern("yyyy"))
+ timestamp0.format(DateTimeFormatter.ofPattern("MM"))
@@ -416,7 +416,7 @@ void rotateDailyWithEventTimestampSource() {
+ timestamp2.format(DateTimeFormatter.ofPattern("dd"));
final TopicPartitionKeyRecordGrouper grouper = new TopicPartitionKeyRecordGrouper(filenameTemplate, null,
- TimestampSource.of(TimestampSource.Type.EVENT));
+ TestTimestampSource.of(TimestampSource.Type.EVENT));
grouper.put(T2P1R0);
grouper.put(T2P1R1);
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java
index 466d222ef..611c5ddf4 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java
@@ -84,7 +84,7 @@ final class TopicPartitionRecordGrouperTest {
private static final SinkRecord T2P1R3 = new SinkRecord("topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null,
null, 2003, 1_635_547_906_000L, TimestampType.CREATE_TIME);
- private static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK);
+ private static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK);
@Test
void withoutNecessaryParameters() {
@@ -233,13 +233,13 @@ void setZeroPaddingForKafkaPartition() {
void addTimeUnitsToTheFileNameUsingWallclockTimestampSource() {
final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}");
- final ZonedDateTime timestamp = TimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null);
+ final ZonedDateTime timestamp = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null);
final String expectedTs = timestamp.format(DateTimeFormatter.ofPattern("yyyy"))
+ timestamp.format(DateTimeFormatter.ofPattern("MM"))
+ timestamp.format(DateTimeFormatter.ofPattern("dd"));
final TopicPartitionRecordGrouper grouper = new TopicPartitionRecordGrouper(filenameTemplate, null,
- TimestampSource.of(TimestampSource.Type.WALLCLOCK));
+ TestTimestampSource.of(TimestampSource.Type.WALLCLOCK));
grouper.put(T1P1R0);
grouper.put(T1P1R1);
@@ -391,9 +391,9 @@ void rotateKeysYearly() {
void rotateDailyWithEventTimestampSource() {
final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{start_offset}}-"
+ "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}");
- final ZonedDateTime timestamp0 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0);
- final ZonedDateTime timestamp1 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1);
- final ZonedDateTime timestamp2 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2);
+ final ZonedDateTime timestamp0 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0);
+ final ZonedDateTime timestamp1 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1);
+ final ZonedDateTime timestamp2 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2);
final String expectedTs0 = timestamp0.format(DateTimeFormatter.ofPattern("yyyy"))
+ timestamp0.format(DateTimeFormatter.ofPattern("MM"))
@@ -406,7 +406,7 @@ void rotateDailyWithEventTimestampSource() {
+ timestamp2.format(DateTimeFormatter.ofPattern("dd"));
final TopicPartitionRecordGrouper grouper = new TopicPartitionRecordGrouper(filenameTemplate, null,
- TimestampSource.of(TimestampSource.Type.EVENT));
+ TestTimestampSource.of(TimestampSource.Type.EVENT));
grouper.put(T2P1R0);
grouper.put(T2P1R1);
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
index 5ac3a26ba..08597c1b2 100644
--- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
@@ -289,7 +289,9 @@ public void ensureValid(final String name, final Object value) {
@Override
public void ensureValid(final String name, final Object value) {
try {
- TimestampSource.Type.of(value.toString());
+ new TimestampSource.Builder()
+ .configuration(value.toString())
+ .build();
} catch (final Exception e) { // NOPMD broad exception catched
throw new ConfigException(FILE_NAME_TIMESTAMP_SOURCE, value, e.getMessage());
}
diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java
index 7f428e584..b2665ccb7 100644
--- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java
+++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java
@@ -617,7 +617,10 @@ public ZoneId getTimezone() {
}
public TimestampSource getTimestampSource() {
- return TimestampSource.of(getTimezone(), TimestampSource.Type.of(getString(TIMESTAMP_SOURCE)));
+ return new TimestampSource.Builder()
+ .configuration(getString(TIMESTAMP_SOURCE))
+ .zoneId(getTimezone())
+ .build();
}
public AwsStsRole getStsRole() {