diff --git a/README.md b/README.md index 7f6fd17a7..5028b377b 100644 --- a/README.md +++ b/README.md @@ -196,11 +196,11 @@ The cache is able to asynchronously prefetch next chunks, up to the specified nu ⚠️ This is an experimental feature subject for future changes. -| Object storage | Supported | Host name resolution | -|----------------------|:---------------:|:--------------------:| -| AWS S3 | ❌ (in progress) | | -| Azure Blob Storage | ✅ | Proxy-side | -| Google Cloud Storage | ✅ | Proxy-side | +| Object storage | Supported | Host name resolution | +|----------------------|:---------:|:--------------------:| +| AWS S3 | ✅ | Client-side | +| Azure Blob Storage | ✅ | Proxy-side | +| Google Cloud Storage | ✅ | Proxy-side | ## License diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerDirectTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerDirectTest.java new file mode 100644 index 000000000..c0c18cff4 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerDirectTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import org.junit.jupiter.api.BeforeAll; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +class S3MinioSingleBrokerDirectTest extends S3MinioSingleBrokerTest { + static final String BUCKET = "test-bucket-direct"; + + @BeforeAll + static void createBucket() { + s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); + } + + @BeforeAll + static void startKafka() throws Exception { + setupKafka(kafka -> rsmPluginBasicSetup(kafka) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)); + } + + @Override + protected String bucket() { + return BUCKET; + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerSocks5Test.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerSocks5Test.java new file mode 100644 index 000000000..4b57637a2 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerSocks5Test.java @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import org.junit.jupiter.api.BeforeAll; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +class S3MinioSingleBrokerSocks5Test extends S3MinioSingleBrokerTest { + static final String BUCKET = "test-bucket-socks5"; + + @BeforeAll + static void createBucket() { + s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); + } + + @BeforeAll + static void startKafka() throws Exception { + setupKafka(kafka -> rsmPluginBasicSetup(kafka) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_HOST", SOCKS5_NETWORK_ALIAS) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PORT", Integer.toString(SOCKS5_PORT)) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_USERNAME", SOCKS5_USER) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_PROXY_PASSWORD", SOCKS5_PASSWORD)); + } + + @Override + protected String bucket() { + return BUCKET; + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java index 439ccb18d..851ca933c 100644 --- a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java @@ -28,7 +28,7 @@ import org.junit.jupiter.api.BeforeAll; import org.testcontainers.Testcontainers; import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -38,45 +38,28 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; -public class S3MinioSingleBrokerTest extends SingleBrokerTest { - +abstract class S3MinioSingleBrokerTest extends SingleBrokerTest { static final int MINIO_PORT = 9000; + static final String MINIO_NETWORK_ALIAS = "minio"; + static final GenericContainer MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio")) .withCommand("server", "/data", "--console-address", ":9090") .withExposedPorts(MINIO_PORT) .withNetwork(NETWORK) - .withNetworkAliases("minio"); + .withNetworkAliases(MINIO_NETWORK_ALIAS); + static final String ACCESS_KEY_ID = "minioadmin"; static final String SECRET_ACCESS_KEY = "minioadmin"; static final String REGION = "us-east-1"; - static final String BUCKET = "test-bucket"; + + static final String MINIO_SERVER_URL = String.format("http://%s:%s", MINIO_NETWORK_ALIAS, MINIO_PORT); static S3Client s3Client; @BeforeAll - static void init() throws Exception { + static void init() { MINIO.start(); - final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT); - - createBucket(minioServerUrl); - - initializeS3Client(); - - setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", - "io.aiven.kafka.tieredstorage.storage.s3.S3Storage") - .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", - "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*") - .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET) - .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION) - .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true") - .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID) - .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY) - .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl) - .dependsOn(MINIO)); - } - - private static void initializeS3Client() { final Integer mappedPort = MINIO.getFirstMappedPort(); Testcontainers.exposeHostPorts(mappedPort); s3Client = S3Client.builder() @@ -93,21 +76,6 @@ private static void initializeS3Client() { .forEach(bucket -> LOG.info("S3 bucket: {}", bucket.name())); } - private static void createBucket(final String minioServerUrl) { - final String cmd = - "/usr/bin/mc config host add local " - + minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&" - + "/usr/bin/mc mb local/test-bucket;\n"; - - final GenericContainer mcContainer = new GenericContainer<>("minio/mc") - .withNetwork(NETWORK) - .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) - .withCreateContainerCmdModifier(containerCommand -> containerCommand - .withTty(true) - .withEntrypoint("/bin/sh", "-c", cmd)); - mcContainer.start(); - } - @AfterAll static void cleanup() { stopKafka(); @@ -117,16 +85,33 @@ static void cleanup() { cleanupStorage(); } + static KafkaContainer rsmPluginBasicSetup(final KafkaContainer container) { + container + .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.s3.S3Storage") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", + "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", MINIO_SERVER_URL) + .dependsOn(MINIO); + return container; + } + + protected abstract String bucket(); + @Override boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { final String prefix = String.format("%s-%s", topicName, topicId.toString()); - final var request = ListObjectsV2Request.builder().bucket(BUCKET).prefix(prefix).build(); + final var request = ListObjectsV2Request.builder().bucket(bucket()).prefix(prefix).build(); return s3Client.listObjectsV2(request).keyCount() == 0; } @Override List remotePartitionFiles(final TopicIdPartition topicIdPartition) { - ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(BUCKET).build(); + ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(bucket()).build(); final List s3Objects = new ArrayList<>(); ListObjectsV2Response result; while ((result = s3Client.listObjectsV2(request)).isTruncated()) { diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseSocks5Test.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseSocks5Test.java index 4177fc0ac..61fd85816 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseSocks5Test.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseSocks5Test.java @@ -78,7 +78,7 @@ void worksWithUnauthenticatedProxy() throws StorageBackendException, IOException protected abstract Map storageConfigForUnauthenticatedProxy(); @Test - void doesNotWorkWithoutProxy() { + protected void doesNotWorkWithoutProxy() { // This test accompanies the other ones by ensuring that _without_ a proxy // we cannot even resolve the host name of the server, which is internal to the Docker network. diff --git a/storage/s3/build.gradle b/storage/s3/build.gradle index 3ecc9cdb0..01891ed3d 100644 --- a/storage/s3/build.gradle +++ b/storage/s3/build.gradle @@ -27,6 +27,7 @@ dependencies { dep.exclude group: "org.slf4j" } implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {excludeFromAWSDeps(it)} + compileOnly ("software.amazon.awssdk:apache-client:$awsSdkVersion") {excludeFromAWSDeps(it)} runtimeOnly ("software.amazon.awssdk:sts:$awsSdkVersion") {excludeFromAWSDeps(it)} implementation project(':commons') diff --git a/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3Socks5Test.java b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3Socks5Test.java new file mode 100644 index 000000000..4bd6688a8 --- /dev/null +++ b/storage/s3/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/s3/S3Socks5Test.java @@ -0,0 +1,158 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.s3; + +import java.lang.reflect.Method; +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.BaseSocks5Test; +import io.aiven.kafka.tieredstorage.storage.TestUtils; + +import com.github.dockerjava.api.model.ContainerNetwork; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +@Testcontainers +class S3Socks5Test extends BaseSocks5Test { + static final Network NETWORK = Network.newNetwork(); + + @Container + private static final LocalStackContainer LOCALSTACK = S3TestContainer.container() + .withNetwork(NETWORK); + + @Container + static final GenericContainer PROXY_AUTHENTICATED = proxyContainer(true).withNetwork(NETWORK); + @Container + static final GenericContainer PROXY_UNAUTHENTICATED = proxyContainer(false).withNetwork(NETWORK); + + private static S3Client s3Client; + private String bucketName; + + @BeforeAll + static void setUpClass() { + final var clientBuilder = S3Client.builder(); + clientBuilder.region(Region.of(LOCALSTACK.getRegion())) + .endpointOverride(LOCALSTACK.getEndpointOverride(LocalStackContainer.Service.S3)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + LOCALSTACK.getAccessKey(), + LOCALSTACK.getSecretKey() + ) + ) + ) + .build(); + s3Client = clientBuilder.build(); + } + + @BeforeEach + void setUp(final TestInfo testInfo) { + bucketName = TestUtils.testNameToBucketName(testInfo); + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()); + } + + static String internalLocalstackEndpoint() { + try { + final String networkName = LOCALSTACK.getDockerClient() + .inspectNetworkCmd().withNetworkId(NETWORK.getId()).exec().getName(); + final ContainerNetwork containerNetwork = LOCALSTACK.getContainerInfo() + .getNetworkSettings().getNetworks().get(networkName); + final String ipAddress = containerNetwork.getIpAddress(); + final Method getServicePortField = LocalStackContainer.class + .getDeclaredMethod("getServicePort", LocalStackContainer.EnabledService.class); + getServicePortField.setAccessible(true); + final int port = (int) getServicePortField.invoke(LOCALSTACK, LocalStackContainer.Service.S3); + return String.format("http://%s:%d", ipAddress, port); + } catch (final ReflectiveOperationException e) { + throw new RuntimeException(e); + } + + } + + @Override + protected S3Storage createStorageBackend() { + return new S3Storage(); + } + + @Override + protected Map storageConfigForAuthenticatedProxy() { + final var proxy = PROXY_AUTHENTICATED; + return Map.of( + "s3.bucket.name", bucketName, + "s3.region", LOCALSTACK.getRegion(), + "s3.endpoint.url", internalLocalstackEndpoint(), + "aws.access.key.id", LOCALSTACK.getAccessKey(), + "aws.secret.access.key", LOCALSTACK.getSecretKey(), + "s3.path.style.access.enabled", true, + "proxy.host", proxy.getHost(), + "proxy.port", proxy.getMappedPort(SOCKS5_PORT), + "proxy.username", SOCKS5_USER, + "proxy.password", SOCKS5_PASSWORD + ); + } + + @Override + protected Map storageConfigForUnauthenticatedProxy() { + final var proxy = PROXY_UNAUTHENTICATED; + return Map.of( + "s3.bucket.name", bucketName, + "s3.region", LOCALSTACK.getRegion(), + "s3.endpoint.url", internalLocalstackEndpoint(), + "aws.access.key.id", LOCALSTACK.getAccessKey(), + "aws.secret.access.key", LOCALSTACK.getSecretKey(), + "s3.path.style.access.enabled", true, + "proxy.host", proxy.getHost(), + "proxy.port", proxy.getMappedPort(SOCKS5_PORT) + ); + } + + @Override + protected Map storageConfigForNoProxy() { + return Map.of( + "s3.bucket.name", bucketName, + "s3.region", LOCALSTACK.getRegion(), + "s3.endpoint.url", internalLocalstackEndpoint(), + "aws.access.key.id", LOCALSTACK.getAccessKey(), + "aws.secret.access.key", LOCALSTACK.getSecretKey(), + "s3.path.style.access.enabled", true + ); + } + + @Disabled("Not applicable for S3") + @Override + protected void doesNotWorkWithoutProxy() { + // Unfortunately, S3 does the client-side hostname resolution, + // so the trick with using the hostname visible only in Docker (i.e. to the proxy containers) won't work. + } + + @Override + protected Iterable possibleRootCauseMessagesWhenNoProxy() { + return null; + } +} diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3ClientBuilder.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3ClientBuilder.java index f32a27ffc..c7c3948f7 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3ClientBuilder.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3ClientBuilder.java @@ -18,12 +18,11 @@ import java.util.Objects; +import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig; + import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; -import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.utils.AttributeMap; class S3ClientBuilder { static S3Client build(final S3StorageConfig config) { @@ -39,15 +38,19 @@ static S3Client build(final S3StorageConfig config) { s3ClientBuilder.forcePathStyle(config.pathStyleAccessEnabled()); } - if (!config.certificateCheckEnabled()) { - s3ClientBuilder.httpClient( - new DefaultSdkHttpClientBuilder() - .buildWithDefaults( - AttributeMap.builder() - .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) - .build() - ) - ); + final ProxyConfig proxyConfig = config.proxyConfig(); + if (!config.certificateCheckEnabled() || proxyConfig != null) { + final var sdkHttpClientBuilder = new SdkHttpClientBuilder(); + + if (!config.certificateCheckEnabled()) { + sdkHttpClientBuilder.trustAllCertificates(); + } + + if (proxyConfig != null) { + sdkHttpClientBuilder.withProxy(proxyConfig); + } + + s3ClientBuilder.httpClientBuilder(sdkHttpClientBuilder); } s3ClientBuilder.serviceConfiguration(builder -> diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java index cd13db260..88a850ddb 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java @@ -31,6 +31,7 @@ import io.aiven.kafka.tieredstorage.config.validators.Null; import io.aiven.kafka.tieredstorage.config.validators.Subclass; import io.aiven.kafka.tieredstorage.config.validators.ValidUrl; +import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; @@ -176,9 +177,16 @@ public class S3StorageConfig extends AbstractConfig { ); } + private ProxyConfig proxyConfig = null; + public S3StorageConfig(final Map props) { super(CONFIG, props); validate(); + + final Map proxyProps = this.originalsWithPrefix(ProxyConfig.PROXY_PREFIX, true); + if (!proxyProps.isEmpty()) { + this.proxyConfig = new ProxyConfig(proxyProps); + } } private void validate() { @@ -200,6 +208,10 @@ && getPassword(AWS_ACCESS_KEY_ID_CONFIG) != null) { } } + ProxyConfig proxyConfig() { + return proxyConfig; + } + Region region() { return Region.of(getString(S3_REGION_CONFIG)); } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/SdkHttpClientBuilder.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/SdkHttpClientBuilder.java new file mode 100644 index 000000000..81bd26c87 --- /dev/null +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/SdkHttpClientBuilder.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.s3; + +import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig; +import io.aiven.kafka.tieredstorage.storage.s3.proxy.ProxyInstaller; + +import software.amazon.awssdk.core.internal.http.loader.DefaultSdkHttpClientBuilder; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.SdkHttpConfigurationOption; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.utils.AttributeMap; + +class SdkHttpClientBuilder implements SdkHttpClient.Builder { + private boolean trustAllCertificates = false; + private ProxyConfig proxyConfig = null; + + void trustAllCertificates() { + this.trustAllCertificates = true; + } + + void withProxy(final ProxyConfig proxyConfig) { + this.proxyConfig = proxyConfig; + } + + @Override + public SdkHttpClient buildWithDefaults(final AttributeMap serviceDefaults) { + final var defaultSdkHttpClientBuilder = new DefaultSdkHttpClientBuilder(); + + final AttributeMap actualServiceDefaults = this.trustAllCertificates + ? serviceDefaults.toBuilder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true).build() + : serviceDefaults; + final ApacheHttpClient client = + (ApacheHttpClient) defaultSdkHttpClientBuilder.buildWithDefaults(actualServiceDefaults); + + if (proxyConfig != null) { + try { + ProxyInstaller.install(client, proxyConfig); + } catch (final ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + + return client; + } +} diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/PrivateField.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/PrivateField.java new file mode 100644 index 000000000..a85705567 --- /dev/null +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/PrivateField.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.s3.proxy; + +import java.lang.reflect.Field; + +class PrivateField { + private final C object; + private final Field field; + private final Class valueClass; + + private PrivateField( + final Class declaringClass, final C object, final Class valueClass, final String name + ) throws ReflectiveOperationException { + this.object = object; + this.field = declaringClass.getDeclaredField(name); + this.field.setAccessible(true); + this.valueClass = valueClass; + } + + V getValue() throws ReflectiveOperationException { + return valueClass.cast(this.field.get(this.object)); + } + + void setValue(final V value) throws ReflectiveOperationException { + this.field.set(this.object, value); + } + + static PrivateField of( + final Class declaringClass, final C object, final Class valueClass, final String name + ) throws ReflectiveOperationException { + return new PrivateField<>(declaringClass, object, valueClass, name); + } +} diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/ProxyInstaller.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/ProxyInstaller.java new file mode 100644 index 000000000..d0c26132e --- /dev/null +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/proxy/ProxyInstaller.java @@ -0,0 +1,145 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.s3.proxy; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; + +import io.aiven.kafka.tieredstorage.storage.proxy.ProxyConfig; +import io.aiven.kafka.tieredstorage.storage.proxy.Socks5ProxyAuthenticator; + +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory; +import software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient; + +/** + * Makes the {@link ApacheHttpClient} connect via a SOCKS5 proxy. + * + *

