Skip to content

Commit

Permalink
Add experimental SOCKS5 support for S3
Browse files Browse the repository at this point in the history
Technically, I found a way to have proxy-side host name resolution, but it requires rewriting quite a bit of the AWS client machinery. Leaving it as is for now.
  • Loading branch information
ivanyu committed Mar 28, 2024
1 parent 2e0a299 commit 352d559
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 61 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ The cache is able to asynchronously prefetch next chunks, up to the specified nu

⚠️ This is an experimental feature subject to future change.

| Object storage | Supported | Host name resolution |
|----------------------|:---------------:|:--------------------:|
| AWS S3 | ❌ (in progress) | |
| Azure Blob Storage | | Proxy-side |
| Google Cloud Storage | | Proxy-side |
| Object storage       | Supported | Host name resolution |
|----------------------|:---------:|:--------------------:|
| AWS S3               |     ✅     |      Client-side     |
| Azure Blob Storage   |     ✅     |      Proxy-side      |
| Google Cloud Storage |     ✅     |      Proxy-side      |

## License

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

/**
 * End-to-end variant that talks to MinIO directly, i.e. without any proxy.
 * Uses its own bucket so it does not interfere with the SOCKS5 variant.
 */
class S3MinioSingleBrokerDirectTest extends S3MinioSingleBrokerTest {
    /** Bucket dedicated to the direct (no-proxy) test run. */
    static final String BUCKET = "test-bucket-direct";

    /** Env var key through which the RSM plugin receives the bucket name. */
    private static final String BUCKET_ENV = "KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME";

    /** Creates the dedicated bucket before any test runs. */
    @BeforeAll
    static void createBucket() {
        final CreateBucketRequest request =
            CreateBucketRequest.builder().bucket(BUCKET).build();
        s3Client.createBucket(request);
    }

    /** Starts Kafka with the baseline RSM plugin setup plus this test's bucket. */
    @BeforeAll
    static void startKafka() throws Exception {
        setupKafka(kafka -> rsmPluginBasicSetup(kafka).withEnv(BUCKET_ENV, BUCKET));
    }

