#654 collecting kafka metrics #627

Merged: 4 commits, merged on Jan 13, 2025

30 changes: 28 additions & 2 deletions gateleen-kafka/README_kafka.md
@@ -48,7 +48,7 @@ The following topic configuration values are required:
Besides these required configuration values, additional string values can be added. See documentation from Apache Kafka [here](https://kafka.apache.org/documentation/#producerconfigs).

## Usage
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
To use the gateleen-kafka module, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) class has to be initialized as described in the _configuration_ section. Also, the [KafkaHandler](src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java) has to be integrated in the "MainVerticle" handling all
incoming requests. See [Playground Server](../gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java) and [Runconfig](../gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java).

The following sequence diagram shows the setup of the "MainVerticle". The `streamingPath` (KafkaHandler) is configured to `/playground/server/streaming/`
@@ -448,4 +448,30 @@ This sequence diagram shows the process when messages are sent to Kafka:
│ <────────────────────│ │ │ │ │ │
│ │ │ │ │ │ │
│ └┬┘ │ │ │ │
```
```

### Micrometer metrics
The Kafka feature is monitored with Micrometer. The following metrics are available:
* gateleen_kafka_send_success_messages_total
* gateleen_kafka_send_fail_messages_total
* gateleen_kafka_validation_fail_messages_total

Each metric carries a `topic` tag identifying the Kafka topic the messages were sent to.

Example metrics:

```
# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
# TYPE gateleen_kafka_send_success_messages_total counter
gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
# HELP gateleen_kafka_send_fail_messages_total Amount of failed kafka message sendings
# TYPE gateleen_kafka_send_fail_messages_total counter
gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
# TYPE gateleen_kafka_validation_fail_messages_total counter
gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
```

To enable the `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, set a `MeterRegistry` instance by calling the `setMeterRegistry(MeterRegistry meterRegistry)` method on the `KafkaMessageSender` instance.
To enable the `gateleen_kafka_validation_fail_messages_total` metric, set a `MeterRegistry` instance by calling the `setMeterRegistry(MeterRegistry meterRegistry)` method on the `KafkaMessageValidator` instance.
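
For illustration, a minimal wiring sketch (this is an assumption, not code from this PR: the `SimpleMeterRegistry` stands in for the application's shared registry, `validationResourceManager` and `validator` stand for already configured gateleen-validation instances, and `KafkaMessageSender` is assumed to be constructible without arguments):

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

// Sketch only: in a real deployment use the registry that backs the metrics endpoint.
MeterRegistry meterRegistry = new SimpleMeterRegistry();

KafkaMessageSender kafkaMessageSender = new KafkaMessageSender();
kafkaMessageSender.setMeterRegistry(meterRegistry);

KafkaMessageValidator kafkaMessageValidator = new KafkaMessageValidator(validationResourceManager, validator);
kafkaMessageValidator.setMeterRegistry(meterRegistry);

// Both instances are then passed to the KafkaHandler as described in the Usage section.
```
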
4 changes: 4 additions & 0 deletions gateleen-kafka/pom.xml
@@ -22,6 +22,10 @@
<artifactId>gateleen-validation</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- TEST dependencies -->
<dependency>
<groupId>org.swisspush.gateleen</groupId>
gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java
@@ -21,14 +21,11 @@
import org.swisspush.gateleen.core.validation.ValidationStatus;
import org.swisspush.gateleen.validation.ValidationException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory;

/**
* Handler class for all Kafka related requests.
*
@@ -52,46 +49,10 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
private final KafkaMessageSender kafkaMessageSender;
private final Map<String, Object> properties;
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
private KafkaMessageValidator kafkaMessageValidator;
private final KafkaMessageValidator kafkaMessageValidator;

private boolean initialized = false;

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath) {
this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator,
KafkaProducerRepository repository, KafkaMessageSender kafkaMessageSender, String configResourceUri,
String streamingPath) {
this(configurationResourceManager, kafkaMessageValidator, repository, kafkaMessageSender,
configResourceUri, streamingPath, new HashMap<>());
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {

this(configurationResourceManager, null, repository, kafkaMessageSender,
configResourceUri, streamingPath, properties);
}

/** @deprecated Use {@link #builder()} */
@Deprecated
public KafkaHandler(ConfigurationResourceManager configurationResourceManager, KafkaMessageValidator kafkaMessageValidator, KafkaProducerRepository repository,
KafkaMessageSender kafkaMessageSender, String configResourceUri, String streamingPath, Map<String, Object> properties) {
this(Vertx.vertx(), newGateleenThriftyExceptionFactory(), configurationResourceManager,
kafkaMessageValidator, repository, kafkaMessageSender, configResourceUri, streamingPath,
properties);
log.warn("TODO: Do NOT use this DEPRECATED constructor! It creates instances that it should not create!");
}

public KafkaHandler(
Vertx vertx,
GateleenExceptionFactory exceptionFactory,
@@ -140,8 +101,6 @@ private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
Promise<Void> promise = Promise.promise();
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);



repository.closeAll().future().onComplete((event -> {
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
repository.addKafkaProducer(kafkaConfiguration);
gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
@@ -9,7 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
@@ -18,17 +22,32 @@ public class KafkaMessageSender {

private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> successSendCounterMap = new HashMap<>();
private final Map<String, Counter> failSendCounterMap = new HashMap<>();

public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
public static final String TOPIC = "topic";

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
successSendCounterMap.clear();
failSendCounterMap.clear();
}

Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
List<KafkaProducerRecord<String, String>> messages) {
Promise<Void> promise = Promise.promise();
log.debug("Start processing {} messages for kafka", messages.size());

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = messages.stream()
List<Future<Void>> futures = messages.stream()
.map(message -> KafkaMessageSender.this.sendMessage(kafkaProducer, message))
.collect(toList());

CompositeFuture.all(futures).<Void>mapEmpty().onComplete(result -> {
Future.all(futures).<Void>mapEmpty().onComplete(result -> {
if (result.succeeded()) {
promise.complete();
log.debug("Batch messages successfully sent to Kafka.");
@@ -44,7 +63,45 @@ private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, Ka
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
incrementSuccessCount(metadata.getTopic());
return Future.succeededFuture();
}).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
}).onFailure(throwable -> {
log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
incrementFailCount1(message.topic());
});
}

private synchronized void incrementSuccessCount(String topic) {
Counter counter = successSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
.description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
successSendCounterMap.put(topic, newCounter);
}
}
Review comment: `incrementSuccessCount` and `incrementFailCount1` are NOT thread-safe. Is it ensured only one thread is at work here (e.g. in case of multiple verticle instances)?
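
One possible direction for addressing this, sketched purely as an illustration and not part of this PR: Micrometer registries already handle concurrent registration of the same meter id, so the per-topic cache could be a `ConcurrentHashMap` queried via `computeIfAbsent`, which removes the need for `synchronized` methods:

```java
// Illustration only, reusing the fields and constants already declared in KafkaMessageSender.
// Requires an additional import of java.util.concurrent.ConcurrentHashMap.
private final Map<String, Counter> successSendCounterMap = new ConcurrentHashMap<>();

private void incrementSuccessCount(String topic) {
    if (meterRegistry == null) {
        return;
    }
    // computeIfAbsent is atomic on ConcurrentHashMap, and register() returns the
    // already registered counter if one exists for the same name and tags.
    successSendCounterMap.computeIfAbsent(topic, t -> Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
            .description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
            .tag(TOPIC, t)
            .register(meterRegistry))
            .increment();
}
```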


private synchronized void incrementFailCount1(String topic) {
Counter counter = failSendCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
.description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failSendCounterMap.put(topic, newCounter);
}
}
}
gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java
@@ -1,5 +1,7 @@
package org.swisspush.gateleen.kafka;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
@@ -14,6 +16,7 @@
import org.swisspush.gateleen.validation.ValidationUtil;
import org.swisspush.gateleen.validation.Validator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -26,11 +29,23 @@ public class KafkaMessageValidator {
private final Validator validator;
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);

private MeterRegistry meterRegistry;
private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();

public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
public static final String TOPIC = "topic";

public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
this.validationResourceManager = validationResourceManager;
this.validator = validator;
}

public void setMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
failedToValidateCounterMap.clear();
}

public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
if (kafkaProducerRecords.isEmpty()) {
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
@@ -49,6 +64,8 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List

SchemaLocation schemaLocation = optionalSchemaLocation.get();

String topic = kafkaProducerRecords.get(0).topic();

@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
List<Future> futures = kafkaProducerRecords.stream()
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
@@ -57,10 +74,31 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
return CompositeFuture.all(futures).compose(compositeFuture -> {
for (Object o : compositeFuture.list()) {
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
incrementValidationFailCount(topic);
return Future.succeededFuture((ValidationResult) o);
}
}
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
}, throwable -> {
incrementValidationFailCount(topic);
return Future.failedFuture(throwable);
});
}

private void incrementValidationFailCount(String topic) {
Counter counter = failedToValidateCounterMap.get(topic);
if(counter != null) {
counter.increment();
return;
}

if(meterRegistry != null) {
Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
.description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
.tag(TOPIC, topic)
.register(meterRegistry);
newCounter.increment();
failedToValidateCounterMap.put(topic, newCounter);
}
}
}
gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRepository.java
@@ -46,10 +46,10 @@ Optional<Pair<KafkaProducer<String, String>, Pattern>> findMatchingKafkaProducer
Promise<Void> closeAll() {
log.info("About to close all kafka producers");
Promise<Void> promise = Promise.promise();
List<Future> futures = new ArrayList<>();
List<Future<Void>> futures = new ArrayList<>();

for (Map.Entry<Pattern, KafkaProducer<String, String>> entry : kafkaProducers.entrySet()) {
Promise entryFuture = Promise.promise();
Promise<Void> entryFuture = Promise.promise();
futures.add(entryFuture.future());
entry.getValue().close(event -> {
if (event.succeeded()) {
Expand All @@ -62,7 +62,7 @@ Promise<Void> closeAll() {
}

// wait for all producers to be closed
CompositeFuture.all(futures).onComplete(event -> {
Future.all(futures).onComplete(event -> {
kafkaProducers.clear();
promise.complete();
});