Skip to content

Commit

Permalink
Add additional timestamp sources, for
Browse files Browse the repository at this point in the history
from a header value
from a data field
via a custom extractors

Remove a few simple classes and make a DataExtractor to read things from the `sinkRecord`
and few tidyups
  • Loading branch information
Mike Skells committed Oct 8, 2024
1 parent ec00122 commit 74049bb
Show file tree
Hide file tree
Showing 15 changed files with 530 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ public final ZoneId getFilenameTimezone() {
}

public final TimestampSource getFilenameTimestampSource() {
return TimestampSource.of(getFilenameTimezone(),
TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE)));
return new TimestampSource.Builder()
.configuration(getString(FILE_NAME_TIMESTAMP_SOURCE))
.zoneId(getFilenameTimezone())
.build();
}

public final int getMaxRecordsPerFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,82 +20,191 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;
import java.util.Objects;

import io.aiven.kafka.connect.common.config.extractors.DataExtractor;
import io.aiven.kafka.connect.common.config.extractors.HeaderValueExtractor;
import io.aiven.kafka.connect.common.config.extractors.SimpleValuePath;
import org.apache.kafka.connect.sink.SinkRecord;

public interface TimestampSource {

ZonedDateTime time(SinkRecord record);

@SuppressWarnings("PMD.ShortMethodName")
static TimestampSource of(final Type extractorType) {
return of(ZoneOffset.UTC, extractorType);
}

@SuppressWarnings("PMD.ShortMethodName")
static TimestampSource of(final ZoneId zoneId, final Type extractorType) {
switch (extractorType) {
case WALLCLOCK :
return new WallclockTimestampSource(zoneId);
case EVENT :
return new EventTimestampSource(zoneId);
default :
throw new IllegalArgumentException(
String.format("Unsupported timestamp extractor type: %s", extractorType));
}
}

Type type();

enum Type {

WALLCLOCK, EVENT;
WALLCLOCK,
EVENT,
HEADER,
SIMPLE_DATA,
CUSTOM

}
class Builder {
private ZoneId zoneId = ZoneOffset.UTC;
private Type type;
private String additionalParameters;

/**
* set the zoneId to be used. If this method isnt called, the default is UTC
* @return this
* @throws NullPointerException if zoneId is null
*/
public Builder zoneId(final ZoneId zoneId) {
Objects.requireNonNull(zoneId, "zoneId cannot be null");
this.zoneId = zoneId;
return this;
}

@SuppressWarnings("PMD.ShortMethodName")
public static Type of(final String name) {
for (final Type t : Type.values()) {
if (t.name().equalsIgnoreCase(name)) {
return t;
}
/**
* sets the type of the timestamp source and associated parameters (if needed)
* The format of the configuration is <type>[:<data>]
* i.e. the type name, optionally followed by data.
* <br>
* The data is type specific
* <p>
* For type WALLCLOCK or EVENT, no data is allowed
* </p>
* <p>
* For type SIMPLE_DATA, data is required, and is a '.' separated series of
* terms in the path
* <br>If the '.' is something that should be included in the terms, and you
* want to use a different separator, then you can specify a '.' as the first character, and the separator as the
* second character, and then the path is the rest of the string
* <br>For example "SIMPLE_DATA:a.b.c" would use into a path with
* terms "a", "b", "c"
* <br>For example "SIMPLE_DATA:.:a.b:c" would use a path with terms "a.b", "c"
* </p>
* For type HEADER, data is required, and is the name of the header to extract
* <br>For example "HEADER:foo" would use to "foo" header (or null if its not available in the SinkRecord
* </p>
* </p>
* For type CUSTOM, data is required, and is the name of the class to use, and any additional parameters for that custom time source.
* The specified class must implement the TimestampSource interface and have a public constructor that takes a String and a ZoneId. Fort the meaning of the data, see the documentation of the custom class.
* <br>For example "CUSTOM:my.custom.timesource:some more data" would be similar to calling new my.custom.timesource("some more data", zoneId)
* </p>
*
* @return this
*/
public Builder configuration(final String configuration) {
final String[] parts = configuration.split(":", 2);
final String typeName = parts[0];
try {
this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown timestamp source: "+typeName);
}

this.additionalParameters = parts.length > 1 ? parts[1] : null;
return this;
}

public TimestampSource build() {
switch (type) {
case WALLCLOCK:
if (additionalParameters != null) {
throw new IllegalArgumentException("Wallclock timestamp source does not support additionalParameters");
}
return new WallclockTimestampSource(zoneId);
case EVENT:
if (additionalParameters != null) {
throw new IllegalArgumentException("Event timestamp source does not support additionalParameters");
}
return new EventTimestampSource(zoneId);
case SIMPLE_DATA:
if (additionalParameters == null) {
throw new IllegalArgumentException("Data timestamp source requires additionalParameters");
}
return new SimpleTimestampSource(zoneId, Type.SIMPLE_DATA, SimpleValuePath.parse(additionalParameters));
case HEADER:
if (additionalParameters == null) {
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
}
return new SimpleTimestampSource(zoneId, Type.HEADER, new HeaderValueExtractor(additionalParameters));
case CUSTOM:
if (additionalParameters == null) {
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
}
final String[] parts = additionalParameters.split(":", 2);
final String className = parts[0];
final String params = parts.length > 1 ? parts[1] : null;
try {
final Class<?> clazz = Class.forName(className);
return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId);
} catch (final Exception e) {
throw new IllegalArgumentException("Failed to create custom timestamp source", e);
}
default:
throw new IllegalArgumentException(
String.format("Unsupported timestamp extractor type: %s", type));
}
throw new IllegalArgumentException(String.format("Unknown timestamp source: %s", name));
}

}

final class WallclockTimestampSource implements TimestampSource {
private final ZoneId zoneId;
class SimpleTimestampSource implements TimestampSource {
protected final ZoneId zoneId;
private final Type type;
private final DataExtractor dataExtractor;

protected WallclockTimestampSource(final ZoneId zoneId) {
protected SimpleTimestampSource(final ZoneId zoneId, final Type type, DataExtractor dataExtractor) {
this.zoneId = zoneId;
this.type = type;
this.dataExtractor = dataExtractor;
}

@Override
public ZonedDateTime time(final SinkRecord record) {
return ZonedDateTime.now(zoneId);
public Type type() {
return type;
}

@Override
public Type type() {
return Type.WALLCLOCK;
public ZonedDateTime time(SinkRecord record) {
return fromRawTime(dataExtractor.extractDataFrom(record));
}
}

final class EventTimestampSource implements TimestampSource {
private final ZoneId zoneId;
protected ZonedDateTime fromRawTime(final Object rawValue) {
if (rawValue == null) {
return null;
} else if (rawValue instanceof Long) {
return withZone((Long) rawValue);
} else if (rawValue instanceof ZonedDateTime) {
return (ZonedDateTime) rawValue;
} else if (rawValue instanceof Instant) {
return withZone(((Instant) rawValue).toEpochMilli());
}
return null;
}

protected EventTimestampSource(final ZoneId zoneId) {
this.zoneId = zoneId;
protected ZonedDateTime withZone(final long timestamp) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
}
}

final class WallclockTimestampSource extends SimpleTimestampSource {
WallclockTimestampSource(final ZoneId zoneId) {
super(zoneId, Type.WALLCLOCK, null);
}

@Override
public ZonedDateTime time(final SinkRecord record) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId);
return ZonedDateTime.now(zoneId);
}
}

final class EventTimestampSource extends SimpleTimestampSource {
EventTimestampSource(final ZoneId zoneId) {
super(zoneId, Type.EVENT, null);
}

@Override
public Type type() {
return Type.EVENT;
public ZonedDateTime time(final SinkRecord record) {
return withZone(record.timestamp());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.aiven.kafka.connect.common.config.extractors;

import org.apache.kafka.connect.sink.SinkRecord;

public interface DataExtractor {

Object extractDataFrom(final SinkRecord record);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.aiven.kafka.connect.common.config.extractors;

import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public class HeaderValueExtractor implements DataExtractor {
private final String headerKey;

public HeaderValueExtractor(final String headerKey) {
this.headerKey = headerKey;
}

public Object extractDataFrom(final SinkRecord record) {
final Header header = record.headers().lastWithName(headerKey);
return header == null ? null : header.value();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aiven.kafka.connect.common.config.extractors;


import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public final class SimpleValuePath implements DataExtractor {
private final String[] terms;

private SimpleValuePath(final String[] terms) {
this.terms = terms;
}

/**
* Parse a path definition string into a Path object. The path definition string is a '.' separated series of
* strings, which are the terms in the path If the '.' is something that should be included in the terms, and you
* want to use a different separator, then you can specify a '.' as the first character, and the separator as the
* second character, and then the path is the rest of the string For example "a.b.c" would parse into a path with
* terms "a", "b", "c" For example ".:a.b:c" would parse into a path with terms "a.b", "c"
*
* @return a PathAccess that can access a value in a nested structure
*/
public static SimpleValuePath parse(final String pathDefinition) {
final String pathDescription;
final String pathSeparator;
if (pathDefinition.length() > 1 && pathDefinition.charAt(0) == '.' ) {
pathDescription = pathDefinition.substring(2);
pathSeparator = pathDefinition.substring(1,2);
} else {
pathDescription = pathDefinition;
pathSeparator = ".";
}
return new SimpleValuePath(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription));
}

public Object extractDataFrom(final SinkRecord record) {
Object current = record.value();

for (final String term : terms) {
if (current == null) {
return null;
}
if (current instanceof Struct) {
final Struct struct = (Struct) current;
final Schema schema = struct.schema();
final Field field = schema.field(term);
if (field == null) {
return null;
}
current = struct.get(field);
} else if (current instanceof Map) {
current = ((Map<?, ?>) current).get(term);
} else if (current instanceof List) {
try {
current = ((List<?>) current).get(Integer.parseInt(term));
} catch (NumberFormatException|IndexOutOfBoundsException e) {
return null;
}
} else {
return null;
}
}
return current;
}

@Override
public String toString() {
return "Path[terms=" + Arrays.toString( terms) +"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public class TimestampSourceValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
try {
TimestampSource.Type.of(value.toString());
new TimestampSource.Builder()
.configuration(value.toString())
.build();
} catch (final Exception e) { // NOPMD AvoidCatchingGenericException
throw new ConfigException(name, value, e.getMessage());
}
Expand Down
Loading

0 comments on commit 74049bb

Please sign in to comment.