This operation heavily depends on reflection. + * This is not ideal, but this is the only way to do this in the present state of the AWS SDK configurability. + */ +public class ProxyInstaller { + public static void install( + final ApacheHttpClient client, final ProxyConfig proxyConfig + ) throws ReflectiveOperationException { + final var defaultHttpClientConnectionOperator = extractClientConnectionOperator(client); + + final InetSocketAddress proxyAddr = new InetSocketAddress(proxyConfig.host(), proxyConfig.port()); + final Proxy proxy = new Proxy(Proxy.Type.SOCKS, proxyAddr); + + final var socketFactoryRegistryField = PrivateField.of( + DefaultHttpClientConnectionOperator.class, defaultHttpClientConnectionOperator, + Registry.class, "socketFactoryRegistry"); + + @SuppressWarnings("unchecked") + final var originalConnectionSocketFactoryRegistry = + (Registry) socketFactoryRegistryField.getValue(); + + final var proxiedConnectionSocketFactoryRegistry = + createProxiedConnectionSocketFactoryRegistry(originalConnectionSocketFactoryRegistry, proxy); + socketFactoryRegistryField.setValue(proxiedConnectionSocketFactoryRegistry); + + if (proxyConfig.username() != null) { + Socks5ProxyAuthenticator.register( + proxyConfig.host(), proxyConfig.port(), proxyConfig.username(), proxyConfig.password()); + } + } + + private static DefaultHttpClientConnectionOperator extractClientConnectionOperator( + final ApacheHttpClient apacheHttpClient + ) throws ReflectiveOperationException { + final var apacheSdkHttpClient = PrivateField.of( + ApacheHttpClient.class, apacheHttpClient, ApacheSdkHttpClient.class, "httpClient").getValue(); + final var poolingHttpClientConnectionManager = PrivateField.of( + ApacheSdkHttpClient.class, apacheSdkHttpClient, PoolingHttpClientConnectionManager.class, "cm") + .getValue(); + return PrivateField.of( + PoolingHttpClientConnectionManager.class, poolingHttpClientConnectionManager, + DefaultHttpClientConnectionOperator.class, "connectionOperator") + .getValue(); + } + + private static Registry createProxiedConnectionSocketFactoryRegistry( + final Registry originalConnectionSocketFactoryRegistry, + final Proxy proxy + ) throws ReflectiveOperationException { + if (originalConnectionSocketFactoryRegistry.lookup("http") == null) { + throw new RuntimeException("Connection factory for HTTP doesn't exist"); + } + + final SdkTlsSocketFactory httpsConnectionSocketFactory = + (SdkTlsSocketFactory) originalConnectionSocketFactoryRegistry.lookup("https"); + if (httpsConnectionSocketFactory == null) { + throw new RuntimeException("Connection factory for HTTPS doesn't exist"); + } + + final var sslContext = PrivateField.of( + SdkTlsSocketFactory.class, httpsConnectionSocketFactory, SSLContext.class, "sslContext").getValue(); + final var hostnameVerifier = PrivateField.of( + SSLConnectionSocketFactory.class, httpsConnectionSocketFactory, + HostnameVerifier.class, "hostnameVerifier") + .getValue(); + return RegistryBuilder.create() + .register("http", new ProxiedPlainConnectionSocketFactory(proxy)) + .register("https", new ProxiedSdkTlsSocketFactory(sslContext, hostnameVerifier, proxy)) + .build(); + } + + private static class ProxiedPlainConnectionSocketFactory extends PlainConnectionSocketFactory { + private final Proxy proxy; + + private ProxiedPlainConnectionSocketFactory(final Proxy proxy) { + this.proxy = proxy; + } + + @Override + public Socket createSocket(final HttpContext context) throws IOException { + return new Socket(proxy); + } + + } + + private static class ProxiedSdkTlsSocketFactory extends SdkTlsSocketFactory { + private final Proxy proxy; + + private ProxiedSdkTlsSocketFactory( + final SSLContext sslContext, final HostnameVerifier hostnameVerifier, final Proxy proxy + ) { + super(sslContext, hostnameVerifier); + this.proxy = proxy; + } + + @Override + public Socket createSocket(final HttpContext context) throws IOException { + return new Socket(proxy); + } + + } +}