Skip to content

Commit

Permalink
AWS SDK 2.X migration for source connector [KCON-84] (#374)
Browse files Browse the repository at this point in the history
The AWS SDK for Java 1.x is in maintenance mode and will reach end of
support in December 2025.

Key differences are
* Use of the builder pattern when creating objects
* The get and set prefixes are removed from getters and setters, e.g.
getKey(), setKey(newKey) -> key(), key(newKey)
* S3Client is immutable
* different package names
* Additional built-in functionality, which removes some of the work from
the connector implementation and lets the SDK handle it instead.

SDK 1.x is still in use by the sink connector, which will also need to be
updated in the future; until then, the s3-commons code depends on both
the 1.x and 2.x jars.

---------

Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven authored Dec 30, 2024
1 parent e0184bb commit b4475c9
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 417 deletions.
3 changes: 3 additions & 0 deletions s3-commons/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.777")
val amazonSTSVersion by extra("1.12.777")
val amazonV2Version by extra("2.29.34")

dependencies {
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")
implementation("software.amazon.awssdk:auth:$amazonV2Version")
implementation("software.amazon.awssdk:sts:$amazonV2Version")

implementation(project(":commons"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import com.amazonaws.services.s3.internal.BucketNameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

/**
* The configuration fragment that defines the S3 specific characteristics.
*/
@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports" })
@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports", "PMD.TooManyStaticImports", "PMD.GodClass" })
public final class S3ConfigFragment extends ConfigFragment {

private static final Logger LOGGER = LoggerFactory.getLogger(S3ConfigFragment.class);
Expand Down Expand Up @@ -345,7 +347,8 @@ public void validateCredentials() {
}
} else {
final BasicAWSCredentials awsCredentials = getAwsCredentials();
if (awsCredentials == null) {
final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2();
if (awsCredentials == null && awsCredentialsV2 == null) {
LOGGER.info(
"Connector use {} as credential Provider, "
+ "when configuration for {{}, {}} OR {{}, {}} are absent",
Expand Down Expand Up @@ -410,11 +413,13 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return new AwsStsEndpointConfig(cfg.getString(AWS_STS_CONFIG_ENDPOINT), cfg.getString(AWS_S3_REGION_CONFIG));
}

/**
 * Builds an {@link AwsClientBuilder.EndpointConfiguration} from the STS endpoint settings returned by
 * {@link #getStsEndpointConfig()}.
 *
 * @return an endpoint configuration created from the configured service endpoint and signing region
 */
@Deprecated
public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
    final AwsStsEndpointConfig stsEndpoint = getStsEndpointConfig();
    return new AwsClientBuilder.EndpointConfiguration(
            stsEndpoint.getServiceEndpoint(),
            stsEndpoint.getSigningRegion());
}

@Deprecated
public BasicAWSCredentials getAwsCredentials() {
if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {
Expand All @@ -430,12 +435,26 @@ public BasicAWSCredentials getAwsCredentials() {
return null;
}

/**
 * Creates AWS SDK 2.x basic credentials from the configured access key id and secret access key.
 *
 * @return the credentials when both {@code AWS_ACCESS_KEY_ID_CONFIG} and {@code AWS_SECRET_ACCESS_KEY_CONFIG}
 *         are set; {@code null} otherwise
 */
public AwsBasicCredentials getAwsCredentialsV2() {
    final boolean hasCurrentKeys = Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG))
            && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG));
    if (hasCurrentKeys) {
        final String accessKeyId = cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG).value();
        final String secretAccessKey = cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value();
        return AwsBasicCredentials.create(accessKeyId, secretAccessKey);
    }

    // The older option names are only checked so that a warning can be logged; they never yield credentials here.
    final boolean hasLegacyKeys = Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID))
            && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY));
    if (hasLegacyKeys) {
        LOGGER.warn("Config options {} and {} are not supported for this Connector", AWS_ACCESS_KEY_ID,
                AWS_SECRET_ACCESS_KEY);
    }
    return null;
}

/**
 * Returns the configured S3 endpoint.
 *
 * @return the value of {@code AWS_S3_ENDPOINT_CONFIG} when it is set, otherwise the value of
 *         {@code AWS_S3_ENDPOINT} (which may itself be {@code null})
 */
public String getAwsS3EndPoint() {
    final String endpoint = cfg.getString(AWS_S3_ENDPOINT_CONFIG);
    if (Objects.nonNull(endpoint)) {
        return endpoint;
    }
    return cfg.getString(AWS_S3_ENDPOINT);
}

