Proper termination sequence
librdkafka is asynchronous by nature and performs most operations in its background threads. Destroying a handle simply tells librdkafka to decommission that instance, which takes some time: network connections are closed cleanly, internal threads are terminated, and so on.
All objects related to a handle that are not themselves handles (handles are C: rd_kafka_t; C++: Consumer, KafkaConsumer, Producer), such as topic objects, messages, topic_partition_t, TopicPartition, events, etc., MUST be destroyed/deleted prior to destroying or closing the handle.
For C:
- rd_kafka_message_t
- rd_kafka_topic_t
- rd_kafka_topic_partition_t
- rd_kafka_topic_partition_list_t
- rd_kafka_event_t
For C++:
- Message
- Topic
- TopicPartition
- Event
Proper termination sequence for the high-level KafkaConsumer is:
/* 1) Close the consumer, committing final offsets, etc. */
rd_kafka_consumer_close(rk);
/* 2) Destroy handle object */
rd_kafka_destroy(rk);
NOTE: There is no need to unsubscribe prior to calling consumer close()
NOTE: Any topic objects created must be destroyed prior to rd_kafka_destroy()
Effects of not doing the above, by step:
- Step 1: Final offsets are not committed.
- Step 2: librdkafka will continue to operate on the handles. Actual memory leaks.
The proper termination sequence for producers and simple legacy consumers is:
/* 1) Make sure all outstanding requests are transmitted and handled. */
while (rd_kafka_outq_len(rk) > 0)
        rd_kafka_poll(rk, 50);
/* 2) Destroy the topic and handle objects */
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
/* 3) Wait for ALL rdkafka handles to decommission; this is typically done at the very end of an application.
 *    NOTE: This step is only needed for librdkafka versions < 0.9; since 0.9 the rd_kafka_destroy() call is blocking. */
rd_kafka_wait_destroyed(5000); /* timeout in milliseconds; the value is illustrative */
Effects of not doing the above, by step:
- Step 1: Outstanding produce or offsetCommit requests may be dropped.
- Step 2: librdkafka will continue to operate on the handles. Actual memory leaks.
- Step 3 (only versions < 0.9): Cosmetic memory leaks reported by memory profilers (e.g., valgrind).
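The wait in step 1 of the producer sequence above loops until the out queue is empty, so it can stall shutdown for as long as requests remain outstanding. The following is a minimal sketch of the same loop with an upper bound on the total wait. The callback types, drain_outq(), and the fake_queue stand-in are hypothetical names introduced for illustration and are not part of librdkafka; in a real application the two callbacks would be thin wrappers around rd_kafka_outq_len() and rd_kafka_poll().

```c
#define _POSIX_C_SOURCE 200809L /* for clock_gettime() */
#include <assert.h>
#include <time.h>

/* Hypothetical callback types (not librdkafka API): they stand in for
 * rd_kafka_outq_len() and rd_kafka_poll() so the loop can be shown alone. */
typedef int (*outq_len_fn)(void *handle);
typedef void (*poll_fn)(void *handle, int timeout_ms);

/* Current monotonic time in milliseconds. */
static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Poll until the out queue is empty or max_wait_ms has elapsed.
 * Returns the number of requests still outstanding (0 = fully drained). */
int drain_outq(void *handle, outq_len_fn outq_len, poll_fn poll_cb,
               int max_wait_ms) {
    long long deadline;
    int remaining;

    assert(max_wait_ms >= 0);
    deadline = now_ms() + max_wait_ms;

    while ((remaining = outq_len(handle)) > 0 && now_ms() < deadline)
        poll_cb(handle, 50); /* same 50 ms poll interval as the sequence above */

    return remaining;
}

/* Stand-in queue used only to exercise the loop: each poll handles one request. */
struct fake_queue {
    int pending;
};

int fake_len(void *handle) {
    return ((struct fake_queue *)handle)->pending;
}

void fake_poll(void *handle, int timeout_ms) {
    struct fake_queue *q = handle;
    (void)timeout_ms; /* the stand-in never blocks */
    if (q->pending > 0)
        q->pending--;
}
```

A non-zero return value means the deadline passed with requests still queued, and the application can then decide whether to log them, wait longer, or destroy the handle anyway. librdkafka also provides rd_kafka_flush(rk, timeout_ms), which waits for outstanding requests with a timeout and may be simpler where it is available.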
To speed up the termination of librdkafka, an application can set a termination signal that librdkafka uses internally to quickly cancel any outstanding I/O waits. Make sure you block this signal in your application.
char tmp[16];
char errstr[512]; /* receives a human-readable error if rd_kafka_conf_set() fails */
snprintf(tmp, sizeof(tmp), "%i", SIGIO); /* Or whatever signal you decide */
rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp, errstr, sizeof(errstr));