diff --git a/gateleen-kafka/README_kafka.md b/gateleen-kafka/README_kafka.md
index 9ce9febd..09c0cc44 100644
--- a/gateleen-kafka/README_kafka.md
+++ b/gateleen-kafka/README_kafka.md
@@ -48,7 +48,7 @@ The following topic configuration values are required:
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).
## Usage
-To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
+To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated into the "MainVerticle" handling all
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).
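For illustration, here is a minimal wiring sketch using the builder introduced by this change. It mirrors the playground `Server` setup further down in this diff; the `configurationResourceManager`, `validationResourceManager` and `validator` instances are assumed to exist already in the verticle.

```java
// Sketch only: create the Kafka collaborators and wire up the KafkaHandler via its builder.
KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx);
KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator);

KafkaHandler kafkaHandler = KafkaHandler.builder()
        .withVertx(vertx)
        .withConfigurationResourceManager(configurationResourceManager)
        .withKafkaMessageValidator(messageValidator)
        .withRepository(kafkaProducerRepository)
        .withKafkaMessageSender(kafkaMessageSender)
        .withConfigResourceUri("/playground/server/admin/v1/kafka/topicsConfig")
        .withStreamingPath("/playground/server/streaming/")
        .build();

kafkaHandler.initialize();
```

The builder also accepts an exception factory (`withExceptionFactory`) and wildcard replacement properties (`withProperties`), as used in the updated tests below.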
The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`
@@ -448,4 +448,30 @@ This sequence diagrams shows the process when messages are sent to Kafka:
│ <────────────────────│ │ │ │ │ │
│ │ │ │ │ │ │
│ └┬┘ │ │ │ │
-```
\ No newline at end of file
+```
+
+### Micrometer metrics
+The Kafka feature is monitored with Micrometer. The following metrics are available:
+* gateleen_kafka_send_success_messages_total
+* gateleen_kafka_send_fail_messages_total
+* gateleen_kafka_validation_fail_messages_total
+
+A `topic` tag is added to each metric to identify the Kafka topic.
+
+Example metrics:
+
+```
+# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
+# TYPE gateleen_kafka_send_success_messages_total counter
+gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
+gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
+gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
+# HELP gateleen_kafka_send_fail_messages_total Amount of failed kafka message send attempts
+# TYPE gateleen_kafka_send_fail_messages_total counter
+gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
+gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
+# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
+# TYPE gateleen_kafka_validation_fail_messages_total counter
+gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
+```
+
+To enable the `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, provide a `MeterRegistry` instance by calling the `setMeterRegistry(MeterRegistry meterRegistry)` method on the `KafkaMessageSender` instance.
+To enable the `gateleen_kafka_validation_fail_messages_total` metric, call the `setMeterRegistry(MeterRegistry meterRegistry)` method on the `KafkaMessageValidator` instance. See the sketch below.
\ No newline at end of file
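A minimal sketch of enabling these counters; the `validationResourceManager` and `validator` collaborators are assumed to exist already (see the _Usage_ section), and Micrometer's `SimpleMeterRegistry` is used here only for brevity. In a real deployment the application-wide registry would be passed instead.

```java
// Sketch only: provide a MeterRegistry so the per-topic Kafka counters get registered.
MeterRegistry meterRegistry = new SimpleMeterRegistry(); // or the shared application registry

KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
kafkaMessageSender.setMeterRegistry(meterRegistry);

KafkaMessageValidator kafkaMessageValidator = new KafkaMessageValidator(validationResourceManager, validator);
kafkaMessageValidator.setMeterRegistry(meterRegistry);
```

The counters are created lazily per topic, so a `topic` time series only appears in the registry after the first message for that topic has been sent, has failed to send, or has failed validation.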
diff --git a/gateleen-kafka/pom.xml b/gateleen-kafka/pom.xml
index 7b57b808..9dc0c2dc 100644
--- a/gateleen-kafka/pom.xml
+++ b/gateleen-kafka/pom.xml
@@ -22,6 +22,10 @@
<artifactId>gateleen-validation</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.micrometer</groupId>
+ <artifactId>micrometer-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.swisspush.gateleen</groupId>
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
index 5a63358a..d5c6ed76 100644
--- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
@@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
-import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;
-
/**
* Handler class for all Kafka related requests.
*
@@ -52,46 +49,10 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaMessageSender kafkaMessageSender;
private final Map properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
- private KafkaMessageValidator kafkaMessageValidator;
+ private final KafkaMessageValidator kafkaMessageValidator;
private boolean initialized = false;
- /** @deprecated Use {@link #builder()} */
- @Deprecated
- public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
- KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
- this(configurationResourceManager, null, repository, kafkaMessageSender,
- configResourceUri, streamingPath);
- }
-
- /** @deprecated Use {@link #builder()} */
- @Deprecated
- public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
- KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
- String streamingPath) {
- this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
- configResourceUri, streamingPath, new HashMap<>());
- }
-
- /** @deprecated Use {@link #builder()} */
- @Deprecated
- public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
- KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) {
-
- this(configurationResourceManager, null, repository, kafkaMessageSender,
- configResourceUri, streamingPath, properties);
- }
-
- /** @deprecated Use {@link #builder()} */
- @Deprecated
- public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
- KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map properties) {
- this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
- kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
- properties);
- log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
- }
-
public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
@@ -140,8 +101,6 @@ private Future initializeKafkaConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);
-
-
repository.closeAll().future().onComplete((event -> {
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
repository.addKafkaProducer(kafkaConfiguration);
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java
index 1b74bf72..2af5108b 100644
--- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -9,7 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import static java.util.stream.Collectors.toList;
@@ -18,17 +22,32 @@ public class KafkaMessageSender {
private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);
+ private MeterRegistry meterRegistry;
+ private final Map<String, Counter> successSendCounterMap = new HashMap<>();
+ private final Map<String, Counter> failSendCounterMap = new HashMap<>();
+
+ public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
+ public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
+ public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
+ public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message send attempts";
+ public static final String TOPIC = "topic";
+
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ successSendCounterMap.clear();
+ failSendCounterMap.clear();
+ }
+
Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
List<KafkaProducerRecord<String, String>> messages) {
Promise<Void> promise = Promise.promise();
log.debug("Start processing {} messages for kafka", messages.size());
- @SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
- List futures = messages.stream()
+ List<Future<Void>> futures = messages.stream()
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
.collect(toList());
- CompositeFuture.all(futures).mapEmpty().onComplete(result -> {
+ Future.all(futures).mapEmpty().onComplete(result -> {
if (result.succeeded()) {
promise.complete();
log.debug("Batch messages successfully sent to Kafka.");
@@ -44,7 +63,45 @@ private Future sendMessage(KafkaProducer kafkaProducer, Ka
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
+ incrementSuccessCount(metadata.getTopic());
return Future.succeededFuture();
- }).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
+ }).onFailure(throwable -> {
+ log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
+ incrementFailCount(message.topic());
+ });
+ }
+
+ private synchronized void incrementSuccessCount(String topic) {
+ Counter counter = successSendCounterMap.get(topic);
+ if(counter != null) {
+ counter.increment();
+ return;
+ }
+
+ if(meterRegistry != null) {
+ Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
+ .description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
+ .tag(TOPIC, topic)
+ .register(meterRegistry);
+ newCounter.increment();
+ successSendCounterMap.put(topic, newCounter);
+ }
+ }
+
+ private synchronized void incrementFailCount(String topic) {
+ Counter counter = failSendCounterMap.get(topic);
+ if(counter != null) {
+ counter.increment();
+ return;
+ }
+
+ if(meterRegistry != null) {
+ Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
+ .description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
+ .tag(TOPIC, topic)
+ .register(meterRegistry);
+ newCounter.increment();
+ failSendCounterMap.put(topic, newCounter);
+ }
}
}
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
index 475e5c63..32e02f32 100644
--- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
@@ -14,6 +16,7 @@
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -26,11 +29,23 @@ public class KafkaMessageValidator {
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);
+ private MeterRegistry meterRegistry;
+ private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();
+
+ public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
+ public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
+ public static final String TOPIC = "topic";
+
public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ failedToValidateCounterMap.clear();
+ }
+
public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
@@ -49,6 +64,8 @@ public Future validateMessages(HttpServerRequest request, List
SchemaLocation schemaLocation = optionalSchemaLocation.get();
+ String topic = kafkaProducerRecords.get(0).topic();
+
@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
@@ -57,10 +74,31 @@ public Future validateMessages(HttpServerRequest request, List
return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
+ incrementValidationFailCount(topic);
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
+ }, throwable -> {
+ incrementValidationFailCount(topic);
+ return Future.failedFuture(throwable);
});
}
+
+ private void incrementValidationFailCount(String topic) {
+ Counter counter = failedToValidateCounterMap.get(topic);
+ if(counter != null) {
+ counter.increment();
+ return;
+ }
+
+ if(meterRegistry != null) {
+ Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
+ .description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
+ .tag(TOPIC, topic)
+ .register(meterRegistry);
+ newCounter.increment();
+ failedToValidateCounterMap.put(topic, newCounter);
+ }
+ }
}
diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java
index 445a85bf..f9408b0d 100644
--- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java
+++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java
@@ -46,10 +46,10 @@ Optional, Pattern>> findMatchingKafkaProducer
Promise<Void> closeAll() {
log.info("About to close all kafka producers");
Promise<Void> promise = Promise.promise();
- List futures = new ArrayList<>();
+ List<Future<Void>> futures = new ArrayList<>();
for (Map.Entry> entry : kafkaProducers.entrySet()) {
- Promise entryFuture = Promise.promise();
+ Promise<Void> entryFuture = Promise.promise();
futures.add(entryFuture.future());
entry.getValue().close(event -> {
if (event.succeeded()) {
@@ -62,7 +62,7 @@ Promise closeAll() {
}
// wait for all producers to be closed
- CompositeFuture.all(futures).onComplete(event -> {
+ Future.all(futures).onComplete(event -> {
kafkaProducers.clear();
promise.complete();
});
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
index cda346db..f971a0e8 100644
--- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaHandlerTest.java
@@ -2,7 +2,6 @@
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
-import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
@@ -30,7 +29,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static java.lang.Thread.currentThread;
@@ -66,7 +64,7 @@ public class KafkaHandlerTest {
private ConfigurationResourceManager configurationResourceManager;
private KafkaHandler handler;
private MockResourceStorage storage;
- private GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
+ private final GateleenExceptionFactory exceptionFactory = newGateleenWastefulExceptionFactory();
private Vertx vertxMock;
private final String configResourceUri = "/kafka/topicsConfig";
@@ -101,9 +99,16 @@ public void setUp() {
messageValidator = Mockito.mock(KafkaMessageValidator.class);
storage = new MockResourceStorage();
configurationResourceManager = new ConfigurationResourceManager(vertx, storage, exceptionFactory);
- handler = new KafkaHandler(
- vertxMock, exceptionFactory, configurationResourceManager, null, repository,
- kafkaMessageSender, configResourceUri, streamingPath, null);
+
+ handler = KafkaHandler.builder()
+ .withVertx(vertxMock)
+ .withExceptionFactory(exceptionFactory)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withRepository(repository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(configResourceUri)
+ .withStreamingPath(streamingPath)
+ .build();
when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.succeededFuture());
}
@@ -156,8 +161,16 @@ public void initWithWildcardConfigResource(TestContext context) {
props.put("kafka.port", "9094");
storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE);
- handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
- configResourceUri, streamingPath, props);
+ handler = KafkaHandler.builder()
+ .withVertx(vertxMock)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withRepository(repository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(configResourceUri)
+ .withStreamingPath(streamingPath)
+ .withProperties(props)
+ .build();
+
context.assertFalse(handler.isInitialized());
handler.initialize().onComplete(event -> {
@@ -182,8 +195,16 @@ public void initWithWildcardConfigResourceException(TestContext context) {
Map props = new HashMap<>();
storage.putMockData(configResourceUri, CONFIG_WILDCARD_RESOURCE);
- handler = new KafkaHandler(configurationResourceManager, repository, kafkaMessageSender,
- configResourceUri, streamingPath, props);
+ handler = KafkaHandler.builder()
+ .withVertx(vertxMock)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withRepository(repository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(configResourceUri)
+ .withStreamingPath(streamingPath)
+ .withProperties(props)
+ .build();
+
context.assertFalse(handler.isInitialized());
handler.initialize().onComplete(event -> {
@@ -195,7 +216,7 @@ public void initWithWildcardConfigResourceException(TestContext context) {
}
@Test
- public void resourceRemovedTriggersCloseAllProducers(TestContext context){
+ public void resourceRemovedTriggersCloseAllProducers(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
JsonObject object = new JsonObject();
@@ -209,7 +230,7 @@ public void resourceRemovedTriggersCloseAllProducers(TestContext context){
}
@Test
- public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context){
+ public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext context) {
Async async = context.async();
context.assertFalse(handler.isInitialized());
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
@@ -235,12 +256,12 @@ public void resourceChangedTriggersCloseAllAndReCreateOfProducers(TestContext co
}};
verify(repository, timeout(500).times(1)).addKafkaProducer(eq(new KafkaConfiguration(Pattern.compile("."), configs_2)));
verifyNoInteractions(kafkaMessageSender);
- await().atMost(1, SECONDS).until( () -> handler.isInitialized(), equalTo(Boolean.TRUE));
+ await().atMost(1, SECONDS).until(() -> handler.isInitialized(), equalTo(Boolean.TRUE));
async.complete();
}
@Test
- public void handleNotStreamingPath(TestContext context){
+ public void handleNotStreamingPath(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
StreamingRequest request = new StreamingRequest(HttpMethod.POST, "/some/other/uri/path");
@@ -252,7 +273,7 @@ public void handleNotStreamingPath(TestContext context){
}
@Test
- public void handleNotPOSTRequest(TestContext context){
+ public void handleNotPOSTRequest(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
@@ -269,7 +290,7 @@ public void handleNotPOSTRequest(TestContext context){
}
@Test
- public void handleEmptyTopic(TestContext context){
+ public void handleEmptyTopic(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
@@ -286,7 +307,7 @@ public void handleEmptyTopic(TestContext context){
}
@Test
- public void handleNoMatchingProducer(TestContext context){
+ public void handleNoMatchingProducer(TestContext context) {
Async async = context.async();
handler.initialize().onComplete(event -> {
@@ -303,7 +324,7 @@ public void handleNoMatchingProducer(TestContext context){
}
@Test
- public void handleInvalidPayload(TestContext context){
+ public void handleInvalidPayload(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
@@ -322,7 +343,7 @@ public void handleInvalidPayload(TestContext context){
}
@Test
- public void handleValidPayloadWithSingleMessage(TestContext context){
+ public void handleValidPayloadWithSingleMessage(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
@@ -363,9 +384,9 @@ public void handleValidPayloadWithSingleMessage(TestContext context){
});
}
- @SuppressWarnings(value="unchecked")
+ @SuppressWarnings(value = "unchecked")
@Test
- public void handleValidPayloadWithTwoMessages(TestContext context){
+ public void handleValidPayloadWithTwoMessages(TestContext context) {
Async async = context.async();
storage.putMockData(configResourceUri, CONFIG_RESOURCE);
handler.initialize().onComplete(event -> {
@@ -422,7 +443,7 @@ public void handleValidPayloadWithTwoMessages(TestContext context){
}
@Test
- public void handleValidPayloadWithFailingMessageSending(TestContext context){
+ public void handleValidPayloadWithFailingMessageSending(TestContext context) {
Async async = context.async();
when(kafkaMessageSender.sendMessages(any(), any())).thenReturn(Future.failedFuture("booom: message could not be sent!"));
@@ -467,12 +488,19 @@ public void handleValidPayloadWithFailingMessageSending(TestContext context){
}
@Test
- public void handlePayloadNotPassingValidation(TestContext context){
+ public void handlePayloadNotPassingValidation(TestContext context) {
Async async = context.async();
- handler = new KafkaHandler(
- vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
- kafkaMessageSender, configResourceUri, streamingPath, null);
+ handler = KafkaHandler.builder()
+ .withVertx(vertxMock)
+ .withExceptionFactory(exceptionFactory)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withKafkaMessageValidator(messageValidator)
+ .withRepository(repository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(configResourceUri)
+ .withStreamingPath(streamingPath)
+ .build();
when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV, "Boooom")));
@@ -517,12 +545,19 @@ public void handlePayloadNotPassingValidation(TestContext context){
}
@Test
- public void handleErrorWhileValidation(TestContext context){
+ public void handleErrorWhileValidation(TestContext context) {
Async async = context.async();
- handler = new KafkaHandler(
- vertxMock, exceptionFactory, configurationResourceManager, messageValidator, repository,
- kafkaMessageSender, configResourceUri, streamingPath, null);
+ handler = KafkaHandler.builder()
+ .withVertx(vertxMock)
+ .withExceptionFactory(exceptionFactory)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withKafkaMessageValidator(messageValidator)
+ .withRepository(repository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(configResourceUri)
+ .withStreamingPath(streamingPath)
+ .build();
when(messageValidator.validateMessages(any(HttpServerRequest.class), any()))
.thenReturn(Future.failedFuture("Boooom"));
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java
index e2ba6751..54cbf2f6 100644
--- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
@@ -35,11 +37,14 @@ public class KafkaMessageSenderTest {
private KafkaProducer<String, String> producer;
private KafkaMessageSender kafkaMessageSender;
+ private SimpleMeterRegistry meterRegistry;
@Before
public void setUp() {
producer = Mockito.mock(KafkaProducer.class);
kafkaMessageSender = new KafkaMessageSender();
+ meterRegistry = new SimpleMeterRegistry();
+ kafkaMessageSender.setMeterRegistry(meterRegistry);
}
@Test
@@ -57,6 +62,9 @@ public void sendSingleMessage(TestContext context) throws ValidationException {
});
Mockito.verify(producer, times(1)).send(eq(records.get(0)));
+
+ Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
+ context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
}
@Test
@@ -74,6 +82,9 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx
});
Mockito.verify(producer, times(1)).send(eq(records.get(0)));
+
+ Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
+ context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
}
@Test
@@ -98,6 +109,9 @@ public void sendMultipleMessages(TestContext context) throws ValidationException
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));
+
+ Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
+ context.assertEquals(3.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 3");
}
@Test
@@ -124,6 +138,12 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));
+
+ Counter successCounter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
+ context.assertEquals(2.0, successCounter.count(), "Success counter for topic `myTopic` should have been incremented by 2");
+
+ Counter failCounter = meterRegistry.get(KafkaMessageSender.FAIL_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
+ context.assertEquals(1.0, failCounter.count(), "Fail counter for topic `myTopic` should have been incremented by 1");
}
private JsonObject buildSingleRecordPayload(String key){
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java
index 46f45053..65e1a3ea 100644
--- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java
@@ -1,7 +1,8 @@
package org.swisspush.gateleen.kafka;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.vertx.core.Future;
-import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
@@ -36,18 +37,18 @@
@RunWith(VertxUnitRunner.class)
public class KafkaMessageValidatorTest {
- private Vertx vertx;
private KafkaMessageValidator messageValidator;
private Validator validator;
private ValidationResourceManager validationResourceManager;
+ private SimpleMeterRegistry meterRegistry;
@Before
public void setUp() {
- vertx = Vertx.vertx();
validationResourceManager = Mockito.mock(ValidationResourceManager.class);
validator = Mockito.mock(Validator.class);
-
+ meterRegistry = new SimpleMeterRegistry();
messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
+ messageValidator.setMeterRegistry(meterRegistry);
}
@Test
@@ -141,6 +142,10 @@ public void testValidateMessagesMatchingValidationResourceEntry(TestContext cont
context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus());
verify(validationResourceManager, times(2)).getValidationResource();
verify(validator, times(1)).validateWithSchemaLocation(any(), any(), any());
+
+ Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageValidator.TOPIC, "myOtherTopic").counter();
+ context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
+
async.complete();
});
@@ -176,6 +181,10 @@ public void testValidateMessagesWithFailInValidator(TestContext context) {
context.assertTrue(event.failed());
verify(validationResourceManager, times(2)).getValidationResource();
verify(validator, times(2)).validateWithSchemaLocation(any(), any(), any());
+
+ Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageValidator.TOPIC, "myOtherTopic").counter();
+ context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
+
async.complete();
});
@@ -217,4 +226,47 @@ public void testValidateMultipleMessages(TestContext context) {
});
}
+
+ @Test
+ public void testValidateMultipleMessagesWithValidatedNegative(TestContext context) {
+ Async async = context.async();
+
+ ValidationResource validationResource = new ValidationResource();
+ validationResource.addResource(
+ Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
+ ValidationResource.URL_PROPERTY, "/path/to/myTopic",
+ ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
+ ));
+
+ when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
+
+ HttpServerResponse response = spy(new StreamingResponse(new HeadersMultiMap()));
+ StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new HeadersMultiMap(), response);
+
+ String payload_1 = new JsonObject().encode();
+ String payload_2 = new JsonObject().put("foo", "bar").encode();
+ String payload_3 = new JsonObject().put("abc", "def").encode();
+ List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1));
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2));
+ kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_3));
+
+ when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn(
+ Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)));
+ when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_2)), any())).thenReturn(
+ Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV)));
+
+ messageValidator.validateMessages(request, kafkaProducerRecords).onComplete(event -> {
+ context.assertTrue(event.succeeded());
+ context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, event.result().getValidationStatus());
+ verify(validationResourceManager, times(2)).getValidationResource();
+ verify(validator, times(3)).validateWithSchemaLocation(any(), any(), any());
+
+ Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageValidator.TOPIC, "myOtherTopic").counter();
+ context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
+
+ async.complete();
+ });
+
+ }
}
diff --git a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java
index 3cb7a63c..382fbec2 100644
--- a/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java
+++ b/gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilderTest.java
@@ -8,15 +8,14 @@
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.swisspush.gateleen.core.util.JsonObjectUtils;
import org.swisspush.gateleen.validation.ValidationException;
import java.util.List;
+import static org.junit.Assert.assertThrows;
import static org.swisspush.gateleen.kafka.KafkaProducerRecordBuilder.buildRecords;
/**
@@ -27,35 +26,40 @@
@RunWith(VertxUnitRunner.class)
public class KafkaProducerRecordBuilderTest {
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
@Test
- public void buildRecordsInvalidJson() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Error while parsing payload");
- buildRecords("myTopic", Buffer.buffer("notValidJson"));
+ public void buildRecordsInvalidJson(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("notValidJson"));
+ });
+
+ context.assertEquals("Error while parsing payload", exception.getMessage());
}
@Test
- public void buildRecordsMissingRecordsArray() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Missing 'records' array");
- buildRecords("myTopic", Buffer.buffer("{}"));
+ public void buildRecordsMissingRecordsArray(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{}"));
+ });
+
+ context.assertEquals("Missing 'records' array", exception.getMessage());
}
@Test
- public void buildRecordsNotArray() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
- buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
+ public void buildRecordsNotArray(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\": \"shouldBeAnArray\"}"));
+ });
+
+ context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
}
@Test
- public void buildRecordsInvalidRecordsType() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'records' must be of type JsonArray holding JsonObject objects");
- buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
+ public void buildRecordsInvalidRecordsType(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\": [123]}"));
+ });
+
+ context.assertEquals("Property 'records' must be of type JsonArray holding JsonObject objects", exception.getMessage());
}
@Test
@@ -66,38 +70,48 @@ public void buildRecordsEmptyRecordsArray(TestContext context) throws Validation
}
@Test
- public void buildRecordsInvalidKeyType() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'key' must be of type String");
- buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
+ public void buildRecordsInvalidKeyType(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"key\": 123,\"value\": {}}]}"));
+ });
+
+ context.assertEquals("Property 'key' must be of type String", exception.getMessage());
}
@Test
- public void buildRecordsInvalidValueType() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'value' must be of type JsonObject");
- buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
+ public void buildRecordsInvalidValueType(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\":[{\"value\":123}]}"));
+ });
+
+ context.assertEquals("Property 'value' must be of type JsonObject", exception.getMessage());
}
@Test
- public void buildRecordsMissingValue() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'value' is required");
- buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
+ public void buildRecordsMissingValue(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\":[{}]}"));
+ });
+
+ context.assertEquals("Property 'value' is required", exception.getMessage());
}
@Test
- public void buildRecordsInvalidHeadersType() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'headers' must be of type JsonObject");
- buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
+ public void buildRecordsInvalidHeadersType(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\":{},\"headers\": 123}]}"));
+ });
+
+ context.assertEquals("Property 'headers' must be of type JsonObject", exception.getMessage());
}
@Test
- public void buildRecordsInvalidHeadersValueType() throws ValidationException {
- thrown.expect( ValidationException.class );
- thrown.expectMessage("Property 'headers' must be of type JsonObject holding String values only");
- buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
+ public void buildRecordsInvalidHeadersValueType(TestContext context) {
+ Exception exception = assertThrows(ValidationException.class, () -> {
+ buildRecords("myTopic", Buffer.buffer("{\"records\": [{\"value\": {},\"headers\": {\"key\": 555}}]}"));
+ });
+
+ context.assertEquals("Property 'headers' must be of type JsonObject holding String values only", exception.getMessage());
}
@Test
diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
index 2559fd4c..f025933e 100755
--- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
+++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java
@@ -1,9 +1,6 @@
package org.swisspush.gateleen.playground;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
@@ -284,8 +281,17 @@ public void start() {
KafkaProducerRepository kafkaProducerRepository = new KafkaProducerRepository(vertx);
KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
KafkaMessageValidator messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
- kafkaHandler = new KafkaHandler(configurationResourceManager, messageValidator, kafkaProducerRepository, kafkaMessageSender,
- SERVER_ROOT + "/admin/v1/kafka/topicsConfig",SERVER_ROOT + "/streaming/");
+
+ kafkaHandler = KafkaHandler.builder()
+ .withVertx(vertx)
+ .withConfigurationResourceManager(configurationResourceManager)
+ .withKafkaMessageValidator(messageValidator)
+ .withRepository(kafkaProducerRepository)
+ .withKafkaMessageSender(kafkaMessageSender)
+ .withConfigResourceUri(SERVER_ROOT + "/admin/v1/kafka/topicsConfig")
+ .withStreamingPath(SERVER_ROOT + "/streaming/")
+ .build();
+
kafkaHandler.initialize();
schedulerResourceManager = new SchedulerResourceManager(vertx, redisProvider, storage, monitoringHandler,