@Deprecated
public Region getAwsS3Region() {
// we have priority of properties if old one not set or both old and new one set
// the new property value will be selected
Expand All @@ -448,6 +467,18 @@ public Region getAwsS3Region() {
}
}

/**
 * Returns the configured S3 region as an AWS SDK 2.x {@code Region}.
 *
 * @return the region named by {@code AWS_S3_REGION_CONFIG} when set, otherwise the region named by
 *         {@code AWS_S3_REGION} when set, otherwise {@code us-east-1}
 */
public software.amazon.awssdk.regions.Region getAwsS3RegionV2() {
    // The new property wins whenever it is set; the old property is only consulted as a fallback.
    final String region = cfg.getString(AWS_S3_REGION_CONFIG);
    if (Objects.nonNull(region)) {
        return software.amazon.awssdk.regions.Region.of(region);
    }
    final String legacyRegion = cfg.getString(AWS_S3_REGION);
    if (Objects.nonNull(legacyRegion)) {
        return software.amazon.awssdk.regions.Region.of(legacyRegion);
    }
    // Use the SDK 2.x constant directly so this accessor does not depend on the SDK 1.x Regions enum.
    return software.amazon.awssdk.regions.Region.US_EAST_1;
}

public String getAwsS3BucketName() {
return Objects.nonNull(cfg.getString(AWS_S3_BUCKET_NAME_CONFIG))
? cfg.getString(AWS_S3_BUCKET_NAME_CONFIG)
Expand Down Expand Up @@ -484,6 +515,10 @@ public AWSCredentialsProvider getCustomCredentialsProvider() {
return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
}

/**
 * Instantiates the credentials provider configured under {@code AWS_CREDENTIALS_PROVIDER_CONFIG} as an AWS SDK 2.x
 * {@link AwsCredentialsProvider}.
 *
 * @return the configured provider instance
 */
public AwsCredentialsProvider getCustomCredentialsProviderV2() {
    final Class<AwsCredentialsProvider> providerType = AwsCredentialsProvider.class;
    return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, providerType);
}

/**
 * Returns the value configured under {@code FETCH_PAGE_SIZE}.
 *
 * @return the configured fetch page size
 */
public int getFetchPageSize() {
    final int fetchPageSize = cfg.getInt(FETCH_PAGE_SIZE);
    return fetchPageSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class AwsCredentialProviderFactory {

Expand Down Expand Up @@ -58,4 +63,33 @@ private AWSSecurityTokenService securityTokenService(final S3ConfigFragment conf
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
}

/**
 * Selects the AWS SDK 2.x credentials provider for the given configuration.
 * <p>
 * An STS role takes precedence; otherwise static credentials are used when configured, and the custom provider
 * from the configuration is the last resort.
 * </p>
 *
 * @param config
 *            the S3 configuration fragment to read credentials settings from
 * @return the credentials provider matching the configuration
 */
public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) {
    if (config.hasAwsStsRole()) {
        return getV2StsProvider(config);
    }

    final AwsBasicCredentials staticCredentials = config.getAwsCredentialsV2();
    return Objects.isNull(staticCredentials)
            ? config.getCustomCredentialsProviderV2()
            : StaticCredentialsProvider.create(staticCredentials);
}

/**
 * Creates an AWS SDK 2.x credentials provider that assumes the STS role from the configuration.
 * <p>
 * The caller is expected to have checked {@code config.hasAwsStsRole()} already, so no second check or fallback
 * provider is needed here.
 * </p>
 *
 * @param config
 *            the S3 configuration fragment supplying the STS role
 * @return a provider whose refresh request assumes the configured role ARN
 */
private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) {
    // NOTE(review): no stsClient(...) is set on this builder. The SDK 2.x documentation describes the STS client
    // as required for this provider - confirm that build() succeeds and wire in a client if it does not.
    return StsAssumeRoleCredentialsProvider.builder()
            .refreshRequest(() -> AssumeRoleRequest.builder()
                    .roleArn(config.getStsRole().getArn())
                    // TODO: make this a unique identifier
                    .roleSessionName("AwsV2SDKConnectorSession")
                    .build())
            .build();
}

}
9 changes: 4 additions & 5 deletions s3-source-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import com.github.spotbugs.snom.SpotBugsTask

plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.729")
val amazonSTSVersion by extra("1.12.729")
val amazonS3Version by extra("2.29.34")
val amazonSTSVersion by extra("2.29.34")
val s3mockVersion by extra("0.2.6")
val testKafkaVersion by extra("3.7.1")

