Allow users to vary the size of the byteArrayBuffer in source connectors #386
base: s3-source-release
Signed-off-by: Aindriu Lavelle <[email protected]>
Overall looks good
commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
1821406 to e45686e
Looks good. A few minor comments.
commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
d662053 to 2348113
commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java
commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java
- final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
-         sourceConfig);
+ final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic,
- final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic,
+ final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, objectSize, topic,
A stream does not have a length in general. Here the iterator is passing the object size, and in this repo we are dealing with objects.
It refers to the stream that is passed into the createSpliterator method.
This is commons code, so the value is the stream length and is not specific to an object size.
Changing the name would make it a misnomer, I feel.
This repo mainly deals with cloud storage objects. Even though this is common code that will also be used for Azure or GCP, I think objectSize or something similar reads better.
I'll let @Claudenw be the deciding vote on that one 👍
commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
  try {
      final int bytesRead = IOUtils.read(inputStream, buffer);
      if (bytesRead == 0) {
          return false;
      }
-     if (bytesRead < MAX_BUFFER_SIZE) {
+     if (bytesRead < maxBufferSize) {
We can simplify this condition
byte[] data = (bytesRead < maxBufferSize)
? Arrays.copyOf(buffer, bytesRead)
: buffer;
action.accept(new SchemaAndValue(null, data));
To be honest, I think the current code is worth keeping as it is for readability; it is much easier to understand what is happening.
It's a simple ternary operator. Should be ok to replace.
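For reference, the chunking behaviour discussed in this thread can be sketched in isolation. This is a minimal, self-contained illustration and not the connector's code: the class name ChunkReader and the method readChunks are hypothetical, the JDK's InputStream.readNBytes (Java 9+) stands in for the IOUtils.read call shown in the diff, and a fresh buffer is allocated per chunk so that handing a full buffer on without copying is safe.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
public class ChunkReader {

    /** Splits the stream into chunks of at most maxBufferSize bytes. */
    static List<byte[]> readChunks(final InputStream in, final int maxBufferSize) throws IOException {
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException("maxBufferSize must be positive");
        }
        final List<byte[]> chunks = new ArrayList<>();
        while (true) {
            // Assumption: one buffer per chunk, so a full buffer can be passed on as-is.
            final byte[] buffer = new byte[maxBufferSize];
            // readNBytes keeps reading until the buffer is full or the stream ends.
            final int bytesRead = in.readNBytes(buffer, 0, buffer.length);
            if (bytesRead == 0) {
                break; // nothing left to read
            }
            // The ternary from the review suggestion: trim only a short final chunk.
            final byte[] data = (bytesRead < maxBufferSize)
                    ? Arrays.copyOf(buffer, bytesRead)
                    : buffer;
            chunks.add(data);
            if (bytesRead < maxBufferSize) {
                break; // a short read means the end of the stream was reached
            }
        }
        return chunks;
    }

    public static void main(final String[] args) throws IOException {
        final List<byte[]> chunks = readChunks(new ByteArrayInputStream(new byte[10]), 4);
        for (final byte[] chunk : chunks) {
            System.out.println(chunk.length); // prints 4, 4, 2 on separate lines
        }
    }
}
```

Note that if the buffer were reused across reads instead, returning it without a copy would let a later read overwrite data already handed to the consumer, so both variants of the condition rely on how the buffer is managed.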
commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java
commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java
Have a few minor comments.
commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java
2348113 to 1ea3437
… in the future. Signed-off-by: Aindriu Lavelle <[email protected]>
1ea3437 to f6b6adf
This allows users to choose the size of the chunks into which the byte stream is split in the source connectors.
One change was to use SourceCommonConfig instead of AbstractConfig in the transformers, so that the new calls to get the maximum byte buffer size are available.
It also passes the stream length to the Transformer, which should lead to improvements, specifically in the ByteArrayTransformer.
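The PR does not spell out how the stream length is used, so the following is only one plausible reading, sketched under stated assumptions: when the length is known and smaller than the configured maximum, the buffer can be sized to the stream instead of the maximum. The class BufferSizing, both method names, and the -1 sentinel for an unknown length are hypothetical and are not taken from the connector code.

```java
// Hypothetical helper, for illustration only.
public class BufferSizing {

    // Assumed sentinel for "length not known"; not taken from the PR.
    static final long UNKNOWN_STREAM_LENGTH = -1L;

    /** Picks a buffer size no larger than the stream itself when its length is known. */
    static int bufferSizeFor(final long streamLength, final int maxBufferSize) {
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException("maxBufferSize must be positive");
        }
        if (streamLength <= 0) {
            // Unknown (or empty) stream: fall back to the configured maximum.
            return maxBufferSize;
        }
        return (int) Math.min(streamLength, (long) maxBufferSize);
    }

    /** Number of chunks a stream of known, positive length would be split into. */
    static long chunkCountFor(final long streamLength, final int maxBufferSize) {
        // Ceiling division without floating point.
        return (streamLength + maxBufferSize - 1) / maxBufferSize;
    }

    public static void main(final String[] args) {
        System.out.println(bufferSizeFor(100L, 4096));                  // prints 100
        System.out.println(bufferSizeFor(10_000L, 4096));               // prints 4096
        System.out.println(bufferSizeFor(UNKNOWN_STREAM_LENGTH, 4096)); // prints 4096
        System.out.println(chunkCountFor(10_000L, 4096));               // prints 3
    }
}
```

With a small object and a large configured maximum, this avoids allocating a buffer that would mostly go unused and then be trimmed with a copy.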