-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Proper termination sequence
edenhill edited this page Dec 22, 2014
·
8 revisions
librdkafka is asynchronous in its nature and this is also true for the rd_kafka_destroy()
call.
It simply tells librdkafka to decommission the provided instance but it will take some time in doing so;
cleanly closing network connections, terminating internal threads, etc.
The proper termination sequence is:
/* 1) Make sure all outstanding requests are transmitted and handled. */
while (rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 50);
/* 2) Destroy the topic and handle objects */
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
/* 3) Wait for ALL rdkafka handles to decommission, this is typically done at the very end of an application. */
rd_kafka_wait_destroyed();
Effects of not doing the above, for:
-
- Outstanding produce or offsetCommit requests may be dropped
-
- librdkafka will continue to operate on the handles. Actual memory leaks.
-
- Cosmetic memory leaks reported by memory profilers (e.g., valgrind)
To speed up the termination of librdkafka an application can set a termination signal that will be used by librdkafka internally to quickly cancel any outstanding I/O waits. Make sure you block this signal in your application.
char tmp[16];
snprintf(tmp, sizeof(tmp), "%i", SIGIO); /* Or whatever signal you decide */
rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp, errstr, sizeof(errstr));