Expand Down Expand Up @@ -67,8 +67,8 @@ dependencies {

implementation(project(":commons"))
implementation(project(":s3-commons"))
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")
implementation("software.amazon.awssdk:s3:$amazonS3Version")
implementation("software.amazon.awssdk:sts:$amazonSTSVersion")

implementation(tools.spotbugs.annotations)
implementation(logginglibs.slf4j)
Expand Down Expand Up @@ -154,7 +154,6 @@ dependencies {
exclude(group = "org.apache.commons", module = "commons-math3")
exclude(group = "org.apache.httpcomponents", module = "httpclient")
exclude(group = "commons-codec", module = "commons-codec")
exclude(group = "commons-io", module = "commons-io")
exclude(group = "commons-net", module = "commons-net")
exclude(group = "org.eclipse.jetty")
exclude(group = "org.eclipse.jetty.websocket")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
Expand All @@ -47,11 +48,6 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -61,6 +57,10 @@
import org.testcontainers.containers.Container;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public interface IntegrationBase {
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
Expand Down Expand Up @@ -101,13 +101,13 @@ static void waitForRunningContainer(final Container<?> container) {
await().atMost(Duration.ofMinutes(1)).until(container::isRunning);
}

static AmazonS3 createS3Client(final LocalStackContainer localStackContainer) {
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString(),
localStackContainer.getRegion()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
/**
 * Creates an AWS SDK 2.x S3 client that talks to the given LocalStack container.
 *
 * @param localStackContainer
 *            the container supplying the S3 endpoint override, region, access key and secret key
 * @return an S3 client configured from the container's endpoint, region and static credentials
 */
static S3Client createS3Client(final LocalStackContainer localStackContainer) {
    final URI s3Endpoint = URI
            .create(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString());
    final AwsBasicCredentials credentials = AwsBasicCredentials.create(localStackContainer.getAccessKey(),
            localStackContainer.getSecretKey());

    return S3Client.builder()
            .endpointOverride(s3Endpoint)
            .region(Region.of(localStackContainer.getRegion()))
            .credentialsProvider(StaticCredentialsProvider.create(credentials))
            .build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -62,9 +61,6 @@
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
Expand All @@ -83,6 +79,9 @@
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@Testcontainers
@SuppressWarnings("PMD.ExcessiveImports")
Expand Down Expand Up @@ -111,7 +110,7 @@ final class IntegrationTest implements IntegrationBase {
private AdminClient adminClient;
private ConnectRunner connectRunner;

private static AmazonS3 s3Client;
private static S3Client s3Client;

@BeforeAll
static void setUpAll() throws IOException, InterruptedException {
Expand Down Expand Up @@ -263,7 +262,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
final Path path = ContentUtils.getTmpFilePath(name);

try {
s3Client.putObject(TEST_BUCKET_NAME, fileName, Files.newInputStream(path), null);
s3Client.putObject(PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(fileName).build(), path);
} catch (final Exception e) { // NOPMD broad exception caught
LOGGER.error("Error in reading file {}", e.getMessage(), e);
} finally {
Expand Down Expand Up @@ -341,9 +340,8 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-"
+ System.currentTimeMillis() + ".txt";
final PutObjectRequest request = new PutObjectRequest(TEST_BUCKET_NAME, objectKey,
new ByteArrayInputStream(testDataBytes), new ObjectMetadata());
s3Client.putObject(request);
final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build();
s3Client.putObject(request, RequestBody.fromBytes(testDataBytes));
return OBJECT_KEY + SEPARATOR + objectKey;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;

/**
* S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka
Expand All @@ -64,7 +64,7 @@ public class S3SourceTask extends SourceTask {
private static final long ERROR_BACKOFF = 1000L;

private S3SourceConfig s3SourceConfig;
private AmazonS3 s3Client;
private S3Client s3Client;

private Iterator<S3SourceRecord> sourceRecordIterator;
private Transformer transformer;
Expand Down Expand Up @@ -122,8 +122,8 @@ public List<SourceRecord> poll() throws InterruptedException {
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
return results;
} catch (AmazonS3Exception exception) {
if (exception.isRetryable()) {
} catch (SdkException exception) {
if (exception.retryable()) {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
pollLock.wait(ERROR_BACKOFF);
Expand Down
Loading

0 comments on commit b4475c9

Please sign in to comment.