    /** @return the bucket this test variant reads and writes. */
    @Override
    protected String bucket() {
        return BUCKET;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

/**
 * End-to-end variant that reaches MinIO through an authenticated SOCKS5 proxy.
 * Uses its own bucket so it does not interfere with the direct variant.
 */
class S3MinioSingleBrokerSocks5Test extends S3MinioSingleBrokerTest {
    /** Bucket dedicated to the SOCKS5-proxied test run. */
    static final String BUCKET = "test-bucket-socks5";

    // Env var keys understood by the RSM plugin for bucket and proxy configuration.
    private static final String ENV_BUCKET = "KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME";
    private static final String ENV_PROXY_HOST = "KAFKA_RSM_CONFIG_STORAGE_PROXY_HOST";
    private static final String ENV_PROXY_PORT = "KAFKA_RSM_CONFIG_STORAGE_PROXY_PORT";
    private static final String ENV_PROXY_USERNAME = "KAFKA_RSM_CONFIG_STORAGE_PROXY_USERNAME";
    private static final String ENV_PROXY_PASSWORD = "KAFKA_RSM_CONFIG_STORAGE_PROXY_PASSWORD";

    /** Creates the dedicated bucket before any test runs. */
    @BeforeAll
    static void createBucket() {
        final CreateBucketRequest request =
            CreateBucketRequest.builder().bucket(BUCKET).build();
        s3Client.createBucket(request);
    }

    /** Starts Kafka with the RSM plugin configured to reach MinIO via the SOCKS5 proxy. */
    @BeforeAll
    static void startKafka() throws Exception {
        setupKafka(kafka -> rsmPluginBasicSetup(kafka)
            .withEnv(ENV_BUCKET, BUCKET)
            .withEnv(ENV_PROXY_HOST, SOCKS5_NETWORK_ALIAS)
            .withEnv(ENV_PROXY_PORT, String.valueOf(SOCKS5_PORT))
            .withEnv(ENV_PROXY_USERNAME, SOCKS5_USER)
            .withEnv(ENV_PROXY_PASSWORD, SOCKS5_PASSWORD));
    }

    /** @return the bucket this test variant reads and writes. */
    @Override
    protected String bucket() {
        return BUCKET;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -38,45 +38,28 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

public class S3MinioSingleBrokerTest extends SingleBrokerTest {

abstract class S3MinioSingleBrokerTest extends SingleBrokerTest {
static final int MINIO_PORT = 9000;
static final String MINIO_NETWORK_ALIAS = "minio";

static final GenericContainer<?> MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio"))
.withCommand("server", "/data", "--console-address", ":9090")
.withExposedPorts(MINIO_PORT)
.withNetwork(NETWORK)
.withNetworkAliases("minio");
.withNetworkAliases(MINIO_NETWORK_ALIAS);

static final String ACCESS_KEY_ID = "minioadmin";
static final String SECRET_ACCESS_KEY = "minioadmin";
static final String REGION = "us-east-1";
static final String BUCKET = "test-bucket";

static final String MINIO_SERVER_URL = String.format("http://%s:%s", MINIO_NETWORK_ALIAS, MINIO_PORT);

static S3Client s3Client;

@BeforeAll
static void init() throws Exception {
static void init() {
MINIO.start();

final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT);

createBucket(minioServerUrl);

initializeS3Client();

setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl)
.dependsOn(MINIO));
}

private static void initializeS3Client() {
final Integer mappedPort = MINIO.getFirstMappedPort();
Testcontainers.exposeHostPorts(mappedPort);
s3Client = S3Client.builder()
Expand All @@ -93,21 +76,6 @@ private static void initializeS3Client() {
.forEach(bucket -> LOG.info("S3 bucket: {}", bucket.name()));
}

private static void createBucket(final String minioServerUrl) {
final String cmd =
"/usr/bin/mc config host add local "
+ minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&"
+ "/usr/bin/mc mb local/test-bucket;\n";

final GenericContainer<?> mcContainer = new GenericContainer<>("minio/mc")
.withNetwork(NETWORK)
.withStartupCheckStrategy(new OneShotStartupCheckStrategy())
.withCreateContainerCmdModifier(containerCommand -> containerCommand
.withTty(true)
.withEntrypoint("/bin/sh", "-c", cmd));
mcContainer.start();
}

@AfterAll
static void cleanup() {
stopKafka();
Expand All @@ -117,16 +85,33 @@ static void cleanup() {
cleanupStorage();
}

/**
 * Applies the baseline RSM plugin configuration shared by all S3/MinIO test
 * variants: storage backend class, plugin class path, region, path-style
 * access, MinIO credentials and endpoint, plus a startup dependency on MinIO.
 * The bucket name is deliberately NOT set here; each subclass adds its own.
 *
 * @param container the Kafka container to configure
 * @return the same container instance, configured
 */
static KafkaContainer rsmPluginBasicSetup(final KafkaContainer container) {
    // Testcontainers' withEnv/dependsOn are fluent and return the receiver,
    // so the whole configuration can be expressed as one returned chain.
    return container
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
            "io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
        .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
            "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/s3/*")
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
        .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", MINIO_SERVER_URL)
        .dependsOn(MINIO);
}

protected abstract String bucket();

/**
 * Checks that the tier storage holds no objects for the given topic.
 * Remote objects for a topic are keyed with the "&lt;topicName&gt;-&lt;topicId&gt;" prefix.
 *
 * NOTE(review): the pasted diff contained both the old BUCKET-based and the
 * new bucket()-based declaration of {@code request}; only the new one is kept,
 * since the per-subclass bucket() keeps the Direct and SOCKS5 runs isolated.
 *
 * @param topicName the Kafka topic name
 * @param topicId   the topic's unique id
 * @return true when no object with the topic prefix exists in this test's bucket
 */
@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
    final String prefix = String.format("%s-%s", topicName, topicId.toString());
    final var request = ListObjectsV2Request.builder().bucket(bucket()).prefix(prefix).build();
    return s3Client.listObjectsV2(request).keyCount() == 0;
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(BUCKET).build();
ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(bucket()).build();
final List<S3Object> s3Objects = new ArrayList<>();
ListObjectsV2Response result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void worksWithUnauthenticatedProxy() throws StorageBackendException, IOException
protected abstract Map<String, Object> storageConfigForUnauthenticatedProxy();

@Test
void doesNotWorkWithoutProxy() {
protected void doesNotWorkWithoutProxy() {
// This test accompanies the other ones by ensuring that _without_ a proxy
// we cannot even resolve the host name of the server, which is internal to the Docker network.

Expand Down
1 change: 1 addition & 0 deletions storage/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
dep.exclude group: "org.slf4j"
}
implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {excludeFromAWSDeps(it)}
compileOnly ("software.amazon.awssdk:apache-client:$awsSdkVersion") {excludeFromAWSDeps(it)}
runtimeOnly ("software.amazon.awssdk:sts:$awsSdkVersion") {excludeFromAWSDeps(it)}

implementation project(':commons')
Expand Down
Loading

0 comments on commit 352d559

Please sign in to comment.