refactor: decouple chunking disabling from file size
Currently, TransformFinisher piggybacks on the original file size to disable chunking: a file size of zero means no chunk index is built. This workaround was introduced when enabling transformations for individual indexes, but that logic was recently replaced when indexes started to be concatenated. Chunk indexes are now always used, and chunking is disabled not by nullifying the chunk index but by setting a chunk size equal to the transformed size.

Introducing an explicit "chunkingEnabled" flag lets the transformer logic be cleaned up. This is needed for further changes to the transformer as we look into uploading directly from files.
jeqo committed Apr 8, 2024
1 parent dc45dda commit 477d487
Showing 5 changed files with 111 additions and 71 deletions.
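Before the diffs, a minimal sketch of the call-site change (assuming a transformEnum and size arguments like those in the diffs below; chunking stays on by default and is now switched off explicitly rather than via a sentinel size):

    // Chunking enabled (the default), e.g. for segment logs:
    final var chunked = TransformFinisher.newBuilder(transformEnum, originalFileSize)
        .build();

    // Chunking disabled, e.g. for indexes uploaded as a single chunk:
    final var unchunked = TransformFinisher.newBuilder(transformEnum, size)
        .withChunkingDisabled()
        .build();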
@@ -306,10 +306,11 @@ ChunkIndex uploadSegmentLog(

         try (final var logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) {
             final var transformEnum = transformation(logSegmentInputStream, requiresCompression, maybeEncryptionKey);
-            final var transformFinisher = new TransformFinisher(
-                transformEnum,
-                remoteLogSegmentMetadata.segmentSizeInBytes()
-            );
+            final var transformFinisher = TransformFinisher.newBuilder(
+                    transformEnum,
+                    remoteLogSegmentMetadata.segmentSizeInBytes()
+                )
+                .build();

             try (final var sis = transformFinisher.toInputStream()) {
                 final var bytes = uploader.upload(sis, fileKey);

@@ -455,7 +456,9 @@ InputStream transformIndex(final IndexType indexType,
                 () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey)
             );
         }
-        final var transformFinisher = new TransformFinisher(transformEnum, size);
+        final var transformFinisher = TransformFinisher.newBuilder(transformEnum, size)
+            .withChunkingDisabled()
+            .build();
         final var inputStream = transformFinisher.nextElement();
         segmentIndexBuilder.add(indexType, singleChunk(transformFinisher.chunkIndex()).range().size());
         return inputStream;
BaseTransformChunkEnumeration.java

@@ -32,12 +32,6 @@ public class BaseTransformChunkEnumeration implements TransformChunkEnumeration

     private byte[] chunk = null;

-    public BaseTransformChunkEnumeration(final InputStream inputStream) {
-        this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
-
-        this.originalChunkSize = 0;
-    }
-
     public BaseTransformChunkEnumeration(final InputStream inputStream,
                                          final int originalChunkSize) {
         this.inputStream = Objects.requireNonNull(inputStream, "inputStream cannot be null");
TransformFinisher.java

@@ -27,8 +27,6 @@
 import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndexBuilder;
 import io.aiven.kafka.tieredstorage.manifest.index.VariableSizeChunkIndexBuilder;

-// TODO test transforms and detransforms with property-based tests
-
 /**
  * The transformation finisher.
  *

@@ -40,30 +38,40 @@
 public class TransformFinisher implements Enumeration<InputStream> {
     private final TransformChunkEnumeration inner;
     private final AbstractChunkIndexBuilder chunkIndexBuilder;
-    private final int originalFileSize;
     private ChunkIndex chunkIndex = null;

-    public TransformFinisher(final TransformChunkEnumeration inner) {
-        this(inner, 0);
+    public static Builder newBuilder(final TransformChunkEnumeration inner, final int originalFileSize) {
+        return new Builder(inner, originalFileSize);
     }

-    public TransformFinisher(final TransformChunkEnumeration inner, final int originalFileSize) {
+    private TransformFinisher(
+        final TransformChunkEnumeration inner,
+        final boolean chunkingEnabled,
+        final int originalFileSize
+    ) {
         this.inner = Objects.requireNonNull(inner, "inner cannot be null");
-        this.originalFileSize = originalFileSize;

-        if (originalFileSize < 0) {
-            throw new IllegalArgumentException(
-                "originalFileSize must be non-negative, " + originalFileSize + " given");
-        }
+        final int originalChunkSize = chunkingEnabled ? inner.originalChunkSize() : Integer.MAX_VALUE;
+        this.chunkIndexBuilder = chunkIndexBuilder(inner, originalChunkSize, originalFileSize);
+    }
+
+    private static AbstractChunkIndexBuilder chunkIndexBuilder(
+        final TransformChunkEnumeration inner,
+        final int originalChunkSize,
+        final int originalFileSize
+    ) {
         final Integer transformedChunkSize = inner.transformedChunkSize();
-        if (originalFileSize == 0) {
-            this.chunkIndexBuilder = null;
-        } else if (transformedChunkSize == null) {
-            this.chunkIndexBuilder = new VariableSizeChunkIndexBuilder(inner.originalChunkSize(), originalFileSize);
+        if (transformedChunkSize == null) {
+            return new VariableSizeChunkIndexBuilder(
+                originalChunkSize,
+                originalFileSize
+            );
         } else {
-            this.chunkIndexBuilder = new FixedSizeChunkIndexBuilder(
-                inner.originalChunkSize(), originalFileSize, transformedChunkSize);
+            return new FixedSizeChunkIndexBuilder(
+                originalChunkSize,
+                originalFileSize,
+                transformedChunkSize
+            );
         }
     }

@@ -75,19 +83,17 @@ public boolean hasMoreElements() {
     @Override
     public InputStream nextElement() {
         final var chunk = inner.nextElement();
-        if (chunkIndexBuilder != null) {
-            if (hasMoreElements()) {
-                this.chunkIndexBuilder.addChunk(chunk.length);
-            } else {
-                this.chunkIndex = this.chunkIndexBuilder.finish(chunk.length);
-            }
+        if (hasMoreElements()) {
+            this.chunkIndexBuilder.addChunk(chunk.length);
+        } else {
+            this.chunkIndex = this.chunkIndexBuilder.finish(chunk.length);
         }

         return new ByteArrayInputStream(chunk);
     }

     public ChunkIndex chunkIndex() {
-        if (chunkIndex == null && originalFileSize > 0) {
+        if (chunkIndex == null) {
             throw new IllegalStateException("Chunk index was not built, was finisher used?");
         }
         return this.chunkIndex;

@@ -96,4 +102,30 @@ public ChunkIndex chunkIndex() {
     public InputStream toInputStream() {
         return new SequenceInputStream(this);
     }
+
+    public static class Builder {
+        final TransformChunkEnumeration inner;
+        final Integer originalFileSize;
+        boolean chunkingEnabled = true;
+
+        public Builder(final TransformChunkEnumeration inner, final int originalFileSize) {
+            this.inner = inner;
+
+            if (originalFileSize < 0) {
+                throw new IllegalArgumentException(
+                    "originalFileSize must be non-negative, " + originalFileSize + " given");
+            }
+
+            this.originalFileSize = originalFileSize;
+        }
+
+        public Builder withChunkingDisabled() {
+            this.chunkingEnabled = false;
+            return this;
+        }
+
+        public TransformFinisher build() {
+            return new TransformFinisher(inner, chunkingEnabled, originalFileSize);
+        }
+    }
 }
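A note on the mechanism: when chunkingEnabled is false, the constructor above passes Integer.MAX_VALUE as the chunk size, so the index builder records the whole transformed payload as a single chunk and chunkIndex() still returns a usable one-entry index; this is what lets the null-builder branch and the originalFileSize > 0 guard disappear. A hypothetical stand-alone sketch (not repository code) of the arithmetic:

    public class SingleChunkSketch {
        public static void main(String[] args) {
            final long originalFileSize = 7;           // any non-empty file
            final long chunkSize = Integer.MAX_VALUE;  // the disabled-chunking chunk size
            // ceiling division: how many chunks a fixed-size index would hold
            final long chunkCount = (originalFileSize + chunkSize - 1) / chunkSize;
            System.out.println(chunkCount);            // prints 1
        }
    }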
TransformFinisherTest.java

@@ -42,22 +42,23 @@ class TransformFinisherTest {

     @Test
     void getIndexBeforeUsing() {
-        final TransformFinisher finisher = new TransformFinisher(new FakeDataEnumerator(3), 7);
+        final TransformFinisher finisher = TransformFinisher.newBuilder(new FakeDataEnumerator(3), 7)
+            .build();
         assertThatThrownBy(() -> finisher.chunkIndex())
             .isInstanceOf(IllegalStateException.class)
             .hasMessage("Chunk index was not built, was finisher used?");
     }

     @Test
     void nullInnerEnumeration() {
-        assertThatThrownBy(() -> new TransformFinisher(null, 100))
+        assertThatThrownBy(() -> TransformFinisher.newBuilder(null, 100).build())
             .isInstanceOf(NullPointerException.class)
             .hasMessage("inner cannot be null");
     }

     @Test
     void negativeOriginalFileSize() {
-        assertThatThrownBy(() -> new TransformFinisher(inner, -1))
+        assertThatThrownBy(() -> TransformFinisher.newBuilder(inner, -1).build())
             .isInstanceOf(IllegalArgumentException.class)
             .hasMessage("originalFileSize must be non-negative, -1 given");
     }

@@ -66,7 +67,8 @@ void negativeOriginalFileSize() {
     @MethodSource("provideForBuildIndexAndReturnCorrectInputStreams")
     void buildIndexAndReturnCorrectInputStreams(final Integer transformedChunkSize,
                                                 final Class<ChunkIndex> indexType) throws IOException {
-        final TransformFinisher finisher = new TransformFinisher(new FakeDataEnumerator(transformedChunkSize), 7);
+        final TransformFinisher finisher = TransformFinisher.newBuilder(new FakeDataEnumerator(transformedChunkSize), 7)
+            .build();
         assertThat(finisher.hasMoreElements()).isTrue();
         assertThat(finisher.nextElement().readAllBytes()).isEqualTo(new byte[] {0, 1, 2});
         assertThat(finisher.hasMoreElements()).isTrue();
@@ -70,39 +70,48 @@ void compressionAndEncryption(final int chunkSize) throws IOException {

     private void test(final int chunkSize, final boolean compression, final boolean encryption) throws IOException {
         // Transform.
-        TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
-            new ByteArrayInputStream(original), chunkSize);
-        if (compression) {
-            transformEnum = new CompressionChunkEnumeration(transformEnum);
-        }
-        if (encryption) {
-            transformEnum = new EncryptionChunkEnumeration(transformEnum, AesKeyAwareTest::encryptionCipherSupplier);
-        }
-        final var transformFinisher = chunkSize == 0
-            ? new TransformFinisher(transformEnum)
-            : new TransformFinisher(transformEnum, ORIGINAL_SIZE);
-        final byte[] uploadedData;
-        final ChunkIndex chunkIndex;
-        try (final var sis = transformFinisher.toInputStream()) {
-            uploadedData = sis.readAllBytes();
-            chunkIndex = transformFinisher.chunkIndex();
-        }
+        try (final var inputStream = new ByteArrayInputStream(original)) {
+            TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(inputStream, chunkSize);
+            if (compression) {
+                transformEnum = new CompressionChunkEnumeration(transformEnum);
+            }
+            if (encryption) {
+                transformEnum = new EncryptionChunkEnumeration(
+                    transformEnum,
+                    AesKeyAwareTest::encryptionCipherSupplier
+                );
+            }
+            final var transformBuilder = TransformFinisher.newBuilder(transformEnum, ORIGINAL_SIZE);
+            if (chunkSize == 0) {
+                transformBuilder.withChunkingDisabled();
+            }
+            final var transformFinisher = transformBuilder.build();
+            final byte[] uploadedData;
+            final ChunkIndex chunkIndex;
+            try (final var sis = transformFinisher.toInputStream()) {
+                uploadedData = sis.readAllBytes();
+                chunkIndex = transformFinisher.chunkIndex();
+            }

-        // Detransform.
-        DetransformChunkEnumeration detransformEnum = chunkIndex == null
-            ? new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData))
-            : new BaseDetransformChunkEnumeration(new ByteArrayInputStream(uploadedData), chunkIndex.chunks());
-        if (encryption) {
-            detransformEnum = new DecryptionChunkEnumeration(
-                detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
-        }
-        if (compression) {
-            detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
-        }
-        final var detransformFinisher = new DetransformFinisher(detransformEnum);
-        try (final var sis = detransformFinisher.toInputStream()) {
-            final byte[] downloaded = sis.readAllBytes();
-            assertThat(downloaded).isEqualTo(original);
+            // Detransform.
+            try (final var uploadedStream = new ByteArrayInputStream(uploadedData)) {
+                DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(
+                    uploadedStream,
+                    chunkIndex.chunks()
+                );
+                if (encryption) {
+                    detransformEnum = new DecryptionChunkEnumeration(
+                        detransformEnum, ivSize, AesKeyAwareTest::decryptionCipherSupplier);
+                }
+                if (compression) {
+                    detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
+                }
+                final var detransformFinisher = new DetransformFinisher(detransformEnum);
+                try (final var sis = detransformFinisher.toInputStream()) {
+                    final byte[] downloaded = sis.readAllBytes();
+                    assertThat(downloaded).isEqualTo(original);
+                }
+            }
         }
     }
 }
