diff --git a/build.gradle b/build.gradle index 1f5450373..bdd91140b 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'com.gotocompany' -version '0.10.6' +version '0.10.7' def projName = "firehose" diff --git a/docs/docs/advance/dlq.md b/docs/docs/advance/dlq.md index 80a79c629..91d9b2ff8 100644 --- a/docs/docs/advance/dlq.md +++ b/docs/docs/advance/dlq.md @@ -154,6 +154,13 @@ If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Curre * Type: `optional` * Default value: `firehose-retry-topic` +## `DLQ_KAFKA_(.*)` + +* Example property: `DLQ_KAFKA_SASL_JAAS_CONFIG`, `DLQ_KAFKA_SECURITY_PROTOCOL`, `DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD`, `DLQ_KAFKA_SASL_MECHANISM` +* Type: `optional` +* Default value: `null` +* Description: Any property starting with `DLQ_KAFKA_` will be passed to the kafka producer. This one is useful for setting any kafka producer property that is not available in the configuration. + ## `DLQ_S3_REGION"` Amazon S3 creates buckets in a Region that you specify. 
diff --git a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java index 79e3d9bf8..be7e315a0 100644 --- a/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java +++ b/src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java @@ -8,6 +8,7 @@ import com.gotocompany.firehose.sink.dlq.blobstorage.BlobStorageDlqWriter; import com.gotocompany.firehose.sink.dlq.kafka.KafkaDlqWriter; import com.gotocompany.firehose.sink.dlq.log.LogDlqWriter; +import com.gotocompany.firehose.utils.KafkaProducerTypesMetadata; import com.gotocompany.firehose.utils.KafkaUtils; import com.gotocompany.depot.metrics.StatsDReporter; import io.opentracing.Tracer; @@ -25,7 +26,7 @@ public static DlqWriter create(Map configuration, StatsDReporter switch (dlqConfig.getDlqWriterType()) { case KAFKA: DlqKafkaProducerConfig dlqKafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, configuration); - KafkaProducer kafkaProducer = KafkaUtils.getKafkaProducer(dlqKafkaProducerConfig); + KafkaProducer kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, dlqKafkaProducerConfig, configuration); TracingKafkaProducer tracingProducer = new TracingKafkaProducer<>(kafkaProducer, tracer); return new KafkaDlqWriter(tracingProducer, dlqKafkaProducerConfig.getDlqKafkaTopic(), new FirehoseInstrumentation(client, KafkaDlqWriter.class)); diff --git a/src/main/java/com/gotocompany/firehose/utils/KafkaProducerTypesMetadata.java b/src/main/java/com/gotocompany/firehose/utils/KafkaProducerTypesMetadata.java new file mode 100644 index 000000000..0dec5440d --- /dev/null +++ b/src/main/java/com/gotocompany/firehose/utils/KafkaProducerTypesMetadata.java @@ -0,0 +1,17 @@ +package com.gotocompany.firehose.utils; + +import java.util.regex.Pattern; + +public enum KafkaProducerTypesMetadata { + DLQ("DLQ_KAFKA_"); + + private final String configurationPrefix; + + 
KafkaProducerTypesMetadata(String dlqKafka) { + this.configurationPrefix = dlqKafka; + } + + public Pattern getConfigurationPattern() { + return Pattern.compile(String.format("^%s(.*)", configurationPrefix), Pattern.CASE_INSENSITIVE); + } +} diff --git a/src/main/java/com/gotocompany/firehose/utils/KafkaUtils.java b/src/main/java/com/gotocompany/firehose/utils/KafkaUtils.java index 5c0db1706..5cc3b5a03 100644 --- a/src/main/java/com/gotocompany/firehose/utils/KafkaUtils.java +++ b/src/main/java/com/gotocompany/firehose/utils/KafkaUtils.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** @@ -33,7 +34,6 @@ public class KafkaUtils { private static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy"; - /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * @@ -92,20 +92,35 @@ public static FirehoseKafkaConsumer createConsumer(KafkaConsumerConfig config, M /** * Gets kafka producer. 
* - * @param config the config + * @param kafkaProducerTypesMetadata the connector type; currently the supported value is DLQ + * @param dlqKafkaProducerConfig the DLQ Kafka producer configuration + * @param configurations the configurations dynamically set by the user * @return the kafka producer */ - public static KafkaProducer getKafkaProducer(DlqKafkaProducerConfig config) { + public static KafkaProducer getKafkaProducer(KafkaProducerTypesMetadata kafkaProducerTypesMetadata, + DlqKafkaProducerConfig dlqKafkaProducerConfig, + Map configurations) { Properties props = new Properties(); - props.put("bootstrap.servers", config.getDlqKafkaBrokers()); - props.put("acks", config.getDlqKafkaAcks()); - props.put("retries", config.getDlqKafkaRetries()); - props.put("batch.size", config.getDlqKafkaBatchSize()); - props.put("linger.ms", config.getDlqKafkaLingerMs()); - props.put("buffer.memory", config.getDlqKafkaBufferMemory()); - props.put("key.serializer", config.getDlqKafkaKeySerializer()); - props.put("value.serializer", config.getDlqKafkaValueSerializer()); - + props.put("bootstrap.servers", dlqKafkaProducerConfig.getDlqKafkaBrokers()); + props.put("acks", dlqKafkaProducerConfig.getDlqKafkaAcks()); + props.put("retries", dlqKafkaProducerConfig.getDlqKafkaRetries()); + props.put("batch.size", dlqKafkaProducerConfig.getDlqKafkaBatchSize()); + props.put("linger.ms", dlqKafkaProducerConfig.getDlqKafkaLingerMs()); + props.put("buffer.memory", dlqKafkaProducerConfig.getDlqKafkaBufferMemory()); + props.put("key.serializer", dlqKafkaProducerConfig.getDlqKafkaKeySerializer()); + props.put("value.serializer", dlqKafkaProducerConfig.getDlqKafkaValueSerializer()); + props.putAll(getAdditionalKafkaConfiguration(kafkaProducerTypesMetadata, configurations)); return new KafkaProducer<>(props); } + + private static Properties getAdditionalKafkaConfiguration(KafkaProducerTypesMetadata kafkaProducerTypesMetadata, Map configurations) { + Properties additionalProperties = new 
Properties(); + configurations.forEach((key, value) -> { + Matcher matcher = kafkaProducerTypesMetadata.getConfigurationPattern().matcher(key); + if (matcher.find()) { + additionalProperties.put(matcher.group(1).replaceAll("_", ".").toLowerCase(), value); + } + }); + return additionalProperties; + } } diff --git a/src/test/java/com/gotocompany/firehose/utils/KafkaUtilsTest.java b/src/test/java/com/gotocompany/firehose/utils/KafkaUtilsTest.java new file mode 100644 index 000000000..6579695eb --- /dev/null +++ b/src/test/java/com/gotocompany/firehose/utils/KafkaUtilsTest.java @@ -0,0 +1,103 @@ +package com.gotocompany.firehose.utils; + +import com.gotocompany.firehose.config.DlqKafkaProducerConfig; +import org.aeonbits.owner.ConfigFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.types.Password; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaUtilsTest { + + private static final String DLQ_KAFKA_ACKS = "DLQ_KAFKA_ACKS"; + private static final String DLQ_KAFKA_RETRIES = "DLQ_KAFKA_RETRIES"; + private static final String DLQ_KAFKA_BATCH_SIZE = "DLQ_KAFKA_BATCH_SIZE"; + private static final String DLQ_KAFKA_LINGER_MS = "DLQ_KAFKA_LINGER_MS"; + private static final String DLQ_KAFKA_BUFFER_MEMORY = "DLQ_KAFKA_BUFFER_MEMORY"; + private static final String DLQ_KAFKA_KEY_SERIALIZER = "DLQ_KAFKA_KEY_SERIALIZER"; + private static final String DLQ_KAFKA_VALUE_SERIALIZER = "DLQ_KAFKA_VALUE_SERIALIZER"; + private static final String DLQ_KAFKA_BROKERS = "DLQ_KAFKA_BROKERS"; + private static final String DLQ_KAFKA_TOPIC = "DLQ_KAFKA_TOPIC"; + private static final String DLQ_KAFKA_SECURITY_PROTOCOL 
= "dlQ_KaFKa_SeCuRITY_proTOCOL"; + private static final String DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG = "DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD"; + private static final String DLQ_KAFKA_SASL_MECHANISM = "DLQ_KAFKA_SASL_MECHANISM"; + private static final String DLQ_KAFKA_SASL_JAAS_CONFIG = "DLQ_KAFKA_SASL_JAAS_CONFIG"; + + @Test + public void createShouldReturnKafkaProducerWithCorrectProperties() throws NoSuchFieldException, IllegalAccessException { + Map properties = getDlqProperties(); + DlqKafkaProducerConfig dlqKafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, properties); + + KafkaProducer kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, dlqKafkaProducerConfig, properties); + Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig"); + producerConfigField.setAccessible(true); + ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer); + + assertEquals(properties.get(DLQ_KAFKA_ACKS), producerConfig.getString("acks")); + assertEquals(properties.get(DLQ_KAFKA_RETRIES), String.valueOf(producerConfig.getInt("retries"))); + assertEquals(properties.get(DLQ_KAFKA_BATCH_SIZE), String.valueOf(producerConfig.getInt("batch.size"))); + assertEquals(properties.get(DLQ_KAFKA_LINGER_MS), String.valueOf(producerConfig.getLong("linger.ms"))); + assertEquals(properties.get(DLQ_KAFKA_BUFFER_MEMORY), String.valueOf(producerConfig.getLong("buffer.memory"))); + assertEquals(properties.get(DLQ_KAFKA_KEY_SERIALIZER), producerConfig.getClass("key.serializer").getName()); + assertEquals(properties.get(DLQ_KAFKA_VALUE_SERIALIZER), producerConfig.getClass("value.serializer").getName()); + assertEquals(Arrays.asList(properties.get(DLQ_KAFKA_BROKERS).split(",")), producerConfig.getList("bootstrap.servers")); + assertEquals(properties.get(DLQ_KAFKA_SECURITY_PROTOCOL), producerConfig.getString("security.protocol")); + assertEquals(properties.get(DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG), 
producerConfig.getString("ssl.truststore.password")); + assertEquals(properties.get(DLQ_KAFKA_SASL_MECHANISM), producerConfig.getString("sasl.mechanism")); + assertEquals(new Password(properties.get(DLQ_KAFKA_SASL_JAAS_CONFIG)), producerConfig.getPassword("sasl.jaas.config")); + } + + @Test + public void createShouldIgnoreParametersNotMatchingPrefix() throws NoSuchFieldException, IllegalAccessException { + Map properties = getDlqProperties(); + properties.put("DLQ_KAFKACLIENT_ID", "clientId"); + properties.put("NOTDLQ_KAFKA_CLIENT_ID", "clientId"); + DlqKafkaProducerConfig kafkaProducerConfig = ConfigFactory.create(DlqKafkaProducerConfig.class, properties); + + KafkaProducer kafkaProducer = KafkaUtils.getKafkaProducer(KafkaProducerTypesMetadata.DLQ, kafkaProducerConfig, properties); + Field producerConfigField = KafkaProducer.class.getDeclaredField("producerConfig"); + producerConfigField.setAccessible(true); + ProducerConfig producerConfig = (ProducerConfig) producerConfigField.get(kafkaProducer); + + assertEquals(properties.get(DLQ_KAFKA_ACKS), producerConfig.getString("acks")); + assertEquals(properties.get(DLQ_KAFKA_RETRIES), String.valueOf(producerConfig.getInt("retries"))); + assertEquals(properties.get(DLQ_KAFKA_BATCH_SIZE), String.valueOf(producerConfig.getInt("batch.size"))); + assertEquals(properties.get(DLQ_KAFKA_LINGER_MS), String.valueOf(producerConfig.getLong("linger.ms"))); + assertEquals(properties.get(DLQ_KAFKA_BUFFER_MEMORY), String.valueOf(producerConfig.getLong("buffer.memory"))); + assertEquals(properties.get(DLQ_KAFKA_KEY_SERIALIZER), producerConfig.getClass("key.serializer").getName()); + assertEquals(properties.get(DLQ_KAFKA_VALUE_SERIALIZER), producerConfig.getClass("value.serializer").getName()); + assertEquals(Arrays.asList(properties.get(DLQ_KAFKA_BROKERS).split(",")), producerConfig.getList("bootstrap.servers")); + assertEquals(properties.get(DLQ_KAFKA_SECURITY_PROTOCOL), producerConfig.getString("security.protocol")); + 
assertEquals(properties.get(DLQ_KAFKA_SSL_TRUSTSTORE_PASSWORD_CONFIG), producerConfig.getString("ssl.truststore.password")); + assertEquals(properties.get(DLQ_KAFKA_SASL_MECHANISM), producerConfig.getString("sasl.mechanism")); + assertEquals(new Password(properties.get(DLQ_KAFKA_SASL_JAAS_CONFIG)), producerConfig.getPassword("sasl.jaas.config")); + assertTrue(StringUtils.isEmpty(producerConfig.getString("client.id"))); + + } + + private static Map getDlqProperties() { + Map properties = new HashMap<>(); + properties.put(DLQ_KAFKA_ACKS, "all"); + properties.put(DLQ_KAFKA_RETRIES, "2147483647"); + properties.put(DLQ_KAFKA_BATCH_SIZE, "16384"); + properties.put(DLQ_KAFKA_LINGER_MS, "0"); + properties.put(DLQ_KAFKA_BUFFER_MEMORY, "33554432"); + properties.put(DLQ_KAFKA_KEY_SERIALIZER, "org.apache.kafka.common.serialization.ByteArraySerializer"); + properties.put(DLQ_KAFKA_VALUE_SERIALIZER, "org.apache.kafka.common.serialization.ByteArraySerializer"); + properties.put(DLQ_KAFKA_BROKERS, "localhost:9092"); + properties.put(DLQ_KAFKA_TOPIC, "firehose-retry-topic"); + properties.put(DLQ_KAFKA_SECURITY_PROTOCOL, "SASL_SSL"); + properties.put(DLQ_KAFKA_SASL_MECHANISM, "OAUTHBEARER"); + properties.put(DLQ_KAFKA_SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); + return properties; + } +}