Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional timestamp sources #298

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ jobs:
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ public final ZoneId getFilenameTimezone() {
}

public final TimestampSource getFilenameTimestampSource() {
return TimestampSource.of(getFilenameTimezone(),
TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE)));
return new TimestampSource.Builder()
.configuration(getString(FILE_NAME_TIMESTAMP_SOURCE))
.zoneId(getFilenameTimezone())
.build();
}

public final int getMaxRecordsPerFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,82 +20,199 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Locale;
import java.util.Objects;

import io.aiven.kafka.connect.common.config.extractors.DataExtractor;
import io.aiven.kafka.connect.common.config.extractors.HeaderValueExtractor;
import io.aiven.kafka.connect.common.config.extractors.SimpleValuePath;
import org.apache.kafka.connect.sink.SinkRecord;

public interface TimestampSource {

ZonedDateTime time(SinkRecord record);

@SuppressWarnings("PMD.ShortMethodName")
static TimestampSource of(final Type extractorType) {
return of(ZoneOffset.UTC, extractorType);
}

@SuppressWarnings("PMD.ShortMethodName")
static TimestampSource of(final ZoneId zoneId, final Type extractorType) {
switch (extractorType) {
case WALLCLOCK :
return new WallclockTimestampSource(zoneId);
case EVENT :
return new EventTimestampSource(zoneId);
default :
throw new IllegalArgumentException(
String.format("Unsupported timestamp extractor type: %s", extractorType));
}
}

Type type();

enum Type {

WALLCLOCK, EVENT;
/** Use the current wallclock time */
WALLCLOCK,
/** Use the event timestamp */
EVENT,
/** Use a header value to specify the date */
HEADER,
/** Extract the timestamp from the payload */
SIMPLE_DATA,
/** Use a custom timestamp source */
CUSTOM

}
class Builder {
private ZoneId zoneId = ZoneOffset.UTC;
private Type type;
private String additionalParameters;

/**
* set the zoneId to be used. If this method isnt called, the default is UTC
* @return this
* @throws NullPointerException if zoneId is null
*/
public Builder zoneId(final ZoneId zoneId) {
Objects.requireNonNull(zoneId, "zoneId cannot be null");
this.zoneId = zoneId;
return this;
}

@SuppressWarnings("PMD.ShortMethodName")
public static Type of(final String name) {
for (final Type t : Type.values()) {
if (t.name().equalsIgnoreCase(name)) {
return t;
}
/**
* sets the type of the timestamp source and associated parameters (if needed)
* The format of the configuration is <type>[:<data>]
* i.e. the type name, optionally followed by data.
* <br>
* The data is type specific
* <p>
* For type WALLCLOCK or EVENT, no data is allowed
* </p>
* <p>
* For type SIMPLE_DATA, data is required, and is a '.' separated series of
* terms in the path
* <br>If the '.' is something that should be included in the terms, and you
* want to use a different separator, then you can specify a '.' as the first character, and the separator as the
* second character, and then the path is the rest of the string
* <br>For example "SIMPLE_DATA:a.b.c" would use into a path with
* terms "a", "b", "c"
* <br>For example "SIMPLE_DATA:.:a.b:c" would use a path with terms "a.b", "c"
* </p>
* For type HEADER, data is required, and is the name of the header to extract
* <br>For example "HEADER:foo" would use to "foo" header (or null if its not available in the SinkRecord
* </p>
* </p>
* For type CUSTOM, data is required, and is the name of the class to use, and any additional parameters for that custom time source.
* The specified class must implement the TimestampSource interface and have a public constructor that takes a String and a ZoneId. Fort the meaning of the data, see the documentation of the custom class.
* <br>For example "CUSTOM:my.custom.timesource:some more data" would be similar to calling new my.custom.timesource("some more data", zoneId)
* </p>
*

* @return this
*/
public Builder configuration(final String configuration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not make sens to me.

There should be a setType(Type type) method and a setArgs(String... args) method. Worry about configuration packing outside of this class. I know that you are thinking that you can put HEADER:field into a configuration but do the unpacking outside of this class, then you don't have to jump through hoops to change the split pattern or similar gyrations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to separate the type and parameter, but its not varargs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its probably better to allow all of the additional class parameters to be able to be specified via additional parameters - which become available with #300. That way its explicit, documented and extensible

final String[] parts = configuration.split(":", 2);
final String typeName = parts[0];
try {
this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH));
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown timestamp source: "+typeName);
}

this.additionalParameters = parts.length > 1 ? parts[1] : null;
return this;
}

public TimestampSource build() {
switch (type) {
case WALLCLOCK:
if (additionalParameters != null) {
throw new IllegalArgumentException("Wallclock timestamp source does not support additionalParameters");
}
return new WallclockTimestampSource(zoneId);
case EVENT:
if (additionalParameters != null) {
throw new IllegalArgumentException("Event timestamp source does not support additionalParameters");
}
return new EventTimestampSource(zoneId);
case SIMPLE_DATA:
if (additionalParameters == null) {
throw new IllegalArgumentException("Data timestamp source requires additionalParameters");
}
return new SimpleTimestampSource(zoneId, Type.SIMPLE_DATA, SimpleValuePath.parse(additionalParameters));
case HEADER:
if (additionalParameters == null) {
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
}
return new SimpleTimestampSource(zoneId, Type.HEADER, new HeaderValueExtractor(additionalParameters));
case CUSTOM:
if (additionalParameters == null) {
throw new IllegalArgumentException("Header timestamp source requires additionalParameters");
}
final String[] parts = additionalParameters.split(":", 2);
final String className = parts[0];
final String params = parts.length > 1 ? parts[1] : null;
try {
final Class<?> clazz = Class.forName(className);
return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor argument seems backwards. ZoneId is required for all instances so shouldn't that be first and then the params?

Copy link
Contributor Author

@mkeskells mkeskells Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by definition both are required parameters in the signature

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that its probably better to move all of the configuration to separate properties, once #300 is merged. That allows for external classes to add to the configuration parameters, and read them. That way its not limited in the configuration, and self documenting

Thoughts?

} catch (final Exception e) {
throw new IllegalArgumentException("Failed to create custom timestamp source", e);
}
default:
throw new IllegalArgumentException(
String.format("Unsupported timestamp extractor type: %s", type));
}
throw new IllegalArgumentException(String.format("Unknown timestamp source: %s", name));
}

}

final class WallclockTimestampSource implements TimestampSource {
private final ZoneId zoneId;
class SimpleTimestampSource implements TimestampSource {
protected final ZoneId zoneId;
private final Type type;
private final DataExtractor dataExtractor;

protected WallclockTimestampSource(final ZoneId zoneId) {
protected SimpleTimestampSource(final ZoneId zoneId, final Type type, DataExtractor dataExtractor) {
this.zoneId = zoneId;
this.type = type;
this.dataExtractor = dataExtractor;
Comment on lines 162 to +164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no checks for any of these being null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There doesn't need to be. The caller validates. Adding null checks at all levels just blurs the concerns IMO

}

@Override
public ZonedDateTime time(final SinkRecord record) {
return ZonedDateTime.now(zoneId);
public Type type() {
return type;
}

@Override
public Type type() {
return Type.WALLCLOCK;
public ZonedDateTime time(SinkRecord record) {
return fromRawTime(dataExtractor.extractDataFrom(record));
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make this an abstract method then you can simply override it in your implementations and not pollute this class with methods that are only used by one instance. This will also make the code easier to read and maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows reuse. Its can still be overridden if a subclass needs it but doesn't require that. What is here is a default, available to be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are two separate concerns here. The location of the date like field, and the conversion of that to a ZonedDateTime - e.g. 982b347 adds java.util.Date as a source conversion
This isn't polluting the api, its promoting reuse, and when extending this library to allow external code to use it, I believe its best to enable extension code to access to common and shared concepts, like implicit type conversion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a class that just does the conversions. I think that classes should try to do one thing and do it very well. A class to convert time formats means they are reusable elsewhere in the chain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you want o have the Extraction as one component in the TimstampSource?
So roughly an extractor and a convertor?

that not the way that the existing TimestampSources work

e.g.
for event

        @Override
        public ZonedDateTime time(final SinkRecord record) {
            return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId);
        }

does 3 things locate, convert and add zoneId

Do you want to change the split of the functionality for all of these existing cases? or have I missed what you are proposing/unhappy about

}
}

final class EventTimestampSource implements TimestampSource {
private final ZoneId zoneId;
protected ZonedDateTime fromRawTime(final Object rawValue) {
if (rawValue == null) {
return null;
} else if (rawValue instanceof Long) {
return withZone((Long) rawValue);
} else if (rawValue instanceof Date) {
return withZone(((Date) rawValue).getTime());
} else if (rawValue instanceof ZonedDateTime) {
return (ZonedDateTime) rawValue;
} else if (rawValue instanceof Instant) {
return withZone(((Instant) rawValue).toEpochMilli());
}
return null;
}

protected EventTimestampSource(final ZoneId zoneId) {
this.zoneId = zoneId;
protected ZonedDateTime withZone(final long timestamp) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId);
}
}

final class WallclockTimestampSource extends SimpleTimestampSource {
WallclockTimestampSource(final ZoneId zoneId) {
super(zoneId, Type.WALLCLOCK, null);
}

@Override
public ZonedDateTime time(final SinkRecord record) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId);
return ZonedDateTime.now(zoneId);
}
}

final class EventTimestampSource extends SimpleTimestampSource {
EventTimestampSource(final ZoneId zoneId) {
super(zoneId, Type.EVENT, null);
}

@Override
public Type type() {
return Type.EVENT;
public ZonedDateTime time(final SinkRecord record) {
return withZone(record.timestamp());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config.extractors;

import org.apache.kafka.connect.sink.SinkRecord;

/**
* Extracts data from a {@link SinkRecord}. The actual data extracted is implementation specific
*/
public interface DataExtractor {

/**
* Extracts data from a {@link SinkRecord}.
*
* @param record the record to extract data from
* @return the extracted data
*/
Object extractDataFrom(final SinkRecord record);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config.extractors;

import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

/**
* A DataExtractor that extracts the value of a header from a SinkRecord
* If there are multiple headers with the same ane, then the last occurrence will be used
* If the header does not exist, then the extraction will yield null
*/
public class HeaderValueExtractor implements DataExtractor {
private final String headerKey;

/**
* Create a new HeaderValueExtractor
*
* @param headerKey the key of the header to extract.
*/
public HeaderValueExtractor(final String headerKey) {
this.headerKey = headerKey;
}

public Object extractDataFrom(final SinkRecord record) {
final Header header = record.headers().lastWithName(headerKey);
return header == null ? null : header.value();
mkeskells marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading