Skip to content

Commit

Permalink
refactor: changes registration of dispatchers (#4511)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood authored Sep 30, 2024
1 parent 067ded3 commit 102dc1a
Show file tree
Hide file tree
Showing 13 changed files with 21 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class RemoteMessageDispatcherRegistryImpl implements RemoteMessageDispatc
private final Map<String, RemoteMessageDispatcher> dispatchers = new HashMap<>();

@Override
public void register(RemoteMessageDispatcher dispatcher) {
dispatchers.put(dispatcher.protocol(), dispatcher);
public void register(String protocol, RemoteMessageDispatcher dispatcher) {
dispatchers.put(protocol, dispatcher);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void shouldDispatchEventsOnProviderContractNegotiationStateChanges(EventRouter e
ContractDefinitionStore contractDefinitionStore,
PolicyDefinitionStore policyDefinitionStore,
AssetIndex assetIndex) {
dispatcherRegistry.register(succeedingDispatcher());
dispatcherRegistry.register("test", succeedingDispatcher());

when(identityService.verifyJwtToken(eq(tokenRepresentation), isA(VerificationContext.class))).thenReturn(Result.success(token));
eventRouter.register(ContractNegotiationEvent.class, eventSubscriber);
Expand Down Expand Up @@ -143,7 +143,6 @@ private ContractRequestMessage createContractOfferRequest(Policy policy, String
@NotNull
private RemoteMessageDispatcher succeedingDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any")));
return testDispatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private final EventSubscriber eventSubscriber = mock();

@RegisterExtension
static final RuntimeExtension RUNTIME = new RuntimePerClassExtension()
.setConfiguration(Map.of(
Expand All @@ -100,6 +98,7 @@ public class TransferProcessEventDispatchTest {
.registerServiceMock(ContractNegotiationStore.class, mock())
.registerServiceMock(ParticipantAgentService.class, mock())
.registerServiceMock(DataPlaneClientFactory.class, mock());
private final EventSubscriber eventSubscriber = mock();

@Test
void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service,
Expand Down Expand Up @@ -128,7 +127,7 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se

when(agent.getIdentity()).thenReturn(providerId);

dispatcherRegistry.register(getTestDispatcher());
dispatcherRegistry.register("test", getTestDispatcher());
when(policyArchive.findPolicyForContract(matches(transferRequest.getContractId()))).thenReturn(Policy.Builder.newInstance().target("assetId").build());
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
when(agentService.createFor(token)).thenReturn(agent);
Expand Down Expand Up @@ -194,7 +193,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
.policy(Policy.Builder.newInstance().build())
.build();
when(negotiationStore.findContractAgreement(transferRequest.getContractId())).thenReturn(agreement);
dispatcherRegistry.register(getTestDispatcher());
dispatcherRegistry.register("test", getTestDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);

var initiateResult = service.initiateTransfer(transferRequest);
Expand All @@ -213,7 +212,7 @@ void shouldDispatchEventOnTransferProcessTerminated(TransferProcessService servi
@Test
void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service, EventRouter eventRouter, RemoteMessageDispatcherRegistry dispatcherRegistry,
ContractNegotiationStore negotiationStore, PolicyArchive policyArchive) {
dispatcherRegistry.register(getFailingDispatcher());
dispatcherRegistry.register("test", getFailingDispatcher());
eventRouter.register(TransferProcessEvent.class, eventSubscriber);
var transferRequest = createTransferRequest();
var agreement = ContractAgreement.Builder.newInstance()
Expand All @@ -233,7 +232,6 @@ void shouldDispatchEventOnTransferProcessFailure(TransferProcessService service,
@NotNull
private RemoteMessageDispatcher getTestDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
var ack = TransferProcessAck.Builder.newInstance().build();
when(testDispatcher.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success(ack)));
return testDispatcher;
Expand All @@ -242,7 +240,6 @@ private RemoteMessageDispatcher getTestDispatcher() {
@NotNull
private RemoteMessageDispatcher getFailingDispatcher() {
var testDispatcher = mock(RemoteMessageDispatcher.class);
when(testDispatcher.protocol()).thenReturn("test");
when(testDispatcher.dispatch(any(), any())).thenReturn(failedFuture(new EdcException("cannot send message")));
return testDispatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;

import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_SCOPE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;

Expand Down Expand Up @@ -127,7 +128,7 @@ public DspHttpRemoteMessageDispatcher dspHttpRemoteMessageDispatcher(ServiceExte
registerNegotiationPolicyScopes(dispatcher);
registerTransferProcessPolicyScopes(dispatcher);
registerCatalogPolicyScopes(dispatcher);
dispatcherRegistry.register(dispatcher);
dispatcherRegistry.register(DATASPACE_PROTOCOL_HTTP, dispatcher);
return dispatcher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRemoteMessageDispatcher;
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.DspHttpRequestFactory;
import org.eclipse.edc.protocol.dsp.http.spi.dispatcher.response.DspHttpResponseBodyExtractor;
import org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.iam.AudienceResolver;
import org.eclipse.edc.spi.iam.IdentityService;
Expand Down Expand Up @@ -78,11 +77,6 @@ public DspHttpRemoteMessageDispatcherImpl(EdcHttpClient httpClient,
this.audienceResolver = audienceResolver;
}

@Override
public String protocol() {
return HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
}

@Override
public <T, M extends RemoteMessage> CompletableFuture<StatusResult<T>> dispatch(Class<T> responseType, M message) {
var handler = (MessageHandler<M, T>) this.handlers.get(message.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.spi.response.ResponseStatus.ERROR_RETRY;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
import static org.mockito.AdditionalMatchers.and;
Expand Down Expand Up @@ -100,11 +99,6 @@ void setUp() {
when(tokenDecorator.decorate(any())).thenAnswer(a -> a.getArgument(0));
}

@Test
void protocol_returnDsp() {
assertThat(dispatcher.protocol()).isEqualTo(DATASPACE_PROTOCOL_HTTP);
}

@Test
void dispatch_noScope() {
var authToken = "token";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;

import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;

@Extension(value = CallbackEventDispatcherHttpExtension.NAME)
public class CallbackEventDispatcherHttpExtension implements ServiceExtension {

Expand Down Expand Up @@ -55,13 +57,13 @@ public void initialize(ServiceExtensionContext context) {
var baseDispatcher = new GenericHttpRemoteDispatcherImpl(client);
baseDispatcher.registerDelegate(new CallbackEventRemoteMessageDispatcher(typeManager.getMapper(), vault));

registry.register(baseDispatcher);
registry.register(CALLBACK_EVENT_HTTP, baseDispatcher);
}


private String resolveScheme(String scheme) {
if (scheme.equalsIgnoreCase("https") || scheme.equalsIgnoreCase("http")) {
return GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;
return CALLBACK_EVENT_HTTP;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,11 @@ protected GenericHttpRemoteDispatcherImpl(EdcHttpClient httpClient) {
this.httpClient = httpClient;
}

@Override
public String protocol() {
return CALLBACK_EVENT_HTTP;
}

@Override
public <T, M extends RemoteMessage> CompletableFuture<StatusResult<T>> dispatch(Class<T> responseType, M message) {
var delegate = (GenericHttpDispatcherDelegate<M, T>) delegates.get(message.getClass());
if (delegate == null) {
throw new EdcException(format("No %s message dispatcher found for message type %s", protocol(), message.getClass()));
throw new EdcException(format("No %s message dispatcher found for message type %s", CALLBACK_EVENT_HTTP, message.getClass()));
}
var request = delegate.buildRequest(message);
return httpClient.executeAsync(request, emptyList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatcher;

import static org.eclipse.edc.connector.controlplane.callback.dispatcher.http.GenericHttpRemoteDispatcherImpl.CALLBACK_EVENT_HTTP;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

Expand All @@ -50,10 +50,7 @@ void setUp(ServiceExtensionContext context, ObjectFactory factory) {
void initialize_shouldRegisterBothDispatcher(ServiceExtensionContext context) {

extension.initialize(context);
verify(registry).register(argThat(dispatcher(CALLBACK_EVENT_HTTP)));
verify(registry).register(eq(CALLBACK_EVENT_HTTP), isA(GenericHttpRemoteDispatcherImpl.class));
}

private ArgumentMatcher<GenericHttpRemoteDispatcherImpl> dispatcher(String scheme) {
return dispatcher -> dispatcher.protocol().equals(scheme);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@
*/
public interface RemoteMessageDispatcher {

/**
* Return the protocol this dispatcher uses.
*/
String protocol();


/**
* Binds and sends the message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface RemoteMessageDispatcherRegistry {
/**
* Registers a dispatcher.
*/
void register(RemoteMessageDispatcher dispatcher);
void register(String protocol, RemoteMessageDispatcher dispatcher);

/**
* Sends the message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/**
* The resolver translate the scheme part {@link CallbackAddress#getUri()} to an internal
* naming of {@link RemoteMessageDispatcher#protocol()} ()}
* naming of {@link RemoteMessageDispatcher}
*/
@FunctionalInterface
public interface CallbackProtocolResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

/**
* Registry for {@link CallbackProtocolResolver} resolvers. The registry resolves the scheme part {@link CallbackAddress#getUri()} to an internal
* naming of {@link RemoteMessageDispatcher#protocol()}
* naming of {@link RemoteMessageDispatcher}
*/
@ExtensionPoint
public interface CallbackProtocolResolverRegistry {
Expand Down

0 comments on commit 102dc1a

Please sign in to comment.