Skip to content

Commit

Permalink
Merge pull request #627 from swisspost/feature/issue654_collect_kafka…
Browse files Browse the repository at this point in the history
…_metrics

#654 collecting kafka metrics
  • Loading branch information
mcweba authored Jan 13, 2025
2 parents fa073d8 + df9c2ff commit 6be58ea
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 132 deletions.
30 changes: 28 additions & 2 deletions gateleen-kafka/README_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The following topic configuration values are required:
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).

## Usage
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).

The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`
Expand Down Expand Up @@ -448,4 +448,30 @@ This sequence diagrams shows the process when messages are sent to Kafka:
│ <────────────────────│ │ │ │ │ │
│ │ │ │ │ │ │
│ └┬┘ │ │ │ │
```
```

### Micrometer metrics
The kafka feature is monitored with micrometer. The following metrics are available:
* gateleen_kafka_send_success_messages_total
* gateleen_kafka_send_fail_messages_total
* gateleen_kafka_validation_fail_messages_total

Additional tags are provided to specify the topic.

Example metrics:

```
# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
# TYPE gateleen_kafka_send_success_messages_total counter
gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
# TYPE gateleen_kafka_validation_fail_messages_total counter
gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
```

To enable `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageSender` class.
To enable `gateleen_kafka_validation_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageValidator` class.
4 changes: 4 additions & 0 deletions gateleen-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
<artifactId>gateleen-validation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- TEST dependencies -->
<dependency>
<groupId>org.swisspush.gateleen</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
Expand All @@ -52,46 +49,10 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;
private final KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
Expand Down Expand Up @@ -140,8 +101,6 @@ private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);



repository.closeAll().future().onComplete((event -> {
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
repository.addKafkaProducer(kafkaConfiguration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -9,7 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
Expand All @@ -18,17 +22,32 @@ public class KafkaMessageSender {

private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> successSendCounterMap = new HashMap<>();
private final Map<String, Counter> failSendCounterMap = new HashMap<>();

public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
public static final String TOPIC = "topic";

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
successSendCounterMap.clear();
failSendCounterMap.clear();
}

Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
List<KafkaProducerRecord<String, String>> messages) {
Promise<Void> promise = Promise.promise();
log.debug("Start processing {} messages for kafka", messages.size());

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = messages.stream()
List<Future<Void>> futures = messages.stream()
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
.collect(toList());

CompositeFuture.all(futures).<Void>mapEmpty().onComplete(result -> {
Future.all(futures).<Void>mapEmpty().onComplete(result -> {
if (result.succeeded()) {
promise.complete();
log.debug("Batch messages successfully sent to Kafka.");
Expand All @@ -44,7 +63,45 @@ private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, Ka
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
incrementSuccessCount(metadata.getTopic());
return Future.succeededFuture();
}).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
}).onFailure(throwable -> {
log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
incrementFailCount1(message.topic());
});
}

private synchronized void incrementSuccessCount(String topic) {
Counter counter = successSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
.description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
successSendCounterMap.put(topic, newCounter);
}
}

private synchronized void incrementFailCount1(String topic) {
Counter counter = failSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
.description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failSendCounterMap.put(topic, newCounter);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
Expand All @@ -14,6 +16,7 @@
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,11 +29,23 @@ public class KafkaMessageValidator {
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();

public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
public static final String TOPIC = "topic";

public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
failedToValidateCounterMap.clear();
}

public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
Expand All @@ -49,6 +64,8 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List

SchemaLocation schemaLocation = optionalSchemaLocation.get();

String topic = kafkaProducerRecords.get(0).topic();

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
Expand All @@ -57,10 +74,31 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
incrementValidationFailCount(topic);
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}, throwable -> {
incrementValidationFailCount(topic);
return Future.failedFuture(throwable);
});
}

private void incrementValidationFailCount(String topic) {
Counter counter = failedToValidateCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
.description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failedToValidateCounterMap.put(topic, newCounter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ Optional<Pair<KafkaProducer<String, String>, Pattern>> findMatchingKafkaProducer
Promise<Void> closeAll() {
log.info("About to close all kafka producers");
Promise<Void> promise = Promise.promise();
List<Future> futures = new ArrayList<>();
List<Future<Void>> futures = new ArrayList<>();

for (Map.Entry<Pattern, KafkaProducer<String, String>> entry : kafkaProducers.entrySet()) {
Promise entryFuture = Promise.promise();
Promise<Void> entryFuture = Promise.promise();
futures.add(entryFuture.future());
entry.getValue().close(event -> {
if (event.succeeded()) {
Expand All @@ -62,7 +62,7 @@ Promise<Void> closeAll() {
}

// wait for all producers to be closed
CompositeFuture.all(futures).onComplete(event -> {
Future.all(futures).onComplete(event -> {
kafkaProducers.clear();
promise.complete();
});
Expand Down
Loading

0 comments on commit 6be58ea

Please sign in to comment.