From 3396e852386990127108551d7c294d9779a0799c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 8 Jan 2025 15:41:47 +0100 Subject: [PATCH] #654 Removed deprecated constructors from KafkaHandler --- .../gateleen/kafka/KafkaHandler.java | 39 -------- .../gateleen/kafka/KafkaHandlerTest.java | 91 +++++++++++++------ .../swisspush/gateleen/playground/Server.java | 18 ++-- 3 files changed, 76 insertions(+), 72 deletions(-) diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java index 14ae65b8..d5c6ed76 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java @@ -21,14 +21,11 @@ import org.swisspush.gateleen.core.validation.ValidationStatus; import org.swisspush.gateleen.validation.ValidationException; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.regex.Pattern; -import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; - /** * Handler class for all Kafka related requests. * @@ -56,42 +53,6 @@ public class KafkaHandler extends ConfigurationResourceConsumer { private boolean initialized = false; - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) { - this(configurationResourceManager, null, repository, kafkaMessageSender, - configResourceUri, streamingPath); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, - KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri, - String streamingPath) { - this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender, - configResourceUri, streamingPath, new HashMap<>()); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { - - this(configurationResourceManager, null, repository, kafkaMessageSender, - configResourceUri, streamingPath, properties); - } - - /** @deprecated Use {@link #builder()} */ - @Deprecated - public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository, - KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) { - this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager, - kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath, - properties); - log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!"); - } - public KafkaHandler( Vertx vertx, GateleenExceptionFactory exceptionFactory, diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java index cda346db..514a1d7f 100644 --- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java +++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java @@ -101,9 +101,16 @@ public void setUp() { messageValidator = Mockito.mock(KafkaMessageValidator.class); storage = new MockResourceStorage(); configurationResourceManager = new ConfigurationResourceManager(vertx, storage, exceptionFactory); - handler = new KafkaHandler( - vertxMock, exceptionFactory, configurationResourceManager, null, repository, - kafkaMessageSender, configResourceUri, streamingPath, null); + + handler = KafkaHandler.builder() + .withVertx(vertxMock) + .withExceptionFactory(exceptionFactory) + .withConfigurationResourceManager(configurationResourceManager) + .withRepository(repository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(configResourceUri) + .withStreamingPath(streamingPath) + .build(); when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.succeededFuture()); } @@ -156,8 +163,16 @@ public void initWithWildcardConfigResource(TestContext context) { props.put("kafka.port", "9094"); storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE); - handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender, - configResourceUri, streamingPath, props); + handler = KafkaHandler.builder() + .withVertx(vertxMock) + .withConfigurationResourceManager(configurationResourceManager) + .withRepository(repository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(configResourceUri) + .withStreamingPath(streamingPath) + .withProperties(props) + .build(); + context.assertFalse(handler.isInitialized()); handler.initialize().onComplete(event -> { @@ -182,8 +197,16 @@ public void initWithWildcardConfigResourceException(TestContext context) { Map props = new HashMap<>(); storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE); - handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender, - configResourceUri, streamingPath, props); + handler = KafkaHandler.builder() + .withVertx(vertxMock) + .withConfigurationResourceManager(configurationResourceManager) + .withRepository(repository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(configResourceUri) + .withStreamingPath(streamingPath) + .withProperties(props) + .build(); + context.assertFalse(handler.isInitialized()); handler.initialize().onComplete(event -> { @@ -195,7 +218,7 @@ public void initWithWildcardConfigResourceException(TestContext context) { } @Test - public void resourceRemovedTriggersCloseAllProducers(TestContext context){ + public void resourceRemovedTriggersCloseAllProducers(TestContext context) { Async async = context.async(); handler.initialize().onComplete(event -> { JsonObject object = new JsonObject(); @@ -209,7 +232,7 @@ public void resourceRemovedTriggersCloseAllProducers(TestContext context){ } @Test - public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context){ + public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context) { Async async = context.async(); context.assertFalse(handler.isInitialized()); storage.putMockData(configResourceUri, CONFIG_RESOURCE); @@ -235,12 +258,12 @@ public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext co }}; verify(repository, timeout(500).times(1)).addKafkaProducer(eq(new KafkaConfiguration(Pattern.compile("."), configs_2))); verifyNoInteractions(kafkaMessageSender); - await().atMost(1, SECONDS).until( () -> handler.isInitialized(), equalTo(Boolean.TRUE)); + await().atMost(1, SECONDS).until(() -> handler.isInitialized(), equalTo(Boolean.TRUE)); async.complete(); } @Test - public void handleNotStreamingPath(TestContext context){ + public void handleNotStreamingPath(TestContext context) { Async async = context.async(); handler.initialize().onComplete(event -> { StreamingRequest request = new StreamingRequest(HttpMethod.POST, "/some/other/uri/path"); @@ -252,7 +275,7 @@ public void handleNotStreamingPath(TestContext context){ } @Test - public void handleNotPOSTRequest(TestContext context){ + public void handleNotPOSTRequest(TestContext context) { Async async = context.async(); handler.initialize().onComplete(event -> { @@ -269,7 +292,7 @@ public void handleNotPOSTRequest(TestContext context){ } @Test - public void handleEmptyTopic(TestContext context){ + public void handleEmptyTopic(TestContext context) { Async async = context.async(); handler.initialize().onComplete(event -> { @@ -286,7 +309,7 @@ public void handleEmptyTopic(TestContext context){ } @Test - public void handleNoMatchingProducer(TestContext context){ + public void handleNoMatchingProducer(TestContext context) { Async async = context.async(); handler.initialize().onComplete(event -> { @@ -303,7 +326,7 @@ public void handleNoMatchingProducer(TestContext context){ } @Test - public void handleInvalidPayload(TestContext context){ + public void handleInvalidPayload(TestContext context) { Async async = context.async(); storage.putMockData(configResourceUri, CONFIG_RESOURCE); handler.initialize().onComplete(event -> { @@ -322,7 +345,7 @@ public void handleInvalidPayload(TestContext context){ } @Test - public void handleValidPayloadWithSingleMessage(TestContext context){ + public void handleValidPayloadWithSingleMessage(TestContext context) { Async async = context.async(); storage.putMockData(configResourceUri, CONFIG_RESOURCE); handler.initialize().onComplete(event -> { @@ -363,9 +386,9 @@ public void handleValidPayloadWithSingleMessage(TestContext context){ }); } - @SuppressWarnings(value="unchecked") + @SuppressWarnings(value = "unchecked") @Test - public void handleValidPayloadWithTwoMessages(TestContext context){ + public void handleValidPayloadWithTwoMessages(TestContext context) { Async async = context.async(); storage.putMockData(configResourceUri, CONFIG_RESOURCE); handler.initialize().onComplete(event -> { @@ -422,7 +445,7 @@ public void handleValidPayloadWithTwoMessages(TestContext context){ } @Test - public void handleValidPayloadWithFailingMessageSending(TestContext context){ + public void handleValidPayloadWithFailingMessageSending(TestContext context) { Async async = context.async(); when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.failedFuture("booom: message could not be sent!")); @@ -467,12 +490,19 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){ } @Test - public void handlePayloadNotPassingValidation(TestContext context){ + public void handlePayloadNotPassingValidation(TestContext context) { Async async = context.async(); - handler = new KafkaHandler( - vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository, - kafkaMessageSender, configResourceUri, streamingPath, null); + handler = KafkaHandler.builder() + .withVertx(vertxMock) + .withExceptionFactory(exceptionFactory) + .withConfigurationResourceManager(configurationResourceManager) + .withKafkaMessageValidator(messageValidator) + .withRepository(repository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(configResourceUri) + .withStreamingPath(streamingPath) + .build(); when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) .thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom"))); @@ -517,12 +547,19 @@ public void handlePayloadNotPassingValidation(TestContext context){ } @Test - public void handleErrorWhileValidation(TestContext context){ + public void handleErrorWhileValidation(TestContext context) { Async async = context.async(); - handler = new KafkaHandler( - vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository, - kafkaMessageSender, configResourceUri, streamingPath, null); + handler = KafkaHandler.builder() + .withVertx(vertxMock) + .withExceptionFactory(exceptionFactory) + .withConfigurationResourceManager(configurationResourceManager) + .withKafkaMessageValidator(messageValidator) + .withRepository(repository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(configResourceUri) + .withStreamingPath(streamingPath) + .build(); when(messageValidator.validateMessages(any(HttpServerRequest.class), any())) .thenReturn(Future.failedFuture("Boooom")); diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java index 2559fd4c..f025933e 100755 --- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java +++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java @@ -1,9 +1,6 @@ package org.swisspush.gateleen.playground; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; +import io.vertx.core.*; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; @@ -284,8 +281,17 @@ public void start() { KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx); KafkaMessageSender kafkaMessageSender = new KafkaMessageSender(); KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator); - kafkaHandler = new KafkaHandler(configurationResourceManager, messageValidator, kafkaProducerRepository, kafkaMessageSender, - SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/"); + + kafkaHandler = KafkaHandler.builder() + .withVertx(vertx) + .withConfigurationResourceManager(configurationResourceManager) + .withKafkaMessageValidator(messageValidator) + .withRepository(kafkaProducerRepository) + .withKafkaMessageSender(kafkaMessageSender) + .withConfigResourceUri(SERVER_ROOT + "/admin/v1/kafka/topicsConfig") + .withStreamingPath(SERVER_ROOT + "/streaming/") + .build(); + kafkaHandler.initialize(); schedulerResourceManager = new SchedulerResourceManager(vertx, redisProvider, storage, monitoringHandler,