diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java index 94d036a0c..24dfd3828 100644 --- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageTest.java @@ -81,6 +81,7 @@ protected StorageBackend storage() { "aws.access.key.id", LOCALSTACK.getAccessKey(), "aws.secret.access.key", LOCALSTACK.getSecretKey(), "s3.path.style.access.enabled", true, + "s3.multipart.upload.direct.buffers", false, "s3.multipart.upload.part.size", PART_SIZE ); s3Storage.configure(configs); diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java index af5806698..f69b00cba 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java @@ -66,12 +66,17 @@ public class S3MultiPartOutputStream extends OutputStream { public S3MultiPartOutputStream(final String bucketName, final ObjectKey key, final int partSize, - final S3Client client) { + final S3Client client, + final boolean directAllocation) { this.bucketName = bucketName; this.key = key; this.client = client; this.partSize = partSize; - this.partBuffer = ByteBuffer.allocate(partSize); + if (directAllocation) { + this.partBuffer = ByteBuffer.allocateDirect(partSize); + } else { + this.partBuffer = ByteBuffer.allocate(partSize); + } final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName) .key(key.value()).build(); final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest); diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index 68fad748d..f1e81bfe2 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -44,12 +44,15 @@ public class S3Storage implements StorageBackend { private String bucketName; private int partSize; + private boolean multipartDirectBuffers; + @Override public void configure(final Map configs) { final S3StorageConfig config = new S3StorageConfig(configs); this.s3Client = S3ClientBuilder.build(config); this.bucketName = config.bucketName(); this.partSize = config.uploadPartSize(); + this.multipartDirectBuffers = config.multipartDirectBuffers(); } @Override @@ -63,7 +66,7 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St } S3MultiPartOutputStream s3OutputStream(final ObjectKey key) { - return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client); + return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client, multipartDirectBuffers); } @Override diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java index cd13db260..6811f0f62 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java @@ -58,10 +58,15 @@ public class S3StorageConfig extends AbstractConfig { private static final String S3_MULTIPART_UPLOAD_PART_SIZE_DOC = "Size of parts in bytes to use when uploading. " + "All parts but the last one will have this size. " + "Valid values: between 5MiB and 2GiB"; + + public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG = "s3.multipart.upload.direct.buffers"; + public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC = + "Allocate multipart upload buffers as direct buffers (off-heap)"; + static final int S3_MULTIPART_UPLOAD_PART_SIZE_MIN = 5 * 1024 * 1024; // 5MiB static final int S3_MULTIPART_UPLOAD_PART_SIZE_MAX = Integer.MAX_VALUE; static final int S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT = S3_MULTIPART_UPLOAD_PART_SIZE_MIN; - + private static final String S3_API_CALL_TIMEOUT_CONFIG = "s3.api.call.timeout"; private static final String S3_API_CALL_TIMEOUT_DOC = "AWS S3 API call timeout in milliseconds"; private static final String S3_API_CALL_ATTEMPT_TIMEOUT_CONFIG = "s3.api.call.attempt.timeout"; @@ -120,6 +125,12 @@ public class S3StorageConfig extends AbstractConfig { null, ConfigDef.Importance.LOW, S3_PATH_STYLE_ENABLED_DOC) + .define( + S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG, + ConfigDef.Type.BOOLEAN, + null, + ConfigDef.Importance.LOW, + S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC) .define( S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG, ConfigDef.Type.INT, @@ -261,6 +272,10 @@ public Boolean pathStyleAccessEnabled() { return getBoolean(S3_PATH_STYLE_ENABLED_CONFIG); } + public Boolean multipartDirectBuffers() { + return getBoolean(S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG); + } + public int uploadPartSize() { return getInt(S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG); } diff --git a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java index d95824584..3e23e4dfd 100644 --- a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java +++ b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java @@ -83,7 +83,7 @@ void sendAbortForAnyExceptionWhileWriting() { when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))) .thenThrow(testException); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3, false); assertThatThrownBy(() -> out.write(new byte[] {1, 2, 3})) .isInstanceOf(IOException.class) .hasRootCause(testException); @@ -105,7 +105,7 @@ void sendAbortForAnyExceptionWhenClosingUpload() throws Exception { when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))) .thenThrow(RuntimeException.class); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false); final byte[] buffer = new byte[5]; random.nextBytes(buffer); @@ -132,7 +132,7 @@ void sendAbortForAnyExceptionWhenClosingComplete() throws Exception { when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) .thenThrow(RuntimeException.class); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false); final byte[] buffer = new byte[5]; random.nextBytes(buffer); @@ -159,7 +159,7 @@ void writesOneByte() throws Exception { when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) .thenReturn(CompleteMultipartUploadResponse.builder().eTag("SOME_ETAG").build()); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.write(1); out.close(); @@ -197,7 +197,7 @@ void writesMultipleMessages() throws Exception { .thenReturn(CompleteMultipartUploadResponse.builder().build()); final List expectedMessagesList = new ArrayList<>(); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3, false); for (int i = 0; i < 3; i++) { random.nextBytes(message); out.write(message, 0, message.length); @@ -257,7 +257,7 @@ void writesTailMessages() throws Exception { final byte[] expectedFullMessage = new byte[messageSize + 10]; final byte[] expectedTailMessage = new byte[10]; - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3, false); final byte[] message = new byte[messageSize]; random.nextBytes(message); out.write(message); @@ -288,7 +288,7 @@ void writesTailMessages() throws Exception { @Test void sendAbortIfNoWritingHappened() throws IOException { - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.close(); verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture()); @@ -299,7 +299,7 @@ void sendAbortIfNoWritingHappened() throws IOException { @Test void failWhenUploadingPartAfterStreamIsClosed() throws IOException { - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.close(); verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());