diff --git a/README.md b/README.md index 2d4f85a..7ede4ba 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,13 @@ The project originates from Confluent [kafka-connect-elasticsearch](https://gith # Documentation -TBD +Supported Elasticsearch versions are 7.17.0+ # Contribute -[Source Code](https://github.com/aiven/elasticsearch-connector-for-apache-kafka) +[Source Code](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka) -[Issue Tracker](https://github.com/aiven/elasticsearch-connector-for-apache-kafka) +[Issue Tracker](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka) # License diff --git a/build.gradle b/build.gradle index 9f60360..41c339b 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,7 @@ java { } wrapper { - distributionType = 'ALL' + distributionType = "ALL" doLast { def sha256Sum = new String(new URL("${distributionUrl}.sha256").bytes) propertiesFile << "distributionSha256Sum=${sha256Sum}\n" @@ -57,18 +57,16 @@ wrapper { } ext { - guavaVersion = "11.0.2" kafkaVersion = "2.2.0" slf4jVersion = "2.0.12" log4jVersion = "2.23.0" - elasticSearchVersion = "7.4.0" - elasticClientVersion = "7.17.0" + elasticJavaClientVersion = "7.17.18" testContainersElasticVersion = "1.19.6" carrotsearchVersion = "2.8.1" } compileJava { - options.compilerArgs = ['-Xlint:all', '-Werror'] + options.compilerArgs = ["-Xlint:all", "-Werror"] } checkstyle { @@ -81,28 +79,23 @@ jacoco { } dependencies { - compileOnly "org.apache.kafka:connect-api:$kafkaVersion" - compileOnly "org.apache.kafka:connect-json:$kafkaVersion" + compileOnly("org.apache.kafka:connect-api:$kafkaVersion") + compileOnly("org.apache.kafka:connect-json:$kafkaVersion") - implementation "org.slf4j:slf4j-api:$slf4jVersion" - implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:$elasticClientVersion" - implementation("com.google.guava:guava:$guavaVersion") - - implementation "org.apache.logging.log4j:log4j-api:$log4jVersion" - implementation "org.apache.logging.log4j:log4j-core:$log4jVersion" + implementation("org.slf4j:slf4j-api:$slf4jVersion") + implementation("co.elastic.clients:elasticsearch-java:$elasticJavaClientVersion") testImplementation("junit:junit:4.13.2") { - exclude group: 'org.hamcrest', module: 'hamcrest-core' + exclude(group: "org.hamcrest", module: "hamcrest-core") } - testImplementation "org.hamcrest:hamcrest-all:1.3" - testImplementation "org.mockito:mockito-core:5.4.0" - testImplementation "org.mockito:mockito-all:1.10.19" - - testImplementation "org.apache.kafka:connect-api:$kafkaVersion" - testImplementation "org.apache.kafka:connect-json:$kafkaVersion" - testImplementation "org.testcontainers:elasticsearch:$testContainersElasticVersion" - implementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion") - testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" + testImplementation("org.hamcrest:hamcrest-all:1.3") + testImplementation("org.mockito:mockito-core:5.4.0") + testImplementation("org.mockito:mockito-all:1.10.19") + + testImplementation("org.apache.kafka:connect-json:$kafkaVersion") + testImplementation("org.testcontainers:elasticsearch:$testContainersElasticVersion") + testImplementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion") + testRuntimeOnly("org.slf4j:slf4j-log4j12:$slf4jVersion") } distributions { @@ -156,7 +149,7 @@ publishing { } processResources { - filesMatching('elasticsearch-connector-for-apache-kafka-version.properties') { + filesMatching("elasticsearch-connector-for-apache-kafka-version.properties") { expand(version: version) } } @@ -169,12 +162,10 @@ jar { } } -test { - - //we do not need to check classpath hell for testing - systemProperty "tests.jarhell.check", "false" - - //tests.security.manager is true all ElasticsearchSinkTestBase aware test hang - systemProperty "tests.security.manager", "false" +def elasticsearch7Test = tasks.register("elasticsearch7Test", Test) { + environment("ELASTIC_TEST_CONTAINER_VERSION", "7.17.0") +} +tasks.named("check") { + dependsOn elasticsearch7Test } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 6eeaa68..dad9472 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -25,7 +25,7 @@ /> diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java index cfe1152..c26d9f3 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchClient.java @@ -26,8 +26,9 @@ import io.aiven.connect.elasticsearch.bulk.BulkRequest; import io.aiven.connect.elasticsearch.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.cluster.metadata.MappingMetadata; +import co.elastic.clients.elasticsearch._types.mapping.Property; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import com.fasterxml.jackson.databind.JsonNode; public interface ElasticsearchClient extends AutoCloseable { @@ -66,7 +67,7 @@ enum Version { * @param type the type * @throws IOException if the client cannot execute the request */ - MappingMetadata getMapping(String index, String type) throws IOException; + Property getMapping(String index, String type) throws IOException; /** * Creates a bulk request for the list of {@link IndexableRecord} records. @@ -92,14 +93,14 @@ enum Version { * @return the search result * @throws IOException if the client cannot execute the request */ - SearchResponse search(String index) throws IOException; + SearchResponse search(String index) throws IOException; /** - * Executes a search. + * Refreshes the index. * * @param index the index to refresh */ - void refresh(String index) throws IOException; + void refreshIndex(String index) throws IOException; /** * Shuts down the client. diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java index 477b95a..fbd2c3b 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTask.java @@ -34,7 +34,7 @@ import org.apache.kafka.connect.sink.SinkTask; import io.aiven.connect.elasticsearch.bulk.BulkProcessor; -import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper; +import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public void start(final Map props) { start(props, null); } - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") //TOPIC_INDEX_MAP_CONFIG // public for testing public void start(final Map props, final ElasticsearchClient client) { try { @@ -121,7 +121,7 @@ public void start(final Map props, final ElasticsearchClient cli if (client != null) { this.client = client; } else { - this.client = new AivenElasticsearchClientWrapper(config); + this.client = new ElasticsearchClientWrapper(config); } final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client) @@ -179,7 +179,7 @@ public void close(final Collection partitions) { } public void refresh(final String index) throws IOException { - client.refresh(index); + client.refreshIndex(index); } @Override diff --git a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java index 9d84494..7aba173 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java +++ b/src/main/java/io/aiven/connect/elasticsearch/ElasticsearchWriter.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -31,7 +32,6 @@ import io.aiven.connect.elasticsearch.bulk.BulkProcessor; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +103,7 @@ public class ElasticsearchWriter { behaviorOnMalformedDoc ); - existingMappings = Sets.newHashSet(); + existingMappings = new HashSet<>(); } public static class Builder { @@ -349,7 +349,7 @@ public void createIndicesForTopics(final Set assignedTopics) { } private Set indicesForTopics(final Set assignedTopics) { - final Set indices = Sets.newHashSet(); + final Set indices = new HashSet<>(); for (final String topic : assignedTopics) { indices.add(convertTopicToIndexName(topic)); } diff --git a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java index 4411578..5e14e06 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/Mapping.java +++ b/src/main/java/io/aiven/connect/elasticsearch/Mapping.java @@ -29,10 +29,10 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; +import co.elastic.clients.elasticsearch._types.mapping.Property; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.elasticsearch.cluster.metadata.MappingMetadata; public class Mapping { @@ -57,7 +57,7 @@ public static void createMapping( /** * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist. */ - public static MappingMetadata getMapping(final ElasticsearchClient client, final String index, final String type) + public static Property getMapping(final ElasticsearchClient client, final String index, final String type) throws IOException { return client.getMapping(index, type); } diff --git a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java similarity index 71% rename from src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java rename to src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java index d0f1afc..a5a6fac 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/BulkRequestImpl.java +++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchBulkRequest.java @@ -19,14 +19,14 @@ import io.aiven.connect.elasticsearch.bulk.BulkRequest; -public class BulkRequestImpl implements BulkRequest { - private final org.elasticsearch.action.bulk.BulkRequest bulkRequest; +public class ElasticsearchBulkRequest implements BulkRequest { + private final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest; - public BulkRequestImpl(final org.elasticsearch.action.bulk.BulkRequest bulkRequest) { + public ElasticsearchBulkRequest(final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest) { this.bulkRequest = bulkRequest; } - public org.elasticsearch.action.bulk.BulkRequest getBulkRequest() { + public co.elastic.clients.elasticsearch.core.BulkRequest getBulkRequest() { return bulkRequest; } } diff --git a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java similarity index 52% rename from src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java rename to src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java index 623f3e9..3119d66 100644 --- a/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapper.java +++ b/src/main/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapper.java @@ -18,10 +18,10 @@ package io.aiven.connect.elasticsearch.clientwrapper; import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -39,6 +39,25 @@ import io.aiven.connect.elasticsearch.bulk.BulkRequest; import io.aiven.connect.elasticsearch.bulk.BulkResponse; +import co.elastic.clients.elasticsearch._types.ElasticsearchException; +import co.elastic.clients.elasticsearch._types.VersionType; +import co.elastic.clients.elasticsearch._types.mapping.Property; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; +import co.elastic.clients.elasticsearch.indices.ExistsRequest; +import co.elastic.clients.elasticsearch.indices.GetMappingRequest; +import co.elastic.clients.elasticsearch.indices.GetMappingResponse; +import co.elastic.clients.elasticsearch.indices.PutMappingRequest; +import co.elastic.clients.elasticsearch.indices.RefreshRequest; +import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.rest_client.RestClientTransport; +import co.elastic.clients.util.BinaryData; +import co.elastic.clients.util.ContentType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -48,51 +67,26 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.RestHighLevelClientBuilder; -import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.client.indices.GetMappingsRequest; -import org.elasticsearch.client.indices.GetMappingsResponse; -import org.elasticsearch.client.indices.PutMappingRequest; -import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AivenElasticsearchClientWrapper implements ElasticsearchClient { +public class ElasticsearchClientWrapper implements ElasticsearchClient { - // visible for testing - protected static final String MAPPER_PARSE_EXCEPTION - = "mapper_parse_exception"; - protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION - = "version_conflict_engine_exception"; - - private static final Logger LOG = LoggerFactory.getLogger(AivenElasticsearchClientWrapper.class); + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchClientWrapper.class); - @SuppressWarnings("deprecation") - private final RestHighLevelClient elasticClient; + private final ElasticsearchTransport elasticTransport; + private final co.elastic.clients.elasticsearch.ElasticsearchClient elasticClient; private final Version version; // visible for testing - @SuppressWarnings("deprecation") - - public AivenElasticsearchClientWrapper(final RestHighLevelClient client) { + public ElasticsearchClientWrapper( + final ElasticsearchTransport elasticTransport, + final co.elastic.clients.elasticsearch.ElasticsearchClient elasticClient) { try { - this.elasticClient = client; + this.elasticTransport = elasticTransport; + this.elasticClient = elasticClient; this.version = getServerVersion(); } catch (final IOException e) { throw new ConnectException( @@ -103,30 +97,10 @@ public AivenElasticsearchClientWrapper(final RestHighLevelClient client) { } // visible for testing - public AivenElasticsearchClientWrapper(final String address) { + public ElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig config) { try { - final Map props = new HashMap<>(); - props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, address); - props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect"); - this.elasticClient = getElasticsearchClient(new ElasticsearchSinkConnectorConfig(props)); - this.version = getServerVersion(); - } catch (final IOException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to connection error:", - e - ); - } catch (final ConfigException e) { - throw new ConnectException( - "Couldn't start ElasticsearchSinkTask due to configuration error:", - e - ); - } - } - - // visible for testing - public AivenElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig config) { - try { - this.elasticClient = getElasticsearchClient(config); + this.elasticTransport = getElasticsearchTransport(config); + this.elasticClient = getElasticsearchClient(); this.version = getServerVersion(); } catch (final IOException e) { throw new ConnectException( @@ -142,7 +116,7 @@ public AivenElasticsearchClientWrapper(final ElasticsearchSinkConnectorConfig co } private Version getServerVersion() throws IOException { - final String esVersion = this.elasticClient.info(RequestOptions.DEFAULT).getVersion().getNumber(); + final String esVersion = this.elasticClient.info().version().number(); return matchVersionString(esVersion); } @@ -168,12 +142,7 @@ private Version matchVersionString(final String esVersion) { return defaultVersion; } - private boolean es8compat(final Version version) { - return Objects.requireNonNull(version) == Version.ES_V8; - } - - @SuppressWarnings("deprecation") - private RestHighLevelClient getElasticsearchClient(final ElasticsearchSinkConnectorConfig config) { + private ElasticsearchTransport getElasticsearchTransport(final ElasticsearchSinkConnectorConfig config) { final HttpHost[] httpHosts = config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG).stream().map(HttpHost::create) .toArray(HttpHost[]::new); @@ -210,22 +179,15 @@ private RestHighLevelClient getElasticsearchClient(final ElasticsearchSinkConnec }); final RestClient restClient = restClientBuilder.build(); - final Version version = resolveVersion(restClient); - return new RestHighLevelClientBuilder(restClient) - .setApiCompatibilityMode(es8compat(version)) - .build(); + return new RestClientTransport( + restClient, + new JacksonJsonpMapper() + ); } - @SuppressWarnings("deprecation") - private Version resolveVersion(final RestClient restClient) { - // No auto-closing, it would close also the restClient. - final RestHighLevelClient highLevelClient = new RestHighLevelClientBuilder(restClient).build(); - try { - final MainResponse.Version version = highLevelClient.info(RequestOptions.DEFAULT).getVersion(); - return matchVersionString(version.getNumber()); - } catch (final IOException e) { - throw new ConnectException("Failed to get ElasticSearch version.", e); - } + private co.elastic.clients.elasticsearch.ElasticsearchClient getElasticsearchClient() { + Objects.requireNonNull(this.elasticTransport); + return new co.elastic.clients.elasticsearch.ElasticsearchClient(this.elasticTransport); } public Version getVersion() { @@ -233,9 +195,10 @@ public Version getVersion() { } private boolean indexExists(final String index) { - final GetIndexRequest getIndexRequest = new GetIndexRequest(index); + final ExistsRequest existsRequest = new ExistsRequest.Builder() + .index(index).build(); try { - return elasticClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT); + return elasticClient.indices().exists(existsRequest).value(); } catch (final IOException e) { throw new ConnectException(e); } @@ -244,9 +207,10 @@ private boolean indexExists(final String index) { public void createIndices(final Set indices) { for (final String index : indices) { if (!indexExists(index)) { - final CreateIndexRequest createIndexRequest = new CreateIndexRequest(index); + final CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder() + .index(index).build(); try { - elasticClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); + elasticClient.indices().create(createIndexRequest); } catch (final IOException e) { throw new ConnectException("Could not create index '" + index + "'", e); } @@ -255,71 +219,88 @@ public void createIndices(final Set indices) { } public void createMapping(final String index, final String type, final Schema schema) throws IOException { - final ObjectNode obj = JsonNodeFactory.instance.objectNode(); - obj.set(type, Mapping.inferMapping(getVersion(), schema)); - final JsonNode part = Mapping.inferMapping(getVersion(), schema); - final PutMappingRequest request = new PutMappingRequest(index) - .source(part.toString(), XContentType.JSON); + final ObjectNode root = JsonNodeFactory.instance.objectNode(); + final JsonNode mapping = Mapping.inferMapping(getVersion(), schema); + final ObjectNode typeNode = JsonNodeFactory.instance.objectNode(); + typeNode.set(type, mapping); + root.set("properties", typeNode); + final PutMappingRequest request = new PutMappingRequest.Builder() + .index(index) + .withJson(new StringReader(root.toString())) + .build(); + try { - this.elasticClient.indices().putMapping(request, RequestOptions.DEFAULT); + this.elasticClient.indices().putMapping(request); } catch (final ElasticsearchException exception) { throw new ConnectException( - "Cannot create mapping " + schema + " -- " + exception.getDetailedMessage() + "Cannot create mapping " + schema + " -- " + exception.getMessage() ); } } /** - * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist. + * Get the mapping for given index and type. Returns {@code null} if it does not exist. */ @Override - public MappingMetadata getMapping(final String index, final String type) throws IOException { - final GetMappingsRequest request = new GetMappingsRequest() - .indices(index); - final GetMappingsResponse response = this.elasticClient.indices().getMapping(request, RequestOptions.DEFAULT); - if (response.mappings().isEmpty()) { + public Property getMapping(final String index, final String type) throws IOException { + final GetMappingRequest request = new GetMappingRequest.Builder() + .index(index).build(); + final GetMappingResponse response = this.elasticClient.indices().getMapping(request); + final IndexMappingRecord indexMappingRecord = response.get(index); + if (indexMappingRecord == null) { return null; } - return response.mappings().get(index); + return indexMappingRecord.mappings().properties().get(type); } public BulkRequest createBulkRequest(final List batch) { - final org.elasticsearch.action.bulk.BulkRequest bulkRequest = new org.elasticsearch.action.bulk.BulkRequest(); + final List bulkOperations = new ArrayList<>(); for (final IndexableRecord record : batch) { - bulkRequest.add(toBulkableAction(record)); + bulkOperations.add(toBulkableOperation(record)); } - return new BulkRequestImpl(bulkRequest); + final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest = + new co.elastic.clients.elasticsearch.core.BulkRequest.Builder() + .operations(bulkOperations) + .build(); + return new ElasticsearchBulkRequest(bulkRequest); } // visible for testing - protected DocWriteRequest toBulkableAction(final IndexableRecord record) { + protected BulkOperation toBulkableOperation(final IndexableRecord record) { // If payload is null, the record was a tombstone and we should delete from the index. - return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record); + return record.payload != null ? toIndexOperation(record) : toDeleteOperation(record); } - private DeleteRequest toDeleteRequest(final IndexableRecord record) { + private BulkOperation toDeleteOperation(final IndexableRecord record) { // TODO: Should version information be set here? - return new DeleteRequest(record.key.index).id(record.key.id); + return new BulkOperation.Builder().delete(operation -> operation + .index(record.key.index) + .id(record.key.id) + ).build(); } - private IndexRequest toIndexRequest(final IndexableRecord record) { - final IndexRequest indexRequest = new IndexRequest(record.key.index) - .id(record.key.id) - .source(record.payload, XContentType.JSON); - if (record.version != null) { - indexRequest - .versionType(VersionType.EXTERNAL) - .version(record.version); - } - return indexRequest; + private BulkOperation toIndexOperation(final IndexableRecord record) { + final BinaryData binaryPayload = + BinaryData.of(record.payload.getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON); + return new BulkOperation.Builder().index(operation -> { + operation + .index(record.key.index) + .id(record.key.id) + .document(binaryPayload); + if (record.version != null) { + operation + .versionType(VersionType.External) + .version(record.version); + } + return operation; + }).build(); } public BulkResponse executeBulk(final BulkRequest bulk) throws IOException { - final BulkRequestImpl bulkRequest = (BulkRequestImpl) bulk; - final org.elasticsearch.action.bulk.BulkResponse - response = elasticClient.bulk(bulkRequest.getBulkRequest(), RequestOptions.DEFAULT); + final co.elastic.clients.elasticsearch.core.BulkResponse response = + elasticClient.bulk(((ElasticsearchBulkRequest) bulk).getBulkRequest()); - if (!response.hasFailures()) { + if (!response.errors()) { return BulkResponse.success(); } @@ -328,17 +309,16 @@ public BulkResponse executeBulk(final BulkRequest bulk) throws IOException { final List versionConflicts = new ArrayList<>(); final List errors = new ArrayList<>(); - for (final BulkItemResponse item : response.getItems()) { - if (item.isFailed()) { - final BulkItemResponse.Failure failure = item.getFailure(); - final String errorType = Optional.ofNullable(failure.getCause().getMessage()).orElse(""); - if (errorType.contains("version_conflict_engine_exception")) { - versionConflicts.add(new Key(item.getIndex(), item.getType(), item.getId())); - } else if (errorType.contains("mapper_parse_exception")) { + for (final BulkResponseItem item : response.items()) { + if (item.error() != null) { + final String errorType = item.error().type(); + if ("version_conflict_engine_exception".equals(errorType)) { + versionConflicts.add(new Key(item.index(), item.operationType().name(), item.id())); + } else if ("mapper_parse_exception".equals(errorType)) { retriable = false; - errors.add(item.getFailureMessage()); + errors.add(item.error().reason()); } else { - errors.add(item.getFailureMessage()); + errors.add(item.error().reason()); } } } @@ -351,26 +331,33 @@ public BulkResponse executeBulk(final BulkRequest bulk) throws IOException { } } - final String errorInfo = errors.isEmpty() ? response.buildFailureMessage() : errors.toString(); + final String errorInfo = errors.isEmpty() + ? "Errors present, but error information missing." + : errors.toString(); + LOG.trace("Bulk response: {}", response); return BulkResponse.failure(retriable, errorInfo); } + // visible for testing @Override - public SearchResponse search(final String index) throws IOException { - final SearchRequest searchRequest = new SearchRequest(); + public SearchResponse search(final String index) throws IOException { + final SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder(); + if (index != null) { - searchRequest.indices(index); + searchRequestBuilder.index(index); } - return elasticClient.search(searchRequest, RequestOptions.DEFAULT); + return elasticClient.search(searchRequestBuilder.build(), JsonNode.class); } - public void refresh(final String index) throws IOException { - final RefreshRequest request = new RefreshRequest(index); - elasticClient.indices().refresh(request, RequestOptions.DEFAULT); + public void refreshIndex(final String index) throws IOException { + final RefreshRequest request = new RefreshRequest.Builder() + .index(index) + .build(); + elasticClient.indices().refresh(request); } public void close() throws IOException { - elasticClient.close(); + elasticTransport.close(); } } diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java index af1d3e8..f6aa25e 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkFailureTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import io.aiven.connect.elasticsearch.bulk.BulkResponse; -import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper; +import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper; import org.junit.Test; import org.mockito.Mockito; @@ -45,7 +45,7 @@ public void testRetryIfRecoverable() throws IOException { final ElasticsearchSinkTask elasticsearchSinkTask = new ElasticsearchSinkTask(); final int numbRetriesBeforeSucceeding = 3; - final AivenElasticsearchClientWrapper failingClient = Mockito.mock(AivenElasticsearchClientWrapper.class); + final ElasticsearchClientWrapper failingClient = Mockito.mock(ElasticsearchClientWrapper.class); final AtomicInteger apiCallCounter = new AtomicInteger(0); when(failingClient.executeBulk(any())).thenAnswer(i -> { final int numAttempt = apiCallCounter.incrementAndGet(); @@ -76,7 +76,7 @@ public void testRetryIfRecoverable() throws IOException { @Test public void testRaiseExceptionIfNot() throws IOException { final ElasticsearchSinkTask elasticsearchSinkTask = new ElasticsearchSinkTask(); - final AivenElasticsearchClientWrapper failingClient = Mockito.mock(AivenElasticsearchClientWrapper.class); + final ElasticsearchClientWrapper failingClient = Mockito.mock(ElasticsearchClientWrapper.class); final AtomicInteger apiCallCounter = new AtomicInteger(0); when(failingClient.executeBulk(any())).thenAnswer(i -> { apiCallCounter.incrementAndGet(); diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java index d69446d..645e375 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchSinkTestBase.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,10 +30,11 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; -import io.aiven.connect.elasticsearch.clientwrapper.AivenElasticsearchClientWrapper; +import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.SearchHit; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.search.Hit; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -44,6 +46,7 @@ public class ElasticsearchSinkTestBase { + private static final String DEFAULT_ELASTICSEARCH_TEST_CONTAINER_VERSION = "8.12.2"; private static final String ELASTICSEARCH_PASSWORD = "disable_tls_for_testing"; protected static final String TYPE = "kafka-connect"; @@ -64,14 +67,30 @@ public ElasticsearchSinkTestBase() { @BeforeClass public static void staticSetUp() { - container = new ElasticsearchContainer("elasticsearch:8.12.2") + final String elasticsearchContainerVersion = System.getenv().getOrDefault( + "ELASTIC_TEST_CONTAINER_VERSION", + DEFAULT_ELASTICSEARCH_TEST_CONTAINER_VERSION + ); + container = new ElasticsearchContainer("elasticsearch:" + elasticsearchContainerVersion) .withPassword(ELASTICSEARCH_PASSWORD); container.getEnvMap().put("xpack.security.transport.ssl.enabled", "false"); container.getEnvMap().put("xpack.security.http.ssl.enabled", "false"); - container.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*\"message\":\"started.*")); + container.setWaitStrategy(new LogMessageWaitStrategy() + .withRegEx(getLogMessageWaitStrategyRegex(elasticsearchContainerVersion))); container.start(); } + private static String getLogMessageWaitStrategyRegex(final String elasticContainerVersion) { + final char majorVersion = elasticContainerVersion.charAt(0); + switch (majorVersion) { + case '7': + return ".*\"message\": \"started.*"; + default: + // Default to major version 8 log message + return ".*\"message\":\"started.*"; + } + } + @Before public void setUp() throws Exception { final Map props = new HashMap<>(); @@ -80,7 +99,7 @@ public void setUp() throws Exception { props.put(ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG, "elastic"); props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, "kafka-connect"); final ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props); - client = new AivenElasticsearchClientWrapper(config); + client = new ElasticsearchClientWrapper(config); converter = new DataConverter(true, DataConverter.BehaviorOnNullValues.IGNORE); } @@ -146,15 +165,15 @@ protected void verifySearchResults( final String index, final boolean ignoreKey, final boolean ignoreSchema) throws IOException { - final SearchResponse result = client.search(index); - final SearchHit[] rawHits = result.getHits().getHits(); + final SearchResponse result = client.search(index); + final List> rawHits = result.hits().hits(); - assertEquals(records.size(), rawHits.length); + assertEquals(records.size(), rawHits.size()); final Map hits = new HashMap<>(); - for (int i = 0; i < rawHits.length; ++i) { - final String id = rawHits[i].getId(); - final String source = rawHits[i].getSourceAsString(); + for (final Hit hit: rawHits) { + final String id = hit.id(); + final String source = hit.source().toString(); hits.put(id, source); } diff --git a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java index b0bfefe..3b7cd90 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/ElasticsearchWriterTest.java @@ -352,7 +352,7 @@ public void testDeleteOnNullValue() throws Exception { // Can't call writeDataAndRefresh(writer, records) since it stops the writer writer.write(records); writer.flush(); - client.refresh(getTopic()); + client.refreshIndex(getTopic()); // Make sure the record made it there successfully verifySearchResults(records); @@ -534,7 +534,7 @@ private void writeDataAndRefresh( writer.write(records); writer.flush(); writer.stop(); - client.refresh(index); + client.refreshIndex(index); } private void verifySearchResults(final Collection records) throws Exception { diff --git a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java index 63f0437..283aa1d 100644 --- a/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java +++ b/src/test/java/io/aiven/connect/elasticsearch/MappingTest.java @@ -18,7 +18,6 @@ package io.aiven.connect.elasticsearch; import java.util.HashSet; -import java.util.Map; import java.util.Set; import org.apache.kafka.connect.data.Date; @@ -29,16 +28,12 @@ import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; -import com.fasterxml.jackson.databind.node.NumericNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import org.elasticsearch.cluster.metadata.MappingMetadata; +import co.elastic.clients.elasticsearch._types.mapping.Property; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.junit.Assert.assertTrue; public class MappingTest extends ElasticsearchSinkTestBase { @@ -46,7 +41,6 @@ public class MappingTest extends ElasticsearchSinkTestBase { private static final String TYPE = "kafka-connect-type"; @Test - @SuppressWarnings("unchecked") public void testMapping() throws Exception { final Set indices = new HashSet<>(); indices.add(INDEX); @@ -55,32 +49,10 @@ public void testMapping() throws Exception { final Schema schema = createSchema(); Mapping.createMapping(client, INDEX, TYPE, schema); - final MappingMetadata mapping = Mapping.getMapping(client, INDEX, TYPE); + final Property mapping = Mapping.getMapping(client, INDEX, TYPE); assertNotNull(mapping); - verifyMapping(schema, mapping.sourceAsMap()); - } - - @Test - @SuppressWarnings("unchecked") - public void testStringMappingForES6() throws Exception { - final ElasticsearchClient client = mock(ElasticsearchClient.class); - when(client.getVersion()).thenReturn(ElasticsearchClient.Version.ES_V6); - - final Schema schema = SchemaBuilder.struct().name("textRecord") - .field("string", Schema.STRING_SCHEMA) - .build(); - final ObjectNode mapping = (ObjectNode) Mapping.inferMapping(client.getVersion(), schema); - final ObjectNode properties = mapping.with("properties"); - final ObjectNode string = properties.with("string"); - final TextNode stringType = (TextNode) string.get("type"); - final ObjectNode fields = string.with("fields"); - final ObjectNode keyword = fields.with("keyword"); - final TextNode keywordType = (TextNode) keyword.get("type"); - final NumericNode ignoreAbove = (NumericNode) keyword.get("ignore_above"); - - assertEquals(ElasticsearchSinkConnectorConstants.TEXT_TYPE, stringType.asText()); - assertEquals(ElasticsearchSinkConnectorConstants.KEYWORD_TYPE, keywordType.asText()); - assertEquals(256, ignoreAbove.asInt()); + assertTrue(mapping.isObject()); + verifyMapping(schema, mapping); } protected Schema createSchema() { @@ -125,19 +97,17 @@ private Schema createInnerSchema() { .build(); } - @SuppressWarnings("unchecked") - private void verifyMapping(final Schema schema, final Map mapping) { + private void verifyMapping(final Schema schema, final Property property) { final String schemaName = schema.name(); - final Object type = mapping.get("type"); if (schemaName != null) { switch (schemaName) { case Date.LOGICAL_NAME: case Time.LOGICAL_NAME: case Timestamp.LOGICAL_NAME: - assertEquals(ElasticsearchSinkConnectorConstants.DATE_TYPE, type.toString()); + assertEquals(ElasticsearchSinkConnectorConstants.DATE_TYPE, property._kind().jsonValue()); return; case Decimal.LOGICAL_NAME: - assertEquals(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, type.toString()); + assertEquals(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, property._kind().jsonValue()); return; default: } @@ -148,30 +118,29 @@ private void verifyMapping(final Schema schema, final Map mappin final Schema.Type schemaType = schema.type(); switch (schemaType) { case ARRAY: - verifyMapping(schema.valueSchema(), mapping); + verifyMapping(schema.valueSchema(), property); break; case MAP: final Schema newSchema = converter.preProcessSchema(schema); - final Map> mapMapping = - (Map>) mapping.get("properties"); verifyMapping( newSchema.keySchema(), - mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_KEY) + property.object().properties().get(ElasticsearchSinkConnectorConstants.MAP_KEY) ); verifyMapping( newSchema.valueSchema(), - mapMapping.get(ElasticsearchSinkConnectorConstants.MAP_VALUE) + property.object().properties().get(ElasticsearchSinkConnectorConstants.MAP_VALUE) ); break; case STRUCT: - final Map> structMapping = - (Map>) mapping.get("properties"); for (final Field field : schema.fields()) { - verifyMapping(field.schema(), structMapping.get(field.name())); + verifyMapping(field.schema(), property.object().properties().get(field.name())); } break; default: - assertEquals(Mapping.getElasticsearchType(client.getVersion(), schemaType), type.toString()); + assertEquals( + Mapping.getElasticsearchType(client.getVersion(), schemaType), + property._kind().jsonValue() + ); } } } diff --git a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java deleted file mode 100644 index a0ed1c5..0000000 --- a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/AivenElasticsearchClientWrapperTest.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Copyright 2020 Aiven Oy - * Copyright 2018 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.connect.elasticsearch.clientwrapper; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; - -import io.aiven.connect.elasticsearch.ElasticsearchClient; -import io.aiven.connect.elasticsearch.IndexableRecord; -import io.aiven.connect.elasticsearch.Key; -import io.aiven.connect.elasticsearch.Mapping; -import io.aiven.connect.elasticsearch.bulk.BulkRequest; - -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Sets; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchResponseSections; -import org.elasticsearch.client.IndicesClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.core.MainResponse; -import org.elasticsearch.client.indices.CreateIndexRequest; -import org.elasticsearch.client.indices.CreateIndexResponse; -import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.client.indices.GetMappingsRequest; -import org.elasticsearch.client.indices.GetMappingsResponse; -import org.elasticsearch.client.indices.PutMappingRequest; -import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.SearchHits; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentMatcher; -import org.mockito.InOrder; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class AivenElasticsearchClientWrapperTest { - - private static final String INDEX = "index"; - private static final String KEY = "key"; - private static final String TYPE = "type"; - - private RestHighLevelClient elasticsearchClient; - - @Before - public void setUp() throws Exception { - elasticsearchClient = mock(RestHighLevelClient.class); - when(elasticsearchClient.info(any())).thenReturn( - new MainResponse("localhost", - new MainResponse.Version("1.0", "buildFlavor", - "buildType", "buildHash", "buildDate", false, - "luceneVersion", - "minWireCompVersion", "minIndexCompVersion"), - "clusterName", UUID.randomUUID().toString(), "tagLine" - ) - ); - } - - @Test - public void getsVersion() { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - assertThat(client.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1))); - } - - @Test - public void createsIndices() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final IndicesClient indicesClient = mock(IndicesClient.class); - when(elasticsearchClient.indices()).thenReturn(indicesClient); - when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); - when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure")); - - when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); - when(indicesClient.create(argThat(isCreateIndexForTestIndex()), any())).thenReturn( - new CreateIndexResponse(true, true, INDEX)); - - final Set indices = Sets.newHashSet(); - indices.add(INDEX); - client.createIndices(indices); - final InOrder inOrder = inOrder(elasticsearchClient, indicesClient); - inOrder.verify(elasticsearchClient).info(any()); - inOrder.verify(indicesClient).exists(any(GetIndexRequest.class), any()); - inOrder.verify(indicesClient).create(argThat(isCreateIndexForTestIndex()), any()); - } - - private ArgumentMatcher isCreateIndexForTestIndex() { - return new ArgumentMatcher() { - @Override - public boolean matches(final CreateIndexRequest createIndexRequest) { - // check the URI as the equals method on CreateIndex doesn't work - return createIndexRequest.index().equals(INDEX); - } - }; - } - - @Test - public void createIndicesAndFails() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final IndicesClient indicesClient = mock(IndicesClient.class); - when(elasticsearchClient.indices()).thenReturn(indicesClient); - when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(false); - when(indicesClient.create(any(CreateIndexRequest.class), any())).thenThrow(new IOException("failure")); - - final Set indices = new HashSet<>(); - indices.add("test-index"); - assertThrows("Could not create index 'test-index'", ConnectException.class, () -> { - client.createIndices(indices); - }); - } - - @Test - public void createsMapping() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final IndicesClient indicesClient = mock(IndicesClient.class); - when(elasticsearchClient.indices()).thenReturn(indicesClient); - final ObjectNode obj = JsonNodeFactory.instance.objectNode(); - obj.set(TYPE, Mapping.inferMapping(client.getVersion(), Schema.STRING_SCHEMA)); - client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); - verify(indicesClient).putMapping(any(PutMappingRequest.class), any()); - } - - @Test(expected = ConnectException.class) - public void createsMappingAndFails() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final IndicesClient indicesClient = mock(IndicesClient.class); - when(elasticsearchClient.indices()).thenReturn(indicesClient); - when(indicesClient.putMapping(any(PutMappingRequest.class), any())) - .thenThrow(new ElasticsearchException("failure")); - client.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); - } - - @Test - public void getsMapping() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final IndicesClient indicesClient = mock(IndicesClient.class); - when(elasticsearchClient.indices()).thenReturn(indicesClient); - final MappingMetadata mappingMetadata = new MappingMetadata(TYPE, new HashMap<>()); - final Map mappingsMetadata = new HashMap<>(); - mappingsMetadata.put(INDEX, mappingMetadata); - when(indicesClient.getMapping(any(GetMappingsRequest.class), any())).thenReturn( - new GetMappingsResponse(mappingsMetadata)); - assertEquals(client.getMapping(INDEX, TYPE), new MappingMetadata(TYPE, new HashMap<>())); - } - - @Test - public void executesBulk() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - BulkItemResponse.success(1, DocWriteRequest.OpType.CREATE, - new IndexResponse(ShardId.fromString("[" + INDEX + "][1]"), TYPE, "id", 1L, 1L, 1L, true) - ) - }; - when(elasticsearchClient.bulk(any(), any())).thenReturn( - new BulkResponse(bulkItemResponses, 100) - ); - - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - assertThat(client.executeBulk(request).isSucceeded(), is(equalTo(true))); - } - - @Test - public void executesBulkAndFails() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( - INDEX, TYPE, "id", new IOException("failure") - )) - }; - when(elasticsearchClient.bulk(any(), any())).thenReturn( - new BulkResponse(bulkItemResponses, 100) - ); - - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); - assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); - assertThat(bulkResponse.isRetriable(), is(equalTo(true))); - assertEquals("[java.io.IOException: failure]", bulkResponse.getErrorInfo()); - } - - @Test - public void executesBulkAndFailsWithParseError() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( - INDEX, AivenElasticsearchClientWrapper.MAPPER_PARSE_EXCEPTION, "id", - new ElasticsearchException("[type=mapper_parse_exception, reason=[key]: Mapper parse error]") - - )) - }; - when(elasticsearchClient.bulk(any(), any())).thenReturn( - new BulkResponse(bulkItemResponses, 100) - ); - - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); - assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); - assertThat(bulkResponse.isRetriable(), is(equalTo(false))); - assertEquals( - "[ElasticsearchException[[type=mapper_parse_exception, reason=[key]: Mapper parse error]]]", - bulkResponse.getErrorInfo() - ); - } - - @Test - public void executesBulkAndFailsWithSomeOtherError() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( - INDEX, "some_random_exception", "id", - new ElasticsearchException("[type=random_type_string, reason=[key]: Unknown error]") - )) - }; - when(elasticsearchClient.bulk(any(), any())).thenReturn( - new BulkResponse(bulkItemResponses, 100) - ); - - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); - assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); - assertThat(bulkResponse.isRetriable(), is(equalTo(true))); - assertEquals( - "[ElasticsearchException[[type=random_type_string, reason=[key]: Unknown error]]]", - bulkResponse.getErrorInfo() - ); - } - - @Test - public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - BulkItemResponse.failure(1, DocWriteRequest.OpType.CREATE, new BulkItemResponse.Failure( - INDEX, "_doc", "id", - new ElasticsearchException( - "[type=version_conflict_engine_exception, reason=[key]: " - + "version conflict, current version [1] is higher or equal to the one provided [0]]") - )) - }; - when(elasticsearchClient.bulk(any(), any())).thenReturn( - new BulkResponse(bulkItemResponses, 100) - ); - - - final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); - final List records = new ArrayList<>(); - records.add(record); - final BulkRequest request = client.createBulkRequest(records); - final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = client.executeBulk(request); - assertThat(bulkResponse.isSucceeded(), is(equalTo(true))); - } - - @Test - public void searches() throws Exception { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - final SearchResponse response = new SearchResponse( - new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 0), - "scrollId", 1, 1, 0, 100L, null, SearchResponse.Clusters.EMPTY - ); - when(elasticsearchClient.search(any(SearchRequest.class), any())).thenReturn(response); - assertNotNull(client.search(INDEX)); - verify(elasticsearchClient).search(any(SearchRequest.class), any()); - } - - @Test - public void closes() throws IOException { - final AivenElasticsearchClientWrapper client = new AivenElasticsearchClientWrapper(elasticsearchClient); - client.close(); - verify(elasticsearchClient).close(); - } -} diff --git a/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java new file mode 100644 index 0000000..e1c4dd7 --- /dev/null +++ b/src/test/java/io/aiven/connect/elasticsearch/clientwrapper/ElasticsearchClientWrapperTest.java @@ -0,0 +1,411 @@ +/* + * Copyright 2020 Aiven Oy + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.elasticsearch.clientwrapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import io.aiven.connect.elasticsearch.ElasticsearchClient; +import io.aiven.connect.elasticsearch.IndexableRecord; +import io.aiven.connect.elasticsearch.Key; +import io.aiven.connect.elasticsearch.Mapping; +import io.aiven.connect.elasticsearch.bulk.BulkRequest; + +import co.elastic.clients.elasticsearch._types.ElasticsearchException; +import co.elastic.clients.elasticsearch._types.ElasticsearchVersionInfo; +import co.elastic.clients.elasticsearch._types.ErrorCause; +import co.elastic.clients.elasticsearch._types.ErrorResponse; +import co.elastic.clients.elasticsearch._types.ShardStatistics; +import co.elastic.clients.elasticsearch._types.mapping.Property; +import co.elastic.clients.elasticsearch._types.mapping.TextProperty; +import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; +import co.elastic.clients.elasticsearch.core.BulkResponse; +import co.elastic.clients.elasticsearch.core.InfoResponse; +import co.elastic.clients.elasticsearch.core.SearchRequest; +import co.elastic.clients.elasticsearch.core.SearchResponse; +import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; +import co.elastic.clients.elasticsearch.core.bulk.OperationType; +import co.elastic.clients.elasticsearch.core.search.HitsMetadata; +import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; +import co.elastic.clients.elasticsearch.indices.CreateIndexResponse; +import co.elastic.clients.elasticsearch.indices.ElasticsearchIndicesClient; +import co.elastic.clients.elasticsearch.indices.ExistsRequest; +import co.elastic.clients.elasticsearch.indices.GetMappingRequest; +import co.elastic.clients.elasticsearch.indices.GetMappingResponse; +import co.elastic.clients.elasticsearch.indices.PutMappingRequest; +import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.endpoints.BooleanResponse; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.InOrder; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ElasticsearchClientWrapperTest { + + private static final String INDEX = "index"; + private static final String KEY = "key"; + private static final String TYPE = "type"; + + private ElasticsearchTransport elasticsearchTransport; + private co.elastic.clients.elasticsearch.ElasticsearchClient elasticsearchClient; + private ElasticsearchClientWrapper elasticsearchClientWrapper; + + @Before + public void setUp() throws Exception { + elasticsearchTransport = mock(ElasticsearchTransport.class); + elasticsearchClient = mock(co.elastic.clients.elasticsearch.ElasticsearchClient.class); + when(elasticsearchClient.info()).thenReturn( + new InfoResponse.Builder() + .version(new ElasticsearchVersionInfo.Builder() + .number("1.0") + .buildFlavor("buildFlavor") + .buildType("buildType") + .buildHash("buildHash") + .buildDate("buildDate") + .buildSnapshot(false) + .luceneVersion("luceneVersion") + .minimumWireCompatibilityVersion("minWireCompVersion") + .minimumIndexCompatibilityVersion("minIndexCompVersion") + .build() + ) + .clusterName("clusterName") + .clusterUuid(UUID.randomUUID().toString()) + .tagline("tagLine") + .name("name") + .build() + ); + // InfoResponse required for resolving server version when initializing the client + elasticsearchClientWrapper = new ElasticsearchClientWrapper(elasticsearchTransport, elasticsearchClient); + } + + @Test + public void getsVersion() { + assertThat(elasticsearchClientWrapper.getVersion(), is(equalTo(ElasticsearchClient.Version.ES_V1))); + } + + @Test + public void createsIndices() throws Exception { + final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false)); + when(indicesClient.create(any(CreateIndexRequest.class))).thenThrow(new IOException("failure")); + + when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false)); + when(indicesClient.create(argThat(isCreateIndexForTestIndex()))).thenReturn( + new CreateIndexResponse.Builder() + .index(INDEX) + .acknowledged(true) + .shardsAcknowledged(true) + .build() + ); + + final Set indices = new HashSet<>(); + indices.add(INDEX); + elasticsearchClientWrapper.createIndices(indices); + final InOrder inOrder = inOrder(elasticsearchClient, indicesClient); + inOrder.verify(elasticsearchClient).info(); + inOrder.verify(indicesClient).exists(any(ExistsRequest.class)); + inOrder.verify(indicesClient).create(argThat(isCreateIndexForTestIndex())); + } + + private ArgumentMatcher isCreateIndexForTestIndex() { + return new ArgumentMatcher() { + @Override + public boolean matches(final CreateIndexRequest createIndexRequest) { + // check the URI as the equals method on CreateIndex doesn't work + return createIndexRequest.index().equals(INDEX); + } + }; + } + + @Test + public void createIndicesAndFails() throws Exception { + final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.exists(any(ExistsRequest.class))).thenReturn(new BooleanResponse(false)); + when(indicesClient.create(any(CreateIndexRequest.class))).thenThrow(new IOException("failure")); + + final Set indices = new HashSet<>(); + indices.add("test-index"); + assertThrows("Could not create index 'test-index'", ConnectException.class, () -> { + elasticsearchClientWrapper.createIndices(indices); + }); + } + + @Test + public void createsMapping() throws Exception { + final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + final ObjectNode obj = JsonNodeFactory.instance.objectNode(); + obj.set(TYPE, Mapping.inferMapping(elasticsearchClientWrapper.getVersion(), Schema.STRING_SCHEMA)); + elasticsearchClientWrapper.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); + verify(indicesClient).putMapping(any(PutMappingRequest.class)); + } + + @Test(expected = ConnectException.class) + public void createsMappingAndFails() throws Exception { + final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + when(indicesClient.putMapping(any(PutMappingRequest.class))) + .thenThrow( + new ElasticsearchException("endpointId", new ErrorResponse.Builder() + .error(new ErrorCause.Builder() + .reason("failure") + .build()) + .status(500) + .build()) + ); + elasticsearchClientWrapper.createMapping(INDEX, TYPE, Schema.STRING_SCHEMA); + } + + @Test + public void getsMapping() throws Exception { + final ElasticsearchIndicesClient indicesClient = mock(ElasticsearchIndicesClient.class); + when(elasticsearchClient.indices()).thenReturn(indicesClient); + final IndexMappingRecord indexMappingRecord = new IndexMappingRecord.Builder() + .mappings(new TypeMapping.Builder() + .properties(TYPE, builder -> { + return builder.text(new TextProperty.Builder().build()); + }).build()) + .build(); + final Map indexMappingRecordMap = new HashMap<>(); + indexMappingRecordMap.put(INDEX, indexMappingRecord); + when(indicesClient.getMapping(any(GetMappingRequest.class))).thenReturn( + new GetMappingResponse.Builder() + .result(indexMappingRecordMap) + .build() + ); + + assertEquals( + new Property.Builder().text(new TextProperty.Builder().build()).build()._kind().jsonValue(), + elasticsearchClientWrapper.getMapping(INDEX, TYPE)._kind().jsonValue() + ); + } + + @Test + public void executesBulk() throws Exception { + final List items = new ArrayList<>(); + items.add(new BulkResponseItem.Builder() + .operationType(OperationType.Create) + .index(INDEX) + .status(200) + .build() + ); + when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn( + new BulkResponse.Builder() + .items(items) + .errors(false) + .took(100) + .build() + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records); + assertThat(elasticsearchClientWrapper.executeBulk(request).isSucceeded(), is(equalTo(true))); + } + + @Test + public void executesBulkAndFails() throws Exception { + final List items = new ArrayList<>(); + items.add(new BulkResponseItem.Builder() + .operationType(OperationType.Create) + .index(INDEX) + .status(200) + .error(builder -> builder + .type("failure") + .reason("failure") + ) + .build() + ); + when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn( + new BulkResponse.Builder() + .items(items) + .errors(true) + .took(100) + .build() + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), null, 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = + elasticsearchClientWrapper.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(true))); + assertEquals("[failure]", bulkResponse.getErrorInfo()); + } + + @Test + public void executesBulkAndFailsWithParseError() throws Exception { + final List items = new ArrayList<>(); + items.add(new BulkResponseItem.Builder() + .operationType(OperationType.Create) + .index(INDEX) + .status(200) + .error(builder -> builder + .type("mapper_parse_exception") + .reason("[type=mapper_parse_exception, reason=[key]: Mapper parse error]") + ) + .build() + ); + when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn( + new BulkResponse.Builder() + .items(items) + .errors(true) + .took(100) + .build() + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = + elasticsearchClientWrapper.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(false))); + assertEquals( + "[[type=mapper_parse_exception, reason=[key]: Mapper parse error]]", + bulkResponse.getErrorInfo() + ); + } + + @Test + public void executesBulkAndFailsWithSomeOtherError() throws Exception { + final List items = new ArrayList<>(); + items.add(new BulkResponseItem.Builder() + .operationType(OperationType.Create) + .index(INDEX) + .status(200) + .error(builder -> builder + .type("random_error_type_string") + .reason("[type=random_type_string, reason=[key]: Unknown error]") + ) + .build() + ); + when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn( + new BulkResponse.Builder() + .items(items) + .errors(true) + .took(100) + .build() + ); + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = + elasticsearchClientWrapper.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(false))); + assertThat(bulkResponse.isRetriable(), is(equalTo(true))); + assertEquals( + "[[type=random_type_string, reason=[key]: Unknown error]]", + bulkResponse.getErrorInfo() + ); + } + + @Test + public void executesBulkAndSucceedsBecauseOnlyVersionConflicts() throws Exception { + final List items = new ArrayList<>(); + items.add(new BulkResponseItem.Builder() + .operationType(OperationType.Create) + .index(INDEX) + .status(200) + .error(builder -> builder + .type("version_conflict_engine_exception") + .reason("[type=version_conflict_engine_exception, reason=[key]: " + + "version conflict, current version [1] is higher or equal to the one provided [0]]") + ) + .build() + ); + when(elasticsearchClient.bulk(any(co.elastic.clients.elasticsearch.core.BulkRequest.class))).thenReturn( + new BulkResponse.Builder() + .items(items) + .errors(true) + .took(100) + .build() + ); + + + final IndexableRecord record = new IndexableRecord(new Key(INDEX, TYPE, KEY), "payload", 0L); + final List records = new ArrayList<>(); + records.add(record); + final BulkRequest request = elasticsearchClientWrapper.createBulkRequest(records); + final io.aiven.connect.elasticsearch.bulk.BulkResponse bulkResponse = + elasticsearchClientWrapper.executeBulk(request); + assertThat(bulkResponse.isSucceeded(), is(equalTo(true))); + } + + @Test + public void searches() throws Exception { + final SearchResponse response = new SearchResponse.Builder() + .took(100) + .timedOut(false) + .hits( + new HitsMetadata.Builder() + .hits(new ArrayList<>()) + .build() + ) + .shards( + new ShardStatistics.Builder() + .failed(1) + .successful(1) + .total(2) + .build() + ).build(); + when(elasticsearchClient.search(any(SearchRequest.class), any())).thenReturn(response); + assertNotNull(elasticsearchClientWrapper.search(INDEX)); + verify(elasticsearchClient).search(any(SearchRequest.class), any()); + } + + @Test + public void closes() throws IOException { + elasticsearchClientWrapper.close(); + verify(elasticsearchTransport).close(); + } +}