From 77cd693d90be19ea22aa78fa63e05e97c9ecf72a Mon Sep 17 00:00:00 2001 From: Marcel Wagner Date: Sun, 7 Mar 2021 21:37:57 +0100 Subject: [PATCH] Kafka-Consumer: Enabled batch processing of incoming metrics. The MaxPayloadSize paramter has to be added to the KAFKA_CONFIG Closing issue #60. Signed-off-by: Marcel Wagner --- build.gradle | 2 +- .../handlers/kafkaconsumer/KafkaConsumer.java | 96 ++++++++++--------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/build.gradle b/build.gradle index 14666ae..f1c6aad 100644 --- a/build.gradle +++ b/build.gradle @@ -47,7 +47,7 @@ apply plugin: 'checkstyle' apply plugin: 'pmd' checkstyle { - toolVersion = "6.1.1" + toolVersion = "6.4" configFile = rootProject.file('checkstyle/checkstyle.xml') sourceSets = [sourceSets.main] } diff --git a/src/main/java/com/oisp/databackend/handlers/kafkaconsumer/KafkaConsumer.java b/src/main/java/com/oisp/databackend/handlers/kafkaconsumer/KafkaConsumer.java index a32c479..c26dfb6 100644 --- a/src/main/java/com/oisp/databackend/handlers/kafkaconsumer/KafkaConsumer.java +++ b/src/main/java/com/oisp/databackend/handlers/kafkaconsumer/KafkaConsumer.java @@ -1,7 +1,6 @@ package com.oisp.databackend.handlers.kafkaconsumer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.exc.MismatchedInputException; import com.oisp.databackend.config.oisp.OispConfig; import com.oisp.databackend.datasources.DataDao; import com.oisp.databackend.datastructures.Observation; @@ -16,28 +15,26 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ListenerExecutionFailedException; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; -import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; -import org.springframework.retry.support.RetryTemplate; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; -import java.util.Map; import java.util.HashMap; - +import java.util.List; +import java.util.Map; @EnableKafka @Configuration public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); - private static final int MAX_ATTEMPTS = 3; - private static final int MAX_FAILURES = 3; + private final String maxpolls = "1000"; @Autowired private KafkaConsumerProperties kafkaConsumerProperties; @Autowired @@ -71,7 +68,7 @@ public void setDataDao(DataDao dataDao) { } @Bean - public ConsumerFactory consumerFactory() { + public ConsumerFactory consumerFactory() { Map props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, @@ -91,52 +88,61 @@ public ConsumerFactory consumerFactory() { props.put( ConsumerConfig.FETCH_MAX_BYTES_CONFIG, oispConfig.getBackendConfig().getKafkaConfig().getMaxPayloadSize()); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxpolls); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); return new DefaultKafkaConsumerFactory<>(props); } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory(); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); - factory.setStatefulRetry(true); - RetryTemplate retryTemplate = new RetryTemplate(); - retryTemplate.setBackOffPolicy(new ExponentialRandomBackOffPolicy()); - retryTemplate.setThrowLastExceptionOnExhausted(true); - retryTemplate.setRetryPolicy(new SimpleRetryPolicy(MAX_ATTEMPTS)); - factory.setRetryTemplate(retryTemplate); - return factory; - } + factory.setBatchListener(true); - - @Bean - public ErrorHandler seekToCurrentErrorHandler() { - SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(MAX_FAILURES); - seekToCurrentErrorHandler.setCommitRecovered(true); - return seekToCurrentErrorHandler; + return factory; } @KafkaListener(topics = "#{kafkaConsumerProperties.getTopic()}") - public void receive(String rawObservations) throws IOException, ServiceException { + public void receive(List rawObservationList) throws IOException, ServiceException { + logger.debug("Start processing kafka samples batch " + rawObservationList.size()); ObjectMapper mapper = new ObjectMapper(); - Observation[] observations = null; - try { - Observation observation = mapper.readValue(rawObservations, Observation.class); - observations = new Observation[]{observation}; - if ("ByteArray".equals(observation.getDataType())) { - observation.setbValue(Base64.getDecoder().decode(observation.getValue())); - observation.setValue("0"); - } - } catch (IllegalArgumentException | ListenerExecutionFailedException | MismatchedInputException e) { - logger.debug("Tried to parse single observation. Now trying array: " + e); - observations = mapper.readValue(rawObservations, Observation[].class); - } - logger.info("Received Observations in topic " + kafkaConsumerProperties.getTopic() - + ". Message: " + observations.toString()); - if (!dataDao.put(observations)) { + //String rawObservations = rawObservationList.get(0); + List observationList = new ArrayList<>(); + + rawObservationList.forEach(rawObservation -> { + Observation[] observations = null; + if (rawObservation.trim().startsWith("[")) { + try { + observations = mapper.readValue(rawObservation, Observation[].class); + } catch (IllegalArgumentException | ListenerExecutionFailedException + | com.fasterxml.jackson.core.JsonProcessingException e) { + logger.warn("Tried to parse array. Will ignore the sample: " + e); + } + } else { + try { + Observation observation = mapper.readValue(rawObservation, Observation.class); + observations = new Observation[]{observation}; + if ("ByteArray".equals(observation.getDataType())) { + observation.setbValue(Base64.getDecoder().decode(observation.getValue())); + observation.setValue("0"); + } + } catch (IllegalArgumentException | ListenerExecutionFailedException + | com.fasterxml.jackson.core.JsonProcessingException e) { + logger.warn("Tried to parse single observation. Will ignore the sample " + e); + } + } + if (observations != null) { + logger.debug("Received Observations in topic " + kafkaConsumerProperties.getTopic() + + ". Message: " + observations.toString()); + observationList.addAll(Arrays.asList(observations)); + } + }); + if (!dataDao.put(observationList.stream().toArray(Observation[]::new))) { throw new ServiceException("Data store error."); } - + logger.debug("End processing kafka sample"); } } \ No newline at end of file