Adding the context to the source connectors (#388)
Updating and refactoring the distribution strategies based on feedback.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven authored Jan 20, 2025
1 parent 343f23c commit f68b0a4
Showing 20 changed files with 921 additions and 629 deletions.
@@ -27,6 +27,8 @@
public class CommonConfig extends AbstractConfig {
protected static final String GROUP_COMPRESSION = "File Compression";
protected static final String GROUP_FORMAT = "Format";
public static final String TASK_ID = "task.id";
public static final String MAX_TASKS = "tasks.max";

/**
* @deprecated No longer needed.
@@ -58,4 +60,25 @@ public Long getKafkaRetryBackoffMs() {
return new BackoffPolicyConfig(this).getKafkaRetryBackoffMs();
}

/**
 * Get the maximum number of tasks that should be run by this connector configuration. Max tasks is set within the
 * Kafka Connect framework and so is retrieved slightly differently in ConnectorConfig.java.
 *
 * @return The maximum number of tasks that should be run by this connector configuration
 */
public int getMaxTasks() {
// TODO when Connect framework is upgraded it will be possible to retrieve this information from the configDef
// as tasksMax
return Integer.parseInt(this.originalsStrings().get(MAX_TASKS));
}
/**
* Get the task id for this configuration
*
* @return The task id for this configuration
*/
public int getTaskId() {
return Integer.parseInt(this.originalsStrings().get(TASK_ID));
}

}
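For orientation: a minimal sketch of how a connector might feed these two properties into each task's configuration. The surrounding connector class and its config handling are hypothetical; only TASK_ID and MAX_TASKS come from this commit, and tasks.max is already supplied by the Connect framework itself.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExampleSourceConnector {
    private Map<String, String> configProps; // hypothetical: connector-level config captured in start()

    // Kafka Connect calls taskConfigs(maxTasks) on the connector; each task gets a copy of
    // the connector config plus its own task.id, which getTaskId() later reads back.
    public List<Map<String, String>> taskConfigs(final int maxTasks) {
        final List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int taskId = 0; taskId < maxTasks; taskId++) {
            final Map<String, String> taskConfig = new HashMap<>(configProps);
            taskConfig.put(CommonConfig.TASK_ID, String.valueOf(taskId));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }
}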
@@ -24,7 +24,7 @@
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.DistributionType;

public class SourceCommonConfig extends CommonConfig {

@@ -70,8 +70,8 @@ public ErrorsTolerance getErrorsTolerance() {
return sourceConfigFragment.getErrorsTolerance();
}

public ObjectDistributionStrategy getObjectDistributionStrategy() {
return sourceConfigFragment.getObjectDistributionStrategy();
public DistributionType getDistributionType() {
return sourceConfigFragment.getDistributionType();
}

public int getMaxPollRecords() {
@@ -16,14 +16,16 @@

package io.aiven.kafka.connect.common.config;

import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.OBJECT_HASH;
import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;
import static io.aiven.kafka.connect.common.source.task.DistributionType.OBJECT_HASH;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.DistributionType;

import org.apache.commons.lang3.StringUtils;

@@ -36,7 +38,7 @@ public final class SourceConfigFragment extends ConfigFragment {
public static final String TARGET_TOPICS = "topics";
public static final String ERRORS_TOLERANCE = "errors.tolerance";

public static final String OBJECT_DISTRIBUTION_STRATEGY = "object.distribution.strategy";
public static final String DISTRIBUTION_TYPE = "distribution.type";

/**
* Construct the ConfigFragment.
@@ -74,13 +76,15 @@ public static ConfigDef update(final ConfigDef configDef) {
configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(),
configDef.define(DISTRIBUTION_TYPE, ConfigDef.Type.STRING, OBJECT_HASH.name(),
new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME,
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
// UnusedAssignment
"Based on tasks.max config and the type of strategy selected, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : "
+ Arrays.stream(DistributionType.values())
.map(DistributionType::value)
.collect(Collectors.joining(", ")),
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, DISTRIBUTION_TYPE); // NOPMD
// UnusedAssignment

return configDef;
}
@@ -105,8 +109,8 @@ public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}

public ObjectDistributionStrategy getObjectDistributionStrategy() {
return ObjectDistributionStrategy.forName(cfg.getString(OBJECT_DISTRIBUTION_STRATEGY));
public DistributionType getDistributionType() {
return DistributionType.forName(cfg.getString(DISTRIBUTION_TYPE));
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
@@ -126,7 +130,7 @@ public void ensureValid(final String name, final Object value) {
final String objectDistributionStrategy = (String) value;
if (StringUtils.isNotBlank(objectDistributionStrategy)) {
// This will throw an Exception if not a valid value.
ObjectDistributionStrategy.forName(objectDistributionStrategy);
DistributionType.forName(objectDistributionStrategy);
}
}
}
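A small sketch of the renamed property's round trip, assuming only what the diff above shows: the default is OBJECT_HASH.name(), and both the validator and the getter resolve the configured string through DistributionType.forName, which throws on an unknown value. The property map and example class are illustrative.

import java.util.HashMap;
import java.util.Map;

import io.aiven.kafka.connect.common.config.SourceConfigFragment;
import io.aiven.kafka.connect.common.source.task.DistributionType;

public class DistributionTypeExample {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        // "OBJECT_HASH" matches the default OBJECT_HASH.name() used in the define() call above.
        props.put(SourceConfigFragment.DISTRIBUTION_TYPE, "OBJECT_HASH");

        // Both the validator and getDistributionType() resolve the string the same way;
        // an unknown value makes forName throw.
        final DistributionType type = DistributionType.forName(props.get(SourceConfigFragment.DISTRIBUTION_TYPE));
        System.out.println(type);
    }
}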
@@ -22,12 +22,22 @@

import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.source.task.Context;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* FilePatternUtils allows the construction of a regex pattern to extract the
* {@link io.aiven.kafka.connect.common.source.task.Context Context} from an Object Key.
*
*/
public final class FilePatternUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(FilePatternUtils.class);
public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String PATTERN_START_OFFSET_KEY = "startOffset"; // no underscore allowed as it breaks the regex.
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
@@ -36,20 +46,47 @@ public final class FilePatternUtils {
// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.
public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
public static final String START_OFFSET_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_START_OFFSET_KEY + ">\\d+)";
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
public static final String START_OFFSET = "Start offset";

private FilePatternUtils() {
// hidden
final Pattern pattern;
private final boolean startOffsetConfigured;
private final boolean partitionConfigured;
private final boolean topicConfigured;

/**
 * Creates an instance of FilePatternUtils; the constructor configures the Pattern that is used to extract the
 * Context from an object key of type 'K'.
 *
 * @param pattern
 *            the expected source name format, containing the placeholder tokens defined above
 */
public FilePatternUtils(final String pattern) {
this.pattern = configurePattern(pattern);
startOffsetConfigured = pattern.contains(START_OFFSET_PATTERN);
partitionConfigured = pattern.contains(PARTITION_PATTERN);
topicConfigured = pattern.contains(TOPIC_PATTERN);
}
public static Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
expectedSourceNameFormat));

/**
 * Builds a regex Pattern from the initial configuration; named groups in the pattern are used to extract
 * information from the toString() of the object key K that is passed in for Context extraction.
 *
 * @param expectedSourceNameFormat
 *            a string in the expected format, which allows object names or keys to carry unique information
 *            such as partition number, topic name, offset and timestamp.
 * @return A Pattern configured to extract the key information from object names and keys.
 */
private Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null) {
throw new ConfigException(
"Source name format is missing please configure the expected source to include the partition pattern.");
}

// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN,
START_OFFSET_NAMED_GROUP_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
Expand All @@ -62,26 +99,71 @@ public static Pattern configurePattern(final String expectedSourceNameFormat) {
}
}

public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
public <K extends Comparable<K>> Optional<Context<K>> process(final K sourceName) {
final Optional<Matcher> matcher = fileMatches(sourceName.toString());
if (matcher.isPresent()) {
final Context<K> ctx = new Context<>(sourceName);
getTopic(matcher.get(), sourceName.toString()).ifPresent(ctx::setTopic);
getPartitionId(matcher.get(), sourceName.toString()).ifPresent(ctx::setPartition);
getOffset(matcher.get(), sourceName.toString()).ifPresent(ctx::setOffset);
return Optional.of(ctx);
}
return Optional.empty();

}

private Optional<Matcher> fileMatches(final String sourceName) {
return matchPattern(sourceName);
}

public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).flatMap(matcher -> {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (NumberFormatException e) {
return Optional.empty();
private Optional<String> getTopic(final Matcher matcher, final String sourceName) {

try {
return Optional.of(matcher.group(PATTERN_TOPIC_KEY));
} catch (IllegalArgumentException ex) {
// matcher.group throws an IllegalArgumentException when the pattern does not contain the named group.
if (topicConfigured) {
LOGGER.warn("Unable to extract Topic from {} and 'topics' not configured.", sourceName);
}
});
return Optional.empty();
}

}

private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
if (filePattern == null || sourceName == null) {
throw new IllegalArgumentException("filePattern and sourceName must not be null");
private Optional<Integer> getPartitionId(final Matcher matcher, final String sourceName) {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (IllegalArgumentException e) {
// matcher.group throws an IllegalArgumentException when the pattern does not contain the named group;
// NumberFormatException is a subclass of IllegalArgumentException, so it is covered here as well.
if (partitionConfigured) {
LOGGER.warn("Unable to extract Partition id from {}.", sourceName);
}
return Optional.empty();
}

}

private Optional<Integer> getOffset(final Matcher matcher, final String sourceName) {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_START_OFFSET_KEY)));
} catch (IllegalArgumentException e) {
// matcher.group throws an IllegalArgumentException when the pattern does not contain the named group;
// NumberFormatException is a subclass of IllegalArgumentException, so it is covered here as well.
if (startOffsetConfigured) {
LOGGER.warn("Unable to extract start offset from {}.", sourceName);
}
return Optional.empty();
}

final Matcher matcher = filePattern.matcher(sourceName);
}

private Optional<Matcher> matchPattern(final String sourceName) {
if (sourceName == null) {
throw new IllegalArgumentException("sourceName must not be null");
}
final Matcher matcher = pattern.matcher(sourceName);
return matcher.find() ? Optional.of(matcher) : Optional.empty();
}
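To make the new instance-based flow concrete, a hedged usage sketch; the pattern and object key below are illustrative, and the expected extractions follow from the named-group replacements above.

import java.util.Optional;

import io.aiven.kafka.connect.common.source.task.Context;

public class FilePatternUtilsExample {
    public static void main(final String[] args) {
        // The placeholder tokens are replaced with the named-group regexes defined above.
        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{start_offset}}.txt");

        // process() matches the key against the configured pattern and populates a Context.
        final Optional<Context<String>> ctx = utils.process("logs-3-0042.txt");
        ctx.ifPresent(context -> {
            System.out.println(context.getTopic().orElse("?"));     // logs
            System.out.println(context.getPartition().orElse(-1));  // 3
            System.out.println(context.getOffset().orElse(-1));     // 42
        });
    }
}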

@@ -0,0 +1,71 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.Optional;

/**
 * A Context which captures all the details about the source that are required to successfully send a source record
 * onto Kafka.
 *
 * @param <K>
 *            the type/class of the key that uniquely identifies the object the context is being created about
 */
public class Context<K extends Comparable<K>> {

private String topic;
private Integer partition;
private Integer offset;
private K storageKey;

public Context(final K storageKey) {
    this.storageKey = storageKey;
}

public Optional<String> getTopic() {
return Optional.ofNullable(topic);
}

public void setTopic(final String topic) {
this.topic = topic;
}

public Optional<Integer> getPartition() {
return Optional.ofNullable(partition);
}

public void setPartition(final Integer partition) {
this.partition = partition;
}

public Optional<K> getStorageKey() {
return Optional.ofNullable(storageKey);
}

public void setStorageKey(final K storageKey) {
this.storageKey = storageKey;
}

public Optional<Integer> getOffset() {
return Optional.ofNullable(offset);
}

public void setOffset(final Integer offset) {
this.offset = offset;
}
}
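And a minimal sketch of using Context directly, for example from a unit test; all values are illustrative.

public class ContextExample {
    public static void main(final String[] args) {
        // The storage key is set at construction; topic, partition and offset are optional.
        final Context<String> ctx = new Context<>("bucket/logs-3-0042.txt");
        ctx.setTopic("logs");
        ctx.setPartition(3);
        ctx.setOffset(42);

        // Every accessor returns an Optional, so unset fields never surface as null.
        System.out.println(ctx.getTopic().orElse("unknown-topic"));
    }
}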