diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index ddd2ebf0f..0ddeeeeef 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -306,10 +306,11 @@ ChunkIndex uploadSegmentLog( try (final var logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) { final var transformEnum = transformation(logSegmentInputStream, requiresCompression, maybeEncryptionKey); - final var transformFinisher = new TransformFinisher( - transformEnum, - remoteLogSegmentMetadata.segmentSizeInBytes() - ); + final var transformFinisher = TransformFinisher.newBuilder( + transformEnum, + remoteLogSegmentMetadata.segmentSizeInBytes() + ) + .build(); try (final var sis = transformFinisher.toInputStream()) { final var bytes = uploader.upload(sis, fileKey); @@ -455,7 +456,9 @@ InputStream transformIndex(final IndexType indexType, () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey) ); } - final var transformFinisher = new TransformFinisher(transformEnum, size); + final var transformFinisher = TransformFinisher.newBuilder(transformEnum, size) + .withChunkingDisabled() + .build(); final var inputStream = transformFinisher.nextElement(); segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size()); return inputStream; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java index 3c781eb81..83c289fb5 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/BaseTransformChunkEnumeration.java @@ -32,12 +32,6 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration private byte[] chunk = null; - public BaseTransformChunkEnumeration(final InputStream inputStream) { - this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); - - this.originalChunkSize = 0; - } - public BaseTransformChunkEnumeration(final InputStream inputStream, final int originalChunkSize) { this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null"); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java index 6f1d9c50f..201a8d6bf 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/transform/TransformFinisher.java @@ -27,8 +27,6 @@ import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder; import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder; -// TODO test transforms and detransforms with property-based tests - /** * The transformation finisher. * @@ -40,30 +38,40 @@ public class TransformFinisher implements Enumeration { private final TransformChunkEnumeration inner; private final AbstractChunkIndexBuilder chunkIndexBuilder; - private final int originalFileSize; private ChunkIndex chunkIndex = null; - public TransformFinisher(final TransformChunkEnumeration inner) { - this(inner, 0); + public static Builder newBuilder(final TransformChunkEnumeration inner, final int originalFileSize) { + return new Builder(inner, originalFileSize); } - public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) { + private TransformFinisher( + final TransformChunkEnumeration inner, + final boolean chunkingEnabled, + final int originalFileSize + ) { this.inner = Objects.requireNonNull(inner, "inner cannot be null"); - this.originalFileSize = originalFileSize; - if (originalFileSize < 0) { - throw new IllegalArgumentException( - "originalFileSize must be non-negative, " + originalFileSize + " given"); - } + final int originalChunkSize = chunkingEnabled ? inner.originalChunkSize() : Integer.MAX_VALUE; + this.chunkIndexBuilder = chunkIndexBuilder(inner, originalChunkSize, originalFileSize); + } + private static AbstractChunkIndexBuilder chunkIndexBuilder( + final TransformChunkEnumeration inner, + final int originalChunkSize, + final int originalFileSize + ) { final Integer transformedChunkSize = inner.transformedChunkSize(); - if (originalFileSize == 0) { - this.chunkIndexBuilder = null; - } else if (transformedChunkSize == null) { - this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize); + if (transformedChunkSize == null) { + return new VariableSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize + ); } else { - this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder( - inner.originalChunkSize(), originalFileSize, transformedChunkSize); + return new FixedSizeChunkIndexBuilder( + originalChunkSize, + originalFileSize, + transformedChunkSize + ); } } @@ -75,19 +83,17 @@ public boolean hasMoreElements() { @Override public InputStream nextElement() { final var chunk = inner.nextElement(); - if (chunkIndexBuilder != null) { - if (hasMoreElements()) { - this.chunkIndexBuilder.addChunk(chunk.length); - } else { - this.chunkIndex = this.chunkIndexBuilder.finish(chunk.length); - } + if (hasMoreElements()) { + this.chunkIndexBuilder.addChunk(chunk.length); + } else { + this.chunkIndex = this.chunkIndexBuilder.finish(chunk.length); } return new ByteArrayInputStream(chunk); } public ChunkIndex chunkIndex() { - if (chunkIndex == null && originalFileSize > 0) { + if (chunkIndex == null) { throw new IllegalStateException("Chunk index was not built, was finisher used?"); } return this.chunkIndex; @@ -96,4 +102,30 @@ public ChunkIndex chunkIndex() { public InputStream toInputStream() { return new SequenceInputStream(this); } + + public static class Builder { + final TransformChunkEnumeration inner; + final Integer originalFileSize; + boolean chunkingEnabled = true; + + public Builder(final TransformChunkEnumeration inner, final int originalFileSize) { + this.inner = inner; + + if (originalFileSize < 0) { + throw new IllegalArgumentException( + "originalFileSize must be non-negative, " + originalFileSize + " given"); + } + + this.originalFileSize = originalFileSize; + } + + public Builder withChunkingDisabled() { + this.chunkingEnabled = false; + return this; + } + + public TransformFinisher build() { + return new TransformFinisher(inner, chunkingEnabled, originalFileSize); + } + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformFinisherTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformFinisherTest.java index 0d485988a..9b803df99 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformFinisherTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformFinisherTest.java @@ -42,7 +42,8 @@ class TransformFinisherTest { @Test void getIndexBeforeUsing() { - final TransformFinisher finisher = new TransformFinisher(new FakeDataEnumerator(3), 7); + final TransformFinisher finisher = TransformFinisher.newBuilder(new FakeDataEnumerator(3), 7) + .build(); assertThatThrownBy(() -> finisher.chunkIndex()) .isInstanceOf(IllegalStateException.class) .hasMessage("Chunk index was not built, was finisher used?"); @@ -50,14 +51,14 @@ void getIndexBeforeUsing() { @Test void nullInnerEnumeration() { - assertThatThrownBy(() -> new TransformFinisher(null, 100)) + assertThatThrownBy(() -> TransformFinisher.newBuilder(null, 100).build()) .isInstanceOf(NullPointerException.class) .hasMessage("inner cannot be null"); } @Test void negativeOriginalFileSize() { - assertThatThrownBy(() -> new TransformFinisher(inner, -1)) + assertThatThrownBy(() -> TransformFinisher.newBuilder(inner, -1).build()) .isInstanceOf(IllegalArgumentException.class) .hasMessage("originalFileSize must be non-negative, -1 given"); } @@ -66,7 +67,8 @@ void negativeOriginalFileSize() { @MethodSource("provideForBuildIndexAndReturnCorrectInputStreams") void buildIndexAndReturnCorrectInputStreams(final Integer transformedChunkSize, final Class indexType) throws IOException { - final TransformFinisher finisher = new TransformFinisher(new FakeDataEnumerator(transformedChunkSize), 7); + final TransformFinisher finisher = TransformFinisher.newBuilder(new FakeDataEnumerator(transformedChunkSize), 7) + .build(); assertThat(finisher.hasMoreElements()).isTrue(); assertThat(finisher.nextElement().readAllBytes()).isEqualTo(new byte[] {0, 1, 2}); assertThat(finisher.hasMoreElements()).isTrue(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java index cad7355d0..46313d201 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/TransformsEndToEndTest.java @@ -70,39 +70,48 @@ void compressionAndEncryption(final int chunkSize) throws IOException { private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException { // Transform. - TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( - new ByteArrayInputStream(original), chunkSize); - if (compression) { - transformEnum = new CompressionChunkEnumeration(transformEnum); - } - if (encryption) { - transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier); - } - final var transformFinisher = chunkSize == 0 - ? new TransformFinisher(transformEnum) - : new TransformFinisher(transformEnum, ORIGINAL_SIZE); - final byte[] uploadedData; - final ChunkIndex chunkIndex; - try (final var sis = transformFinisher.toInputStream()) { - uploadedData = sis.readAllBytes(); - chunkIndex = transformFinisher.chunkIndex(); - } + try (final var inputStream = new ByteArrayInputStream(original)) { + TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize); + if (compression) { + transformEnum = new CompressionChunkEnumeration(transformEnum); + } + if (encryption) { + transformEnum = new EncryptionChunkEnumeration( + transformEnum, + AesKeyAwareTest::encryptionCipherSupplier + ); + } + final var transformBuilder = TransformFinisher.newBuilder(transformEnum, ORIGINAL_SIZE); + if (chunkSize == 0) { + transformBuilder.withChunkingDisabled(); + } + final var transformFinisher = transformBuilder.build(); + final byte[] uploadedData; + final ChunkIndex chunkIndex; + try (final var sis = transformFinisher.toInputStream()) { + uploadedData = sis.readAllBytes(); + chunkIndex = transformFinisher.chunkIndex(); + } - // Detransform. - DetransformChunkEnumeration detransformEnum = chunkIndex == null - ? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData)) - : new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks()); - if (encryption) { - detransformEnum = new DecryptionChunkEnumeration( - detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); - } - if (compression) { - detransformEnum = new DecompressionChunkEnumeration(detransformEnum); - } - final var detransformFinisher = new DetransformFinisher(detransformEnum); - try (final var sis = detransformFinisher.toInputStream()) { - final byte[] downloaded = sis.readAllBytes(); - assertThat(downloaded).isEqualTo(original); + // Detransform. + try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) { + DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration( + uploadedStream, + chunkIndex.chunks() + ); + if (encryption) { + detransformEnum = new DecryptionChunkEnumeration( + detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier); + } + if (compression) { + detransformEnum = new DecompressionChunkEnumeration(detransformEnum); + } + final var detransformFinisher = new DetransformFinisher(detransformEnum); + try (final var sis = detransformFinisher.toInputStream()) { + final byte[] downloaded = sis.readAllBytes(); + assertThat(downloaded).isEqualTo(original); + } + } } } }