Skip to content

Commit

Permalink
Add length to the Transformer to allow improvements to the byte array…
Browse files Browse the repository at this point in the history
… in the future.

Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Jan 15, 2025
1 parent a453a27 commit 1ea3437
Show file tree
Hide file tree
Showing 23 changed files with 248 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

public class SourceCommonConfig extends CommonConfig {

private final SchemaRegistryFragment schemaRegistryFragment;
private final TransformerFragment transformerFragment;
private final SourceConfigFragment sourceConfigFragment;
private final FileNameFragment fileNameFragment;
private final OutputFormatFragment outputFormatFragment;

public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
// Construct Fragments
schemaRegistryFragment = new SchemaRegistryFragment(this);
transformerFragment = new TransformerFragment(this);
sourceConfigFragment = new SourceConfigFragment(this);
fileNameFragment = new FileNameFragment(this);
outputFormatFragment = new OutputFormatFragment(this);
Expand All @@ -45,18 +45,18 @@ public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
}

private void validate() {
schemaRegistryFragment.validate();
transformerFragment.validate();
sourceConfigFragment.validate();
fileNameFragment.validate();
outputFormatFragment.validate();
}

public InputFormat getInputFormat() {
return schemaRegistryFragment.getInputFormat();
return transformerFragment.getInputFormat();
}

public String getSchemaRegistryUrl() {
return schemaRegistryFragment.getSchemaRegistryUrl();
return transformerFragment.getSchemaRegistryUrl();
}

public String getTargetTopics() {
Expand All @@ -79,11 +79,11 @@ public int getMaxPollRecords() {
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
return TransformerFactory.getTransformer(transformerFragment.getInputFormat());
}

public int getByteArrayTransformerMaxBufferSize() {
return schemaRegistryFragment.getByteArrayTransformerMaxBufferSize();
public int getTransformerMaxBufferSize() {
return transformerFragment.getTransformerMaxBufferSize();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,44 @@

import io.aiven.kafka.connect.common.source.input.InputFormat;

public final class SchemaRegistryFragment extends ConfigFragment {
private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
public final class TransformerFragment extends ConfigFragment {
private static final String TRANSFORMER_GROUP = "Transformer group";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
public static final String INPUT_FORMAT_KEY = "input.format";
public static final String SCHEMAS_ENABLE = "schemas.enable";
public static final String BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE = "byte.array.transformer.max.buffer.size";
private static final int MAX_BUFFER_SIZE = 4096;
public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size";
private static final int DEFAULT_MAX_BUFFER_SIZE = 4096;

/**
* Construct the ConfigFragment..
*
* @param cfg
* the configuration that this fragment is associated with.
*/
public SchemaRegistryFragment(final AbstractConfig cfg) {
public TransformerFragment(final AbstractConfig cfg) {
super(cfg);
}

public static ConfigDef update(final ConfigDef configDef) {
int srCounter = 0;
int transformerCounter = 0;
configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++,
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL",
SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
configDef.define(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE,
"Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE,
new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM,
"Max Size of the byte buffer when using the BYTE Transformer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);

configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
"Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
return configDef;
Expand All @@ -80,17 +79,18 @@ public Class<?> getAvroValueSerializer() {
return cfg.getClass(AVRO_VALUE_SERIALIZER);
}

public int getByteArrayTransformerMaxBufferSize() {
return cfg.getInt(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE);
public int getTransformerMaxBufferSize() {
return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE);
}

private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {

final int size = (int) value;
if (size <= 0) {
throw new ConfigException(String.format("%s must be larger then 0", name));
// ConfigDef will throw an error if this is not an int that is supplied
if ((int) value <= 0) {
throw new ConfigException(
String.format("%s must be larger then 0 and less then %s", name, Integer.MAX_VALUE));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,8 +55,9 @@ public void configureValueConverter(final Map<String, String> config, final Sour
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final SourceCommonConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ByteArrayTransformer chunks an entire object into a maximum size specified by the
* {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option.
* This will split large files into multiple records and each record will have the same key.
*/
public class ByteArrayTransformer extends Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);

Expand All @@ -42,10 +47,17 @@ public void configureValueConverter(final Map<String, String> config, final Sour
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final SourceCommonConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
if (streamLength == 0) {
LOGGER.warn(
"Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.",
streamLength);
return emptySpliterator(inputStreamIOSupplier);
}
// The max buffer size for the byte array the default is 4096 if not set by the user.
final int maxBufferSize = sourceConfig.getByteArrayTransformerMaxBufferSize();
final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize();
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected InputStream inputOpened(final InputStream input) {
Expand Down Expand Up @@ -79,6 +91,32 @@ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
};
}

/**
* This method returns an empty spliterator when an empty input stream is supplied to be split
*
* @param inputStreamIOSupplier
* The empty input stream that was supplied
* @return an Empty spliterator to return to the calling method.
*/
private static StreamSpliterator emptySpliterator(final IOSupplier<InputStream> inputStreamIOSupplier) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
return false;
}

@Override
protected void doClose() {
// nothing to do
}

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
return InputStream.nullInputStream();
}
};
}

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final SourceCommonConfig sourceConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public void configureValueConverter(final Map<String, String> config, final Sour
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final SourceCommonConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -66,8 +66,9 @@ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topi
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final SourceCommonConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {

return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@

public abstract class Transformer {

public final static long UNKNOWN_STREAM_LENGTH = -1;

public abstract void configureValueConverter(Map<String, String> config, SourceCommonConfig sourceConfig);

public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
final String topic, final int topicPartition, final SourceCommonConfig sourceConfig,
final long skipRecords) {
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig, final long skipRecords) {

final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
sourceConfig);
final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic,
topicPartition, sourceConfig);
return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords);
}

Expand All @@ -49,16 +51,19 @@ public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inp
*
* @param inputStreamIOSupplier
* the input stream supplier.
* @param streamLength
* the length of the input stream, {@link #UNKNOWN_STREAM_LENGTH} may be used to specify a stream with an
* unknown length, streams of length zero will log an error and return an empty stream
* @param topic
* the topic.
* @param topicPartition
* the partition.
* @param sourceConfig
* the source configuraiton.
* the source configuration.
* @return a StreamSpliterator instance.
*/
protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier, String topic,
int topicPartition, SourceCommonConfig sourceConfig);
protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier,
long streamLength, String topic, int topicPartition, SourceCommonConfig sourceConfig);

public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,34 @@

import org.junit.jupiter.api.Test;

public class SchemaRegistryFragmentTest {// NOPMD
class TransformerFragmentTest {

@Test
void validateCorrectBufferSizeIsAccepted() {
final int bufferSize = 50;
final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef());
final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
final Map<String, Object> props = new HashMap<>();
props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, bufferSize);
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, bufferSize);

final SchemaRegistryFragment schemaReg = new SchemaRegistryFragment(new AbstractConfig(configDef, props));
assertThat(schemaReg.getByteArrayTransformerMaxBufferSize()).isEqualTo(bufferSize);
final TransformerFragment schemaReg = new TransformerFragment(new AbstractConfig(configDef, props));
assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize);
}

@Test
void validateInvalidBufferSizeThrowsConfigException() {
final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef());
final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
final Map<String, Object> props = new HashMap<>();
props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, 0);
// Too small
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0);

assertThatThrownBy(() -> new SchemaRegistryFragment(new AbstractConfig(configDef, props)))
assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props)))
.isInstanceOf(ConfigException.class);
// Too large
props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, Integer.MAX_VALUE + "1");
assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props)))
.isInstanceOf(ConfigException.class)
.hasMessage(
"Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT");
}

}
Loading

0 comments on commit 1ea3437

Please sign in to comment.