diff --git a/.github/workflows/release-edc.yml b/.github/workflows/release-edc.yml index 4ccb8813614..aebd13c36e4 100644 --- a/.github/workflows/release-edc.yml +++ b/.github/workflows/release-edc.yml @@ -41,6 +41,13 @@ jobs: outputs: edc-version: ${{ env.EDC_VERSION }} + publish-autodoc: + needs: Prepare-Release + uses: eclipse-edc/.github/.github/workflows/publish-autodoc.yml@main + secrets: inherit + with: + version: ${{ needs.Prepare-Release.outputs.edc-version }} + publish-openapi-ui: needs: Prepare-Release uses: eclipse-edc/.github/.github/workflows/publish-openapi-ui.yml@main diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java deleted file mode 100644 index 09db627435d..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e; - -import jakarta.json.Json; -import jakarta.json.JsonObject; -import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.Map; -import java.util.UUID; - -import static io.restassured.RestAssured.given; -import static jakarta.json.Json.createObjectBuilder; -import static java.time.Duration.ofDays; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; -import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - -public abstract class AbstractEndToEndTransfer { - - protected final Duration timeout = Duration.ofSeconds(60); - protected static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() - .name("consumer") - .id("urn:connector:consumer") - .build(); - protected static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - - @Test - void httpPull_dataTransfer() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress()); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); - - // retrieve the data reference - var edr = CONSUMER.getDataReference(transferProcessId); - - // pull the data without query parameter - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); - - // pull the data with additional query parameter - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); - - // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one - assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); - } - - @Test - void httpPull_dataTransfer_withTransferType() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); - - // retrieve the data reference - var edr = CONSUMER.getDataReference(transferProcessId); - - // pull the data without query parameter - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); - - // pull the data with additional query parameter - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); - - // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one - assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); - } - - @Test - void httpPull_withExpiredContract_fixedInForcePeriod() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - var now = Instant.now(); - - // contract was valid from t-10d to t-5d, so "now" it is expired - var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", now.minus(ofDays(5))); - createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(TERMINATED.name()); - }); - } - - @Test - void httpPull_withExpiredContract_durationInForcePeriod() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - var now = Instant.now(); - // contract was valid from t-10d to t-5d, so "now" it is expired - var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", "contractAgreement+1s"); - createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(TERMINATED.name()); - }); - } - - @Test - void httpPullDataTransferProvisioner() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpProvision", - "proxyQueryParams", "true" - )); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - - var edr = CONSUMER.getDataReference(transferProcessId); - CONSUMER.pullData(edr, Map.of(), equalTo("some information")); - }); - } - - @Test - void httpPushDataTransfer() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); - } - - @Test - void httpPushDataTransfer_withTransferType() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); - } - - @Test - @DisplayName("Provider pushes data to Consumer, Provider needs to authenticate the data request through an oauth2 server") - void httpPushDataTransfer_oauth2Provisioning() { - registerDataPlanes(); - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressOauth2Properties()); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); - - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); - } - - private JsonObject httpDataAddress(String baseUrl) { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpData") - .add(EDC_NAMESPACE + "baseUrl", baseUrl) - .build(); - } - - private JsonObject syncDataAddress() { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpProxy") - .build(); - } - - @NotNull - private Map httpDataAddressOauth2Properties() { - return Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/oauth2data", - "type", "HttpData", - "proxyQueryParams", "true", - "oauth2:clientId", "clientId", - "oauth2:clientSecretKey", "provision-oauth-secret", - "oauth2:tokenUrl", PROVIDER.backendService() + "/api/oauth2/token" - ); - } - - @NotNull - private Map httpDataAddressProperties() { - return Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpData", - "proxyQueryParams", "true" - ); - } - - private void registerDataPlanes() { - PROVIDER.registerDataPlane(); - } - - private void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { - PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); - var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); - var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); - PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); - } - - private JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java deleted file mode 100644 index 13f6e4092c9..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright (c) 2023 Amadeus - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Amadeus - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.handler.codec.http.HttpMethod; -import jakarta.json.Json; -import jakarta.json.JsonObject; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.eclipse.edc.test.e2e.annotations.KafkaIntegrationTest; -import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; -import org.eclipse.edc.test.e2e.serializers.JacksonDeserializer; -import org.eclipse.edc.test.e2e.serializers.JacksonSerializer; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import javax.validation.constraints.NotNull; - -import static java.lang.String.format; -import static java.time.Duration.ZERO; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; -import static org.mockserver.integration.ClientAndServer.startClientAndServer; -import static org.mockserver.model.HttpRequest.request; -import static org.mockserver.model.HttpResponse.response; -import static org.mockserver.stop.Stop.stopQuietly; -import static org.mockserver.verify.VerificationTimes.atLeast; -import static org.mockserver.verify.VerificationTimes.never; - -@KafkaIntegrationTest -class EndToEndKafkaTransferTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String KAFKA_SERVER = "localhost:9092"; - private static final Duration TIMEOUT = Duration.ofSeconds(60); - private static final String SINK_HTTP_PATH = "/api/service"; - private static final String SOURCE_TOPIC = "source_topic"; - private static final String SINK_TOPIC = "sink_topic"; - private static final int EVENT_DESTINATION_PORT = getFreePort(); - private static final JsonNode JSON_MESSAGE = sampleMessage(); - private static final AtomicInteger MESSAGE_COUNTER = new AtomicInteger(); - private static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() - .name("consumer") - .id("urn:connector:consumer") - .build(); - private static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - static String[] controlPlaneModules = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":extensions:control-plane:transfer:transfer-data-plane", - ":extensions:data-plane:data-plane-client" - }; - @RegisterExtension - static EdcRuntimeExtension consumerControlPlane = new EdcRuntimeExtension( - "consumer-control-plane", - CONSUMER.controlPlaneConfiguration(), - controlPlaneModules - ); - - @RegisterExtension - static EdcRuntimeExtension providerDataPlane = new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:data-plane", - "provider-data-plane", - PROVIDER.dataPlaneConfiguration() - ); - - @RegisterExtension - static EdcRuntimeExtension providerControlPlane = new EdcRuntimeExtension( - "provider-control-plane", - PROVIDER.controlPlaneConfiguration(), - controlPlaneModules - ); - - @BeforeAll - public static void setUp() { - startKafkaProducer(); - } - - private static Consumer createKafkaConsumer() { - var props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); - return new KafkaConsumer<>(props); - } - - private static Producer createKafkaProducer() { - var props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); - return new KafkaProducer<>(props); - } - - private static void startKafkaProducer() { - var producer = createKafkaProducer(); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( - () -> producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(MESSAGE_COUNTER.getAndIncrement()), JSON_MESSAGE)), - 0, 100, MILLISECONDS); - } - - private static JsonObject httpSink() { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpData") - .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() - .add(EDC_NAMESPACE + "name", "data") - .add(EDC_NAMESPACE + "baseUrl", format("http://localhost:%s", EVENT_DESTINATION_PORT)) - .add(EDC_NAMESPACE + "path", SINK_HTTP_PATH) - .build()) - .build(); - } - - @NotNull - private static JsonObject kafkaSink() { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "Kafka") - .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() - .add(EDC_NAMESPACE + "topic", SINK_TOPIC) - .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA_SERVER) - .build()) - .build(); - } - - @NotNull - private static Map kafkaSourceProperty() { - return Map.of( - "name", "data", - "type", "Kafka", - "topic", SOURCE_TOPIC, - kafkaProperty("bootstrap.servers"), KAFKA_SERVER, - kafkaProperty("max.poll.records"), "100" - ); - } - - private static JsonNode sampleMessage() { - var node = OBJECT_MAPPER.createObjectNode(); - node.put("foo", "bar"); - return node; - } - - private static String kafkaProperty(String property) { - return "kafka." + property; - } - - private static JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } - - @Test - void kafkaToHttpTransfer() throws JsonProcessingException { - var destinationServer = startClientAndServer(EVENT_DESTINATION_PORT); - var request = request() - .withMethod(HttpMethod.POST.name()) - .withPath(SINK_HTTP_PATH) - .withBody(OBJECT_MAPPER.writeValueAsBytes(JSON_MESSAGE)); - destinationServer.when(request).respond(response()); - PROVIDER.registerDataPlane(Set.of("Kafka"), Set.of("HttpData")); - - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, kafkaSourceProperty()); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), httpSink()); - - await().atMost(TIMEOUT).untilAsserted(() -> { - destinationServer.verify(request, atLeast(1)); - }); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); - }); - - destinationServer.clear(request) - .when(request).respond(response()); - await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { - try { - destinationServer.verify(request, never()); - } catch (AssertionError assertionError) { - destinationServer.clear(request) - .when(request).respond(response()); - throw assertionError; - } - }); - - stopQuietly(destinationServer); - } - - @Test - void kafkaToKafkaTransfer() { - try (var consumer = createKafkaConsumer()) { - consumer.subscribe(List.of(SINK_TOPIC)); - - PROVIDER.registerDataPlane(Set.of("Kafka"), Set.of("Kafka")); - - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, kafkaSourceProperty()); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), kafkaSink()); - await().atMost(TIMEOUT).untilAsserted(() -> { - var records = consumer.poll(ZERO); - assertThat(records.isEmpty()).isFalse(); - records.records(SINK_TOPIC).forEach(record -> assertThat(record.value()).isEqualTo(JSON_MESSAGE)); - }); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); - }); - - consumer.poll(ZERO); - await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { - var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(SINK_TOPIC); - assertThat(recordsFound).isEmpty(); - }); - } - } - - private void createResourcesOnProvider(String assetId, Map dataAddressProperties) { - PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); - var policy = inForceDatePolicy("gteq", "contractAgreement+0s", "lteq", "contractAgreement+10s"); - var policyDefinition = PROVIDER.createPolicyDefinition(policy); - PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), policyDefinition, policyDefinition); - } -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/InMemoryRuntimes.java similarity index 82% rename from system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java rename to system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/InMemoryRuntimes.java index 241540ceb65..e0e5b98c01e 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/InMemoryRuntimes.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -14,28 +14,29 @@ package org.eclipse.edc.test.e2e; -import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.junit.jupiter.api.extension.RegisterExtension; import java.util.HashMap; -@EndToEndTest -class EndToEndTransferInMemoryTest extends AbstractEndToEndTransfer { +import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.CONSUMER; +import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.PROVIDER; - static String[] controlPlaneModules = new String[]{ +public interface InMemoryRuntimes { + + String[] CONTROL_PLANE_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", ":extensions:control-plane:transfer:transfer-data-plane", ":extensions:data-plane:data-plane-client" }; @RegisterExtension - static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( new EdcRuntimeExtension( "consumer-control-plane", CONSUMER.controlPlaneConfiguration(), - controlPlaneModules + CONTROL_PLANE_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -54,7 +55,7 @@ class EndToEndTransferInMemoryTest extends AbstractEndToEndTransfer { new EdcRuntimeExtension( "provider-control-plane", PROVIDER.controlPlaneConfiguration(), - controlPlaneModules + CONTROL_PLANE_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresRuntimes.java similarity index 82% rename from system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java rename to system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresRuntimes.java index e85f48be5d4..3ee93ce9511 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/PostgresRuntimes.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -14,7 +14,6 @@ package org.eclipse.edc.test.e2e; -import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.junit.jupiter.api.extension.BeforeAllCallback; @@ -23,17 +22,19 @@ import java.util.HashMap; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; +import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.CONSUMER; +import static org.eclipse.edc.test.e2e.TransferEndToEndTestBase.PROVIDER; + +public interface PostgresRuntimes { -@PostgresqlDbIntegrationTest -class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { @RegisterExtension - static BeforeAllCallback createDatabase = context -> { + BeforeAllCallback CREATE_DATABASES = context -> { createDatabase(CONSUMER.getName()); createDatabase(PROVIDER.getName()); }; - static String[] controlPlanePostgresqlModules = new String[]{ + String[] CONTROL_PLANE_POSTGRESQL_MODULES = new String[]{ ":system-tests:e2e-transfer-test:control-plane", ":extensions:control-plane:transfer:transfer-data-plane", ":extensions:data-plane:data-plane-client", @@ -44,7 +45,7 @@ class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { ":extensions:policy-monitor:store:sql:policy-monitor-store-sql" }; - static String[] dataPlanePostgresqlModules = new String[]{ + String[] DATA_PLANE_POSTGRESQL_MODULES = new String[]{ ":system-tests:e2e-transfer-test:data-plane", ":extensions:data-plane:data-plane-public-api", ":extensions:data-plane:store:sql:data-plane-store-sql", @@ -53,11 +54,11 @@ class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { }; @RegisterExtension - static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( new EdcRuntimeExtension( "consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration(), - controlPlanePostgresqlModules + CONTROL_PLANE_POSTGRESQL_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -71,12 +72,12 @@ class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { new EdcRuntimeExtension( "provider-data-plane", PROVIDER.dataPlanePostgresConfiguration(), - dataPlanePostgresqlModules + DATA_PLANE_POSTGRESQL_MODULES ), new EdcRuntimeExtension( "provider-control-plane", PROVIDER.controlPlanePostgresConfiguration(), - controlPlanePostgresqlModules + CONTROL_PLANE_POSTGRESQL_MODULES ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java new file mode 100644 index 00000000000..49f35948dbc --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import jakarta.json.JsonObject; +import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; + +import java.time.Duration; +import java.util.Map; +import java.util.UUID; + +public abstract class TransferEndToEndTestBase { + + protected final Duration timeout = Duration.ofSeconds(60); + + protected static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() + .name("consumer") + .id("urn:connector:consumer") + .build(); + protected static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + + protected void createResourcesOnProvider(String assetId, JsonObject policy, Map dataAddressProperties) { + PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); + var policyDefinition = PROVIDER.createPolicyDefinition(policy); + PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), policyDefinition, policyDefinition); + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java new file mode 100644 index 00000000000..3955aa890b0 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Map; +import java.util.UUID; + +import static jakarta.json.Json.createObjectBuilder; +import static java.time.Duration.ofDays; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; +import static org.hamcrest.CoreMatchers.equalTo; + +public class TransferPullEndToEndTest { + + @Nested + @EndToEndTest + class InMemory extends Tests implements InMemoryRuntimes { + + } + + @Nested + @PostgresqlDbIntegrationTest + class Postgres extends Tests implements PostgresRuntimes { + + } + + abstract static class Tests extends TransferEndToEndTestBase { + + @BeforeEach + void setUp() { + PROVIDER.registerDataPlane(); + } + + @Test + void pullFromHttp() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress()); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + // retrieve the data reference + var edr = CONSUMER.getDataReference(transferProcessId); + + // pull the data without query parameter + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); + + // pull the data with additional query parameter + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); + + // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one + assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); + } + + @Test + void pullFromHttp_withTransferType() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + // retrieve the data reference + var edr = CONSUMER.getDataReference(transferProcessId); + + // pull the data without query parameter + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); + + // pull the data with additional query parameter + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); + + // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one + assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); + } + + @Test + void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() { + var assetId = UUID.randomUUID().toString(); + var now = Instant.now(); + + // contract was valid from t-10d to t-5d, so "now" it is expired + var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", now.minus(ofDays(5))); + createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(TERMINATED.name()); + }); + } + + @Test + void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() { + var assetId = UUID.randomUUID().toString(); + var now = Instant.now(); + // contract was valid from t-10d to t-5d, so "now" it is expired + var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", "contractAgreement+1s"); + createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(TERMINATED.name()); + }); + } + + @Test + void pullFromHttp_httpProvision() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpProvision", + "proxyQueryParams", "true" + )); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + + var edr = CONSUMER.getDataReference(transferProcessId); + CONSUMER.pullData(edr, Map.of(), equalTo("some information")); + }); + } + + private JsonObject syncDataAddress() { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpProxy") + .build(); + } + + @NotNull + private Map httpDataAddressProperties() { + return Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpData", + "proxyQueryParams", "true" + ); + } + + private JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java new file mode 100644 index 00000000000..f529812e4e1 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.annotations.PostgresqlDbIntegrationTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.UUID; + +import static io.restassured.RestAssured.given; +import static jakarta.json.Json.createObjectBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class TransferPushEndToEndTest { + + @Nested + @EndToEndTest + class InMemory extends Tests implements InMemoryRuntimes { + + } + + @Nested + @PostgresqlDbIntegrationTest + class Postgres extends Tests implements PostgresRuntimes { + + } + + abstract static class Tests extends TransferEndToEndTestBase { + + private final String assetId = UUID.randomUUID().toString(); + + @BeforeEach + void setUp() { + PROVIDER.registerDataPlane(); + } + + @Test + void httpToHttp() { + var url = PROVIDER.backendService() + "/api/provider/data"; + Map dataAddressProperties = Map.of("type", "HttpData", "baseUrl", url); + createResourcesOnProvider(assetId, noConstraintPolicy(), dataAddressProperties); + var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(COMPLETED.name()); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); + }); + } + + @Test + void httpToHttp_withTransferType() { + var url = PROVIDER.backendService() + "/api/provider/data"; + Map dataAddressProperties = Map.of("type", "HttpData", "baseUrl", url); + createResourcesOnProvider(assetId, noConstraintPolicy(), dataAddressProperties); + var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(COMPLETED.name()); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); + }); + } + + @Test + @DisplayName("Provider pushes data to Consumer, Provider needs to authenticate the data request through an oauth2 server") + void httpToHttp_oauth2Provisioning() { + var sourceDataAddressProperties = Map.of( + "type", "HttpData", + "baseUrl", PROVIDER.backendService() + "/api/provider/oauth2data", + "oauth2:clientId", "clientId", + "oauth2:clientSecretKey", "provision-oauth-secret", + "oauth2:tokenUrl", PROVIDER.backendService() + "/api/oauth2/token" + ); + + createResourcesOnProvider(assetId, noConstraintPolicy(), sourceDataAddressProperties); + var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); + + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(COMPLETED.name()); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); + }); + } + + private JsonObject httpDataAddress(String baseUrl) { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "baseUrl", baseUrl) + .build(); + } + + private JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } + + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java new file mode 100644 index 00000000000..1900b6741de --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.HttpMethod; +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.test.e2e.annotations.KafkaIntegrationTest; +import org.eclipse.edc.test.e2e.serializers.JacksonDeserializer; +import org.eclipse.edc.test.e2e.serializers.JacksonSerializer; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import javax.validation.constraints.NotNull; + +import static java.lang.String.format; +import static java.time.Duration.ZERO; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.inForceDatePolicy; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.stop.Stop.stopQuietly; +import static org.mockserver.verify.VerificationTimes.atLeast; +import static org.mockserver.verify.VerificationTimes.never; + +public class TransferStreamingEndToEndTest { + + @Nested + @KafkaIntegrationTest + class InMemory extends Tests implements InMemoryRuntimes { + + } + + abstract static class Tests extends TransferEndToEndTestBase { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String KAFKA_SERVER = "localhost:9092"; + private static final Duration TIMEOUT = Duration.ofSeconds(60); + private static final String SINK_HTTP_PATH = "/api/service"; + private static final String SOURCE_TOPIC = "source_topic"; + private static final String SINK_TOPIC = "sink_topic"; + private static final int EVENT_DESTINATION_PORT = getFreePort(); + private static final JsonNode JSON_MESSAGE = sampleMessage(); + private static final AtomicInteger MESSAGE_COUNTER = new AtomicInteger(); + + @BeforeAll + public static void setUp() { + startKafkaProducer(); + } + + private static Consumer createKafkaConsumer() { + var props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); + return new KafkaConsumer<>(props); + } + + private static Producer createKafkaProducer() { + var props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private static void startKafkaProducer() { + var producer = createKafkaProducer(); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + () -> producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(MESSAGE_COUNTER.getAndIncrement()), JSON_MESSAGE)), + 0, 100, MILLISECONDS); + } + + @Test + void kafkaToHttpTransfer() throws JsonProcessingException { + var destinationServer = startClientAndServer(EVENT_DESTINATION_PORT); + var request = request() + .withMethod(HttpMethod.POST.name()) + .withPath(SINK_HTTP_PATH) + .withBody(OBJECT_MAPPER.writeValueAsBytes(JSON_MESSAGE)); + destinationServer.when(request).respond(response()); + PROVIDER.registerDataPlane(Set.of("Kafka"), Set.of("HttpData")); + + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, contractExpiresInTenSeconds(), kafkaSourceProperty()); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), httpSink()); + + await().atMost(TIMEOUT).untilAsserted(() -> { + destinationServer.verify(request, atLeast(1)); + }); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); + }); + + destinationServer.clear(request) + .when(request).respond(response()); + await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { + try { + destinationServer.verify(request, never()); + } catch (AssertionError assertionError) { + destinationServer.clear(request) + .when(request).respond(response()); + throw assertionError; + } + }); + + stopQuietly(destinationServer); + } + + @Test + void kafkaToKafkaTransfer() { + try (var consumer = createKafkaConsumer()) { + consumer.subscribe(List.of(SINK_TOPIC)); + + PROVIDER.registerDataPlane(Set.of("Kafka"), Set.of("Kafka")); + + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, contractExpiresInTenSeconds(), kafkaSourceProperty()); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), kafkaSink()); + await().atMost(TIMEOUT).untilAsserted(() -> { + var records = consumer.poll(ZERO); + assertThat(records.isEmpty()).isFalse(); + records.records(SINK_TOPIC).forEach(record -> assertThat(record.value()).isEqualTo(JSON_MESSAGE)); + }); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(TransferProcessStates.valueOf(state)).isGreaterThanOrEqualTo(TERMINATED); + }); + + consumer.poll(ZERO); + await().pollDelay(5, SECONDS).atMost(TIMEOUT).untilAsserted(() -> { + var recordsFound = consumer.poll(Duration.ofSeconds(1)).records(SINK_TOPIC); + assertThat(recordsFound).isEmpty(); + }); + } + } + + private static JsonObject httpSink() { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() + .add(EDC_NAMESPACE + "name", "data") + .add(EDC_NAMESPACE + "baseUrl", format("http://localhost:%s", EVENT_DESTINATION_PORT)) + .add(EDC_NAMESPACE + "path", SINK_HTTP_PATH) + .build()) + .build(); + } + + @NotNull + private static JsonObject kafkaSink() { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "Kafka") + .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() + .add(EDC_NAMESPACE + "topic", SINK_TOPIC) + .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA_SERVER) + .build()) + .build(); + } + + @NotNull + private static Map kafkaSourceProperty() { + return Map.of( + "name", "data", + "type", "Kafka", + "topic", SOURCE_TOPIC, + kafkaProperty("bootstrap.servers"), KAFKA_SERVER, + kafkaProperty("max.poll.records"), "100" + ); + } + + private static String kafkaProperty(String property) { + return "kafka." + property; + } + + private static JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } + + private static JsonObject contractExpiresInTenSeconds() { + return inForceDatePolicy("gteq", "contractAgreement+0s", "lteq", "contractAgreement+10s"); + } + + private static JsonNode sampleMessage() { + var node = OBJECT_MAPPER.createObjectNode(); + node.put("foo", "bar"); + return node; + } + } +}