Skip to content

Commit

Permalink
#654 Removed deprecated constructors from KafkaHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Jan 8, 2025
1 parent 1607db9 commit 3396e85
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
Expand Down Expand Up @@ -56,42 +53,6 @@ public class KafkaHandler extends ConfigurationResourceConsumer {

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,16 @@ public void setUp() {
messageValidator = Mockito.mock(KafkaMessageValidator.class);
storage = new MockResourceStorage();
configurationResourceManager = new ConfigurationResourceManager(vertx, storage, exceptionFactory);
handler = new KafkaHandler(
vertxMock, exceptionFactory, configurationResourceManager, null, repository,
kafkaMessageSender, configResourceUri, streamingPath, null);

handler = KafkaHandler.builder()
.withVertx(vertxMock)
.withExceptionFactory(exceptionFactory)
.withConfigurationResourceManager(configurationResourceManager)
.withRepository(repository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(configResourceUri)
.withStreamingPath(streamingPath)
.build();

when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.succeededFuture());
}
Expand Down Expand Up @@ -156,8 +163,16 @@ public void initWithWildcardConfigResource(TestContext context) {
props.put("kafka.port", "9094");
storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE);

handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
configResourceUri, streamingPath, props);
handler = KafkaHandler.builder()
.withVertx(vertxMock)
.withConfigurationResourceManager(configurationResourceManager)
.withRepository(repository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(configResourceUri)
.withStreamingPath(streamingPath)
.withProperties(props)
.build();

context.assertFalse(handler.isInitialized());

handler.initialize().onComplete(event -> {
Expand All @@ -182,8 +197,16 @@ public void initWithWildcardConfigResourceException(TestContext context) {
Map<String, Object> props = new HashMap<>();
storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE);

handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
configResourceUri, streamingPath, props);
handler = KafkaHandler.builder()
.withVertx(vertxMock)
.withConfigurationResourceManager(configurationResourceManager)
.withRepository(repository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(configResourceUri)
.withStreamingPath(streamingPath)
.withProperties(props)
.build();

context.assertFalse(handler.isInitialized());

handler.initialize().onComplete(event -> {
Expand All @@ -195,7 +218,7 @@ public void initWithWildcardConfigResourceException(TestContext context) {
}

@Test
public void resourceRemovedTriggersCloseAllProducers(TestContext context){
public void resourceRemovedTriggersCloseAllProducers(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
JsonObject object = new JsonObject();
Expand All @@ -209,7 +232,7 @@ public void resourceRemovedTriggersCloseAllProducers(TestContext context){
}

@Test
public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context){
public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context) {
Async async = context.async();
context.assertFalse(handler.isInitialized());
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
Expand All @@ -235,12 +258,12 @@ public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext co
}};
verify(repository, timeout(500).times(1)).addKafkaProducer(eq(new KafkaConfiguration(Pattern.compile("."), configs_2)));
verifyNoInteractions(kafkaMessageSender);
await().atMost(1, SECONDS).until( () -> handler.isInitialized(), equalTo(Boolean.TRUE));
await().atMost(1, SECONDS).until(() -> handler.isInitialized(), equalTo(Boolean.TRUE));
async.complete();
}

@Test
public void handleNotStreamingPath(TestContext context){
public void handleNotStreamingPath(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
StreamingRequest request = new StreamingRequest(HttpMethod.POST, "/some/other/uri/path");
Expand All @@ -252,7 +275,7 @@ public void handleNotStreamingPath(TestContext context){
}

@Test
public void handleNotPOSTRequest(TestContext context){
public void handleNotPOSTRequest(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {

Expand All @@ -269,7 +292,7 @@ public void handleNotPOSTRequest(TestContext context){
}

@Test
public void handleEmptyTopic(TestContext context){
public void handleEmptyTopic(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {

Expand All @@ -286,7 +309,7 @@ public void handleEmptyTopic(TestContext context){
}

@Test
public void handleNoMatchingProducer(TestContext context){
public void handleNoMatchingProducer(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {

Expand All @@ -303,7 +326,7 @@ public void handleNoMatchingProducer(TestContext context){
}

@Test
public void handleInvalidPayload(TestContext context){
public void handleInvalidPayload(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
Expand All @@ -322,7 +345,7 @@ public void handleInvalidPayload(TestContext context){
}

@Test
public void handleValidPayloadWithSingleMessage(TestContext context){
public void handleValidPayloadWithSingleMessage(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
Expand Down Expand Up @@ -363,9 +386,9 @@ public void handleValidPayloadWithSingleMessage(TestContext context){
});
}

@SuppressWarnings(value="unchecked")
@SuppressWarnings(value = "unchecked")
@Test
public void handleValidPayloadWithTwoMessages(TestContext context){
public void handleValidPayloadWithTwoMessages(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
Expand Down Expand Up @@ -422,7 +445,7 @@ public void handleValidPayloadWithTwoMessages(TestContext context){
}

@Test
public void handleValidPayloadWithFailingMessageSending(TestContext context){
public void handleValidPayloadWithFailingMessageSending(TestContext context) {
Async async = context.async();

when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.failedFuture("booom: message could not be sent!"));
Expand Down Expand Up @@ -467,12 +490,19 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){
}

@Test
public void handlePayloadNotPassingValidation(TestContext context){
public void handlePayloadNotPassingValidation(TestContext context) {
Async async = context.async();

handler = new KafkaHandler(
vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, null);
handler = KafkaHandler.builder()
.withVertx(vertxMock)
.withExceptionFactory(exceptionFactory)
.withConfigurationResourceManager(configurationResourceManager)
.withKafkaMessageValidator(messageValidator)
.withRepository(repository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(configResourceUri)
.withStreamingPath(streamingPath)
.build();

when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom")));
Expand Down Expand Up @@ -517,12 +547,19 @@ public void handlePayloadNotPassingValidation(TestContext context){
}

@Test
public void handleErrorWhileValidation(TestContext context){
public void handleErrorWhileValidation(TestContext context) {
Async async = context.async();

handler = new KafkaHandler(
vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
kafkaMessageSender, configResourceUri, streamingPath, null);
handler = KafkaHandler.builder()
.withVertx(vertxMock)
.withExceptionFactory(exceptionFactory)
.withConfigurationResourceManager(configurationResourceManager)
.withKafkaMessageValidator(messageValidator)
.withRepository(repository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(configResourceUri)
.withStreamingPath(streamingPath)
.build();

when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.failedFuture("Boooom"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package org.swisspush.gateleen.playground;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
Expand Down Expand Up @@ -284,8 +281,17 @@ public void start() {
KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx);
KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
kafkaHandler = new KafkaHandler(configurationResourceManager, messageValidator, kafkaProducerRepository, kafkaMessageSender,
SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/");

kafkaHandler = KafkaHandler.builder()
.withVertx(vertx)
.withConfigurationResourceManager(configurationResourceManager)
.withKafkaMessageValidator(messageValidator)
.withRepository(kafkaProducerRepository)
.withKafkaMessageSender(kafkaMessageSender)
.withConfigResourceUri(SERVER_ROOT + "/admin/v1/kafka/topicsConfig")
.withStreamingPath(SERVER_ROOT + "/streaming/")
.build();

kafkaHandler.initialize();

schedulerResourceManager = new SchedulerResourceManager(vertx, redisProvider, storage, monitoringHandler,
Expand Down

0 comments on commit 3396e85

Please sign in to comment.