diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 663bc3d99..85c4318fb 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -37,6 +37,7 @@
+
diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java
index 14200861f..7c51a0fc6 100644
--- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java
+++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java
@@ -16,13 +16,31 @@
package io.aiven.kafka.tieredstorage.storage;
+import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
public interface ObjectUploader {
/**
* @param inputStream content to upload. Not closed as part of the upload.
- * @param key path to an object within a storage backend.
+ * @param key destination path to an object within a storage backend.
* @return number of bytes uploaded
*/
long upload(InputStream inputStream, ObjectKey key) throws StorageBackendException;
+
+ /**
+ * Enable backend to use optimized uploading implementations based on source files
+ *
+ * @param path source path to the object to upload
+ * @param size size of the object to upload
+ * @param key destination path to an object within a storage backend
+ */
+ default long upload(Path path, int size, ObjectKey key) throws StorageBackendException {
+ try (final var inputStream = Files.newInputStream(path)) {
+ return upload(inputStream, key);
+ } catch (IOException e) {
+ throw new StorageBackendException("Error uploading file from path: " + path, e);
+ }
+ }
}
diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
index a4a24a9d8..2eeb9187d 100644
--- a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
+++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageMetricsTest.java
@@ -23,6 +23,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
@@ -34,6 +37,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
@@ -99,7 +103,7 @@ void setupStorage() {
}
@Test
- void metricsShouldBeReported() throws Exception {
+ void metricsShouldBeReported(@TempDir final Path tmpDir) throws Exception {
final byte[] data = new byte[PART_SIZE + 1];
final ObjectKey key = new TestObjectKey("x");
@@ -120,98 +124,118 @@ void metricsShouldBeReported() throws Exception {
assertThatThrownBy(() -> storage.upload(failingInputStream, key))
.hasRootCause(exception);
- final ObjectName segmentCopyPerSecName = ObjectName.getInstance(
+ final ObjectName s3ClientMetrics = ObjectName.getInstance(
"aiven.kafka.server.tieredstorage.s3:type=s3-client-metrics");
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "get-object-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "get-object-requests-total"))
.isEqualTo(2.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "get-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "get-object-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "get-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-requests-rate"))
.isEqualTo(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-requests-total"))
.isEqualTo(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-time-avg"))
.isEqualTo(Double.NaN);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "put-object-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-time-max"))
.isEqualTo(Double.NaN);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-object-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-object-requests-total"))
.isEqualTo(1.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-object-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-object-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-objects-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-objects-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-objects-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-objects-requests-total"))
.isEqualTo(1.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-objects-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-objects-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "delete-objects-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "delete-objects-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "create-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "create-multipart-upload-requests-total"))
.isEqualTo(2.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "create-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "create-multipart-upload-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "create-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "upload-part-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "upload-part-requests-total"))
.isEqualTo(2.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "upload-part-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "upload-part-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "upload-part-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "complete-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "complete-multipart-upload-requests-total"))
.isEqualTo(1.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "complete-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "complete-multipart-upload-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "complete-multipart-upload-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-rate"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "abort-multipart-upload-requests-rate"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-requests-total"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "abort-multipart-upload-requests-total"))
.isEqualTo(1.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-avg"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "abort-multipart-upload-time-avg"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
- assertThat(MBEAN_SERVER.getAttribute(segmentCopyPerSecName, "abort-multipart-upload-time-max"))
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "abort-multipart-upload-time-max"))
+ .asInstanceOf(DOUBLE)
+ .isGreaterThan(0.0);
+
+ final var tmpPath = tmpDir.resolve("test.log");
+ final var testContent = "test".getBytes(StandardCharsets.UTF_8);
+ Files.write(tmpPath, testContent);
+
+ final var anotherKey = new TestObjectKey("y");
+ storage.upload(tmpPath, testContent.length, anotherKey);
+
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-requests-rate"))
+ .asInstanceOf(DOUBLE)
+ .isGreaterThan(0.0);
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-requests-total"))
+ .asInstanceOf(DOUBLE)
+ .isEqualTo(1.0);
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-time-avg"))
+ .asInstanceOf(DOUBLE)
+ .isGreaterThan(0.0);
+ assertThat(MBEAN_SERVER.getAttribute(s3ClientMetrics, "put-object-time-max"))
.asInstanceOf(DOUBLE)
.isGreaterThan(0.0);
}
diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
index 68fad748d..115adeb8e 100644
--- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
+++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -37,6 +38,7 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
public class S3Storage implements StorageBackend {
@@ -62,6 +64,13 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St
}
}
+ @Override
+ public long upload(final Path path, final int size, final ObjectKey key) throws StorageBackendException {
+ final var request = PutObjectRequest.builder().bucket(bucketName).key(key.value()).build();
+ s3Client.putObject(request, path);
+ return size;
+ }
+
S3MultiPartOutputStream s3OutputStream(final ObjectKey key) {
return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client);
}