diff --git a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala index 376db278..703310a7 100644 --- a/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala +++ b/modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/sinks/kafka/KafkaSink.scala @@ -7,49 +7,66 @@ */ package com.snowplowanalytics.snowplow.sinks.kafka -import cats.effect.Async -import cats.effect.kernel.Resource -import cats.implicits._ -import cats.Monad -import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable} -import fs2.kafka._ +import cats.effect.{Async, Resource, Sync} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer} -import scala.reflect._ +import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable} +import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler import java.util.UUID - -import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler +import java.nio.charset.StandardCharsets +import scala.reflect.ClassTag +import scala.jdk.CollectionConverters._ object KafkaSink { def resource[F[_]: Async, T <: AzureAuthenticationCallbackHandler]( config: KafkaSinkConfig, authHandlerClass: ClassTag[T] - ): Resource[F, Sink[F]] = { - val producerSettings = - ProducerSettings[F, String, Array[Byte]] - .withProperty("sasl.login.callback.handler.class", authHandlerClass.runtimeClass.getName) - .withBootstrapServers(config.bootstrapServers) - .withProperties(config.producerConf) - + ): Resource[F, Sink[F]] = for { - producer <- KafkaProducer[F].resource(producerSettings) - } yield fromFs2Producer(config, producer) + producer <- makeProducer(config, authHandlerClass) + } yield impl(config, producer) + + private def makeProducer[F[_]: Async, T <: AzureAuthenticationCallbackHandler]( + config: KafkaSinkConfig, + authHandlerClass: ClassTag[T] + ): Resource[F, KafkaProducer[String, Array[Byte]]] = { + val producerSettings = Map( + "bootstrap.servers" -> config.bootstrapServers, + "sasl.login.callback.handler.class" -> authHandlerClass.runtimeClass.getName, + "key.serializer" -> classOf[StringSerializer].getName, + "value.serializer" -> classOf[ByteArraySerializer].getName + ) ++ config.producerConf + val make = Sync[F].delay { + new KafkaProducer[String, Array[Byte]]((producerSettings: Map[String, AnyRef]).asJava) + } + Resource.make(make)(p => Sync[F].blocking(p.close)) } - private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] = + private def impl[F[_]: Sync](config: KafkaSinkConfig, producer: KafkaProducer[String, Array[Byte]]): Sink[F] = Sink { batch => - val records = batch.copyToChunk.map(toProducerRecord(config, _)) - producer.produce(records).flatten.void + Sync[F].interruptible { + val futures = batch.asIterable.map { e => + val record = toProducerRecord(config, e) + producer.send(record) + }.toIndexedSeq + + futures.foreach(_.get) + } } private def toProducerRecord(config: KafkaSinkConfig, sinkable: Sinkable): ProducerRecord[String, Array[Byte]] = { - val headers = Headers.fromIterable { - sinkable.attributes.map { case (k, v) => - Header(k, v) + + val headers = sinkable.attributes.map { case (k, v) => + new Header { + def key: String = k + def value: Array[Byte] = v.getBytes(StandardCharsets.UTF_8) } } - ProducerRecord(config.topicName, sinkable.partitionKey.getOrElse(UUID.randomUUID.toString), sinkable.bytes) - .withHeaders(headers) + + new ProducerRecord(config.topicName, null, sinkable.partitionKey.getOrElse(UUID.randomUUID.toString), sinkable.bytes, headers.asJava) } }