Skip to content

Commit

Permalink
Add dynamic DLQ kafka config (#53)
Browse files Browse the repository at this point in the history
* Add dynamic DLQ kafka config

* Checkstyle

* Add wildcard config

* Add more props on DLQ KAFKA Wildcard props

* Rename enum, add case-insensitive matcher, add more test

* remove unused getter

* bump version

* feat: enable multiple underscore parsing

* chore: revert extra underscore parsing
  • Loading branch information
ekawinataa authored Oct 2, 2024
1 parent 263d7c7 commit a376678
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 14 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.10.6'
version '0.10.7'

def projName = "firehose"

Expand Down
7 changes: 7 additions & 0 deletions docs/docs/advance/dlq.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Curre
* Type: `optional`
* Default value: `firehose-retry-topic`

## `DLQ_KAFKA_(.*)`

* Example property: `DLQ_KAFKA_SASL_JAAS_CONFIG`, `DLQ_KAFKA_SECURITY_PROTOCOL`, `DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD`, `DLQ_KAFKA_SASL_MECHANISM`
* Type: `optional`
* Default value: `null`
* Description: Any property whose name starts with `DLQ_KAFKA_` (matched case-insensitively) is passed to the DLQ Kafka producer, with the prefix stripped and the remainder converted to Kafka's native key form (underscores become dots, lower-cased). This is useful for setting Kafka producer properties that have no dedicated Firehose configuration.

## `DLQ_S3_REGION`

Amazon S3 creates buckets in a Region that you specify.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.gotocompany.firehose.sink.dlq.blobstorage.BlobStorageDlqWriter;
import com.gotocompany.firehose.sink.dlq.kafka.KafkaDlqWriter;
import com.gotocompany.firehose.sink.dlq.log.LogDlqWriter;
import com.gotocompany.firehose.utils.KafkaProducerTypesMetadata;
import com.gotocompany.firehose.utils.KafkaUtils;
import com.gotocompany.depot.metrics.StatsDReporter;
import io.opentracing.Tracer;
Expand All @@ -25,7 +26,7 @@ public static DlqWriter create(Map<String, String> configuration, StatsDReporter
switch (dlqConfig.getDlqWriterType()) {
case KAFKA:
DlqKafkaProducerConfig dlqKafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, configuration);
KafkaProducer<byte[], byte[]> kafkaProducer = KafkaUtils.getKafkaProducer(dlqKafkaProducerConfig);
KafkaProducer<byte[], byte[]> kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, dlqKafkaProducerConfig, configuration);
TracingKafkaProducer<byte[], byte[]> tracingProducer = new TracingKafkaProducer<>(kafkaProducer, tracer);

return new KafkaDlqWriter(tracingProducer, dlqKafkaProducerConfig.getDlqKafkaTopic(), new FirehoseInstrumentation(client, KafkaDlqWriter.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.gotocompany.firehose.utils;

import java.util.regex.Pattern;

/**
 * Identifies a kafka producer flavour and the environment-variable prefix used to
 * collect its dynamic configuration (e.g. {@code DLQ_KAFKA_SASL_MECHANISM}).
 */
public enum KafkaProducerTypesMetadata {
    DLQ("DLQ_KAFKA_");

    // Compiled once per constant: Pattern is immutable and thread-safe, and
    // recompiling on every getConfigurationPattern() call is wasted work.
    private final Pattern configurationPattern;

    KafkaProducerTypesMetadata(String configurationPrefix) {
        // Case-insensitive so users may supply keys as e.g. dlq_kafka_acks;
        // group(1) captures the suffix after the prefix.
        this.configurationPattern = Pattern.compile(String.format("^%s(.*)", configurationPrefix), Pattern.CASE_INSENSITIVE);
    }

    /**
     * @return pattern matching configuration keys belonging to this producer type;
     *         group 1 holds the key suffix after the prefix.
     */
    public Pattern getConfigurationPattern() {
        return configurationPattern;
    }
}
39 changes: 27 additions & 12 deletions src/main/java/com/gotocompany/firehose/utils/KafkaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
Expand All @@ -33,7 +34,6 @@ public class KafkaUtils {
private static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";



/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
*
Expand Down Expand Up @@ -92,20 +92,35 @@ public static FirehoseKafkaConsumer createConsumer(KafkaConsumerConfig config, M
/**
* Gets kafka producer.
*
* @param config the config
* @param kafkaProducerTypesMetadata the connector type, current supported value is DLQ and SOURCE
* @param dlqKafkaProducerConfig the dlqKafkaProducerConfig
* @param configurations the configurations which dynamically set by the user
* @return the kafka producer
*/
public static KafkaProducer<byte[], byte[]> getKafkaProducer(DlqKafkaProducerConfig config) {
public static KafkaProducer<byte[], byte[]> getKafkaProducer(KafkaProducerTypesMetadata kafkaProducerTypesMetadata,
DlqKafkaProducerConfig dlqKafkaProducerConfig,
Map<String, String> configurations) {
Properties props = new Properties();
props.put("bootstrap.servers", config.getDlqKafkaBrokers());
props.put("acks", config.getDlqKafkaAcks());
props.put("retries", config.getDlqKafkaRetries());
props.put("batch.size", config.getDlqKafkaBatchSize());
props.put("linger.ms", config.getDlqKafkaLingerMs());
props.put("buffer.memory", config.getDlqKafkaBufferMemory());
props.put("key.serializer", config.getDlqKafkaKeySerializer());
props.put("value.serializer", config.getDlqKafkaValueSerializer());

props.put("bootstrap.servers", dlqKafkaProducerConfig.getDlqKafkaBrokers());
props.put("acks", dlqKafkaProducerConfig.getDlqKafkaAcks());
props.put("retries", dlqKafkaProducerConfig.getDlqKafkaRetries());
props.put("batch.size", dlqKafkaProducerConfig.getDlqKafkaBatchSize());
props.put("linger.ms", dlqKafkaProducerConfig.getDlqKafkaLingerMs());
props.put("buffer.memory", dlqKafkaProducerConfig.getDlqKafkaBufferMemory());
props.put("key.serializer", dlqKafkaProducerConfig.getDlqKafkaKeySerializer());
props.put("value.serializer", dlqKafkaProducerConfig.getDlqKafkaValueSerializer());
props.putAll(getAdditionalKafkaConfiguration(kafkaProducerTypesMetadata, configurations));
return new KafkaProducer<>(props);
}

/**
 * Collects dynamic, user-supplied kafka settings whose keys match the producer
 * type's prefix pattern, translating each matched suffix to kafka's native form:
 * underscores become dots and the key is lower-cased
 * ({@code DLQ_KAFKA_SASL_MECHANISM} -> {@code sasl.mechanism}).
 *
 * @param kafkaProducerTypesMetadata producer type supplying the key-prefix pattern
 * @param configurations             raw configuration map dynamically set by the user
 * @return properties containing only the matching, translated entries
 */
private static Properties getAdditionalKafkaConfiguration(KafkaProducerTypesMetadata kafkaProducerTypesMetadata, Map<String, String> configurations) {
    Properties additionalProperties = new Properties();
    configurations.forEach((key, value) -> {
        Matcher matcher = kafkaProducerTypesMetadata.getConfigurationPattern().matcher(key);
        if (matcher.find()) {
            // Locale.ROOT keeps the key mapping stable regardless of the JVM's
            // default locale (e.g. Turkish dotless-i would turn SASL_MECHANISM
            // into "sasl_mechanısm"). replace() suffices: "_" is a literal.
            additionalProperties.put(matcher.group(1).replace("_", ".").toLowerCase(Locale.ROOT), value);
        }
    });
    return additionalProperties;
}
}
103 changes: 103 additions & 0 deletions src/test/java/com/gotocompany/firehose/utils/KafkaUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.gotocompany.firehose.utils;

import com.gotocompany.firehose.config.DlqKafkaProducerConfig;
import org.aeonbits.owner.ConfigFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.types.Password;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Tests for {@code KafkaUtils.getKafkaProducer}, focusing on how wildcard
 * DLQ_KAFKA_* configuration entries are forwarded to the underlying kafka
 * producer. The producer's effective settings are read back via reflection on
 * KafkaProducer's private "producerConfig" field, since the client exposes no
 * public accessor for it.
 */
public class KafkaUtilsTest {

    private static final String DLQ_KAFKA_ACKS = "DLQ_KAFKA_ACKS";
    private static final String DLQ_KAFKA_RETRIES = "DLQ_KAFKA_RETRIES";
    private static final String DLQ_KAFKA_BATCH_SIZE = "DLQ_KAFKA_BATCH_SIZE";
    private static final String DLQ_KAFKA_LINGER_MS = "DLQ_KAFKA_LINGER_MS";
    private static final String DLQ_KAFKA_BUFFER_MEMORY = "DLQ_KAFKA_BUFFER_MEMORY";
    private static final String DLQ_KAFKA_KEY_SERIALIZER = "DLQ_KAFKA_KEY_SERIALIZER";
    private static final String DLQ_KAFKA_VALUE_SERIALIZER = "DLQ_KAFKA_VALUE_SERIALIZER";
    private static final String DLQ_KAFKA_BROKERS = "DLQ_KAFKA_BROKERS";
    private static final String DLQ_KAFKA_TOPIC = "DLQ_KAFKA_TOPIC";
    // Deliberately mixed-case: exercises the case-insensitive DLQ_KAFKA_ prefix matcher.
    private static final String DLQ_KAFKA_SECURITY_PROTOCOL = "dlQ_KaFKa_SeCuRITY_proTOCOL";
    private static final String DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG = "DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD";
    private static final String DLQ_KAFKA_SASL_MECHANISM = "DLQ_KAFKA_SASL_MECHANISM";
    private static final String DLQ_KAFKA_SASL_JAAS_CONFIG = "DLQ_KAFKA_SASL_JAAS_CONFIG";

    /**
     * Both the typed DlqKafkaProducerConfig settings and the wildcard
     * DLQ_KAFKA_* entries (security protocol, truststore password, SASL
     * mechanism/JAAS) must end up in the producer's effective configuration.
     */
    @Test
    public void createShouldReturnKafkaProducerWithCorrectProperties() throws NoSuchFieldException, IllegalAccessException {
        Map<String, String> properties = getDlqProperties();
        DlqKafkaProducerConfig dlqKafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, properties);

        KafkaProducer<byte[], byte[]> kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, dlqKafkaProducerConfig, properties);
        // Reach into the producer's private config to verify the effective settings.
        Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig");
        producerConfigField.setAccessible(true);
        ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer);

        assertEquals(properties.get(DLQ_KAFKA_ACKS), producerConfig.getString("acks"));
        assertEquals(properties.get(DLQ_KAFKA_RETRIES), String.valueOf(producerConfig.getInt("retries")));
        assertEquals(properties.get(DLQ_KAFKA_BATCH_SIZE), String.valueOf(producerConfig.getInt("batch.size")));
        assertEquals(properties.get(DLQ_KAFKA_LINGER_MS), String.valueOf(producerConfig.getLong("linger.ms")));
        assertEquals(properties.get(DLQ_KAFKA_BUFFER_MEMORY), String.valueOf(producerConfig.getLong("buffer.memory")));
        assertEquals(properties.get(DLQ_KAFKA_KEY_SERIALIZER), producerConfig.getClass("key.serializer").getName());
        assertEquals(properties.get(DLQ_KAFKA_VALUE_SERIALIZER), producerConfig.getClass("value.serializer").getName());
        assertEquals(Arrays.asList(properties.get(DLQ_KAFKA_BROKERS).split(",")), producerConfig.getList("bootstrap.servers"));
        assertEquals(properties.get(DLQ_KAFKA_SECURITY_PROTOCOL), producerConfig.getString("security.protocol"));
        assertEquals(properties.get(DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG), producerConfig.getString("ssl.truststore.password"));
        assertEquals(properties.get(DLQ_KAFKA_SASL_MECHANISM), producerConfig.getString("sasl.mechanism"));
        // sasl.jaas.config is a Password-typed config; compare via the Password wrapper.
        assertEquals(new Password(properties.get(DLQ_KAFKA_SASL_JAAS_CONFIG)), producerConfig.getPassword("sasl.jaas.config"));
    }

    /**
     * Keys that do not carry the exact DLQ_KAFKA_ prefix — a missing underscore
     * ("DLQ_KAFKACLIENT_ID") or extra leading characters ("NOTDLQ_KAFKA_...") —
     * must NOT be forwarded, so client.id stays at its (empty) default.
     */
    @Test
    public void createShouldIgnoreParametersNotMatchingPrefix() throws NoSuchFieldException, IllegalAccessException {
        Map<String, String> properties = getDlqProperties();
        properties.put("DLQ_KAFKACLIENT_ID", "clientId");
        properties.put("NOTDLQ_KAFKA_CLIENT_ID", "clientId");
        DlqKafkaProducerConfig kafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, properties);

        KafkaProducer<byte[], byte[]> kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, kafkaProducerConfig, properties);
        // Reach into the producer's private config to verify the effective settings.
        Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig");
        producerConfigField.setAccessible(true);
        ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer);

        assertEquals(properties.get(DLQ_KAFKA_ACKS), producerConfig.getString("acks"));
        assertEquals(properties.get(DLQ_KAFKA_RETRIES), String.valueOf(producerConfig.getInt("retries")));
        assertEquals(properties.get(DLQ_KAFKA_BATCH_SIZE), String.valueOf(producerConfig.getInt("batch.size")));
        assertEquals(properties.get(DLQ_KAFKA_LINGER_MS), String.valueOf(producerConfig.getLong("linger.ms")));
        assertEquals(properties.get(DLQ_KAFKA_BUFFER_MEMORY), String.valueOf(producerConfig.getLong("buffer.memory")));
        assertEquals(properties.get(DLQ_KAFKA_KEY_SERIALIZER), producerConfig.getClass("key.serializer").getName());
        assertEquals(properties.get(DLQ_KAFKA_VALUE_SERIALIZER), producerConfig.getClass("value.serializer").getName());
        assertEquals(Arrays.asList(properties.get(DLQ_KAFKA_BROKERS).split(",")), producerConfig.getList("bootstrap.servers"));
        assertEquals(properties.get(DLQ_KAFKA_SECURITY_PROTOCOL), producerConfig.getString("security.protocol"));
        assertEquals(properties.get(DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG), producerConfig.getString("ssl.truststore.password"));
        assertEquals(properties.get(DLQ_KAFKA_SASL_MECHANISM), producerConfig.getString("sasl.mechanism"));
        assertEquals(new Password(properties.get(DLQ_KAFKA_SASL_JAAS_CONFIG)), producerConfig.getPassword("sasl.jaas.config"));
        // The non-matching keys above must not have populated client.id.
        assertTrue(StringUtils.isEmpty(producerConfig.getString("client.id")));

    }

    /**
     * Baseline DLQ configuration: typed producer settings plus wildcard
     * DLQ_KAFKA_* security entries used by both tests above.
     */
    private static Map<String, String> getDlqProperties() {
        Map<String, String> properties = new HashMap<>();
        properties.put(DLQ_KAFKA_ACKS, "all");
        properties.put(DLQ_KAFKA_RETRIES, "2147483647");
        properties.put(DLQ_KAFKA_BATCH_SIZE, "16384");
        properties.put(DLQ_KAFKA_LINGER_MS, "0");
        properties.put(DLQ_KAFKA_BUFFER_MEMORY, "33554432");
        properties.put(DLQ_KAFKA_KEY_SERIALIZER, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(DLQ_KAFKA_VALUE_SERIALIZER, "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(DLQ_KAFKA_BROKERS, "localhost:9092");
        properties.put(DLQ_KAFKA_TOPIC, "firehose-retry-topic");
        properties.put(DLQ_KAFKA_SECURITY_PROTOCOL, "SASL_SSL");
        properties.put(DLQ_KAFKA_SASL_MECHANISM, "OAUTHBEARER");
        properties.put(DLQ_KAFKA_SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        return properties;
    }
}

0 comments on commit a376678

Please sign in to comment.