Tarantool kafka

Full-featured, high-performance Kafka library for Tarantool, based on librdkafka.

It can produce more than 150k messages per second and consume more than 140k messages per second.

Features

  • Kafka producer and consumer implementations.
  • Fiber friendly.
  • Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is quite a mess: some libraries throw native Lua errors while others throw box.error. kafka returns non-critical errors as strings, which lets you decide how to handle them (see the sketch below).
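
For example, a minimal sketch of this convention (the broker address and topic below are placeholders matching the examples in this README):

    local log = require('log')
    local tnt_kafka = require('kafka')

    -- constructors return the object plus an error string; nothing is thrown
    local producer, err = tnt_kafka.Producer.create({
        brokers = "localhost:9092",
        options = {},
    })
    if err ~= nil then
        -- pick your own policy: log, retry, raise or exit
        error("failed to create producer: " .. err)
    end

    -- methods follow the same convention
    local err = producer:produce_async({
        topic = "test_topic",
        key = "test_key",
        value = "ping",
    })
    if err ~= nil then
        log.warn("produce failed: %s", err) -- non-critical, so just log it
    end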

Requirements

  • Tarantool >= 1.10.2
  • Tarantool development headers
  • librdkafka >= 0.11.5
  • librdkafka development headers
  • make
  • cmake
  • gcc

Installation

    tarantoolctl rocks install kafka

Build module with statically linked librdkafka

To install the kafka module with a built-in librdkafka dependency, use the STATIC_BUILD option:

    tarantoolctl rocks STATIC_BUILD=ON install kafka
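
Either way, once installed the module should be loadable from a Tarantool console. A minimal sanity check (the assert is just an illustration):

    -- require must succeed and expose the Producer and Consumer
    -- namespaces used in the examples below
    local tnt_kafka = require('kafka')
    assert(tnt_kafka.Producer ~= nil and tnt_kafka.Consumer ~= nil)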

Examples

Consumer

With auto offset store

    local fiber = require('fiber')
    local os = require('os')
    local log = require('log')
    local json = require('json') -- used by the rebalance callback below
    local tnt_kafka = require('kafka')
    
    local error_callback = function(err)
        log.error("got error: %s", err)
    end
    local log_callback = function(fac, str, level)
        log.info("got log: %d - %s - %s", level, fac, str)
    end
    local rebalance_callback = function(msg)
        log.info("got rebalance msg: %s", json.encode(msg))
    end

    local consumer, err = tnt_kafka.Consumer.create({
        brokers = "localhost:9092", -- brokers for bootstrap
        options = {
            ["enable.auto.offset.store"] = "true",
            ["group.id"] = "example_consumer",
            ["auto.offset.reset"] = "earliest",
            ["enable.partition.eof"] = "false"
        }, -- options for librdkafka
        error_callback = error_callback, -- optional callback for errors
        log_callback = log_callback, -- optional callback for logs and debug messages
        rebalance_callback = rebalance_callback,  -- optional callback for rebalance messages
        default_topic_options = {
            ["auto.offset.reset"] = "earliest",
        }, -- optional default topic options
    })
    if err ~= nil then
        print(err)
        os.exit(1)
    end

    local err = consumer:subscribe({"test_topic"}) -- array of topics to subscribe to
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    fiber.create(function()
        local out, err = consumer:output()
        if err ~= nil then
            print(string.format("got fatal error '%s'", err))
            return
        end
        
        while true do
            if out:is_closed() then
                return
            end

            local msg = out:get()
            if msg ~= nil then
                print(string.format(
                    "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'", 
                    msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
                ))
            end
        end
    end)
    
    fiber.sleep(10)
    
    local err = consumer:unsubscribe({"test_topic"}) -- array of topics to unsubscribe from
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    local err = consumer:close() -- always close the consumer to commit all pending offsets before the app exits
    if err ~= nil then
        print(err)
        os.exit(1)
    end

With multiple fibers and manual offset store

    local fiber = require('fiber')
    local os = require('os')
    local log = require('log')
    local json = require('json') -- used by the rebalance callback below
    local tnt_kafka = require('kafka')
    
    local error_callback = function(err)
        log.error("got error: %s", err)
    end
    local log_callback = function(fac, str, level)
        log.info("got log: %d - %s - %s", level, fac, str)
    end
    local rebalance_callback = function(msg)
        log.info("got rebalance msg: %s", json.encode(msg))
    end

    local consumer, err = tnt_kafka.Consumer.create({
        brokers = "localhost:9092", -- brokers for bootstrap
        options = {
            ["enable.auto.offset.store"] = "false",
            ["group.id"] = "example_consumer",
            ["auto.offset.reset"] = "earliest",
            ["enable.partition.eof"] = "false"
        }, -- options for librdkafka
        error_callback = error_callback, -- optional callback for errors
        log_callback = log_callback, -- optional callback for logs and debug messages
        rebalance_callback = rebalance_callback,  -- optional callback for rebalance messages
        default_topic_options = {
            ["auto.offset.reset"] = "earliest",
        }, -- optional default topic options
    })
    if err ~= nil then
        print(err)
        os.exit(1)
    end

    local err = consumer:subscribe({"test_topic"}) -- array of topics to subscribe to
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    for i = 1, 10 do
        fiber.create(function()
            local out, err = consumer:output()
            if err ~= nil then
                print(string.format("got fatal error '%s'", err))
                return
            end
            while true do
                if out:is_closed() then
                    return
                end
    
                local msg = out:get()
                if msg ~= nil then
                    print(string.format(
                        "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'", 
                        msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
                    ))
                    
                    local err = consumer:store_offset(msg) -- don't forget to commit processed messages
                    if err ~= nil then
                        print(string.format(
                            "got error '%s' while commiting msg from topic '%s'", 
                            err, msg:topic()
                        ))
                    end
                end
            end
        end)
    end    
    
    fiber.sleep(10)
    
    local err = consumer:unsubscribe({"test_topic"}) -- array of topics to unsubscribe from
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    local err = consumer:close() -- always close the consumer to commit all pending offsets before the app exits
    if err ~= nil then
        print(err)
        os.exit(1)
    end

Producer

With single fiber and async producer

    local os = require('os')
    local log = require('log')
    local tnt_kafka = require('kafka')
    
    local error_callback = function(err)
        log.error("got error: %s", err)
    end
    local log_callback = function(fac, str, level)
        log.info("got log: %d - %s - %s", level, fac, str)
    end
    
    local producer, err = tnt_kafka.Producer.create({
        brokers = "kafka:9092", -- brokers for bootstrap
        options = {}, -- options for librdkafka
        error_callback = error_callback, -- optional callback for errors
        log_callback = log_callback, -- optional callback for logs and debug messages
        default_topic_options = {
            ["partitioner"] = "murmur2_random",
        }, -- optional default topic options
    })
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    for i = 1, 1000 do    
        local err = producer:produce_async({ -- don't wait until the message is delivered to Kafka
            topic = "test_topic",
            key = "test_key",
            value = "test_value" -- only strings allowed
        })
        if err ~= nil then
            print(err)
            os.exit(1)
        end
    end
    
    local err = producer:close() -- always close the producer to flush all pending messages before the app exits
    if err ~= nil then
        print(err)
        os.exit(1)
    end

With multiple fibers and sync producer

    local fiber = require('fiber')
    local os = require('os')
    local log = require('log')
    local tnt_kafka = require('kafka')
    
    local error_callback = function(err)
        log.error("got error: %s", err)
    end
    local log_callback = function(fac, str, level)
        log.info("got log: %d - %s - %s", level, fac, str)
    end
    
    local producer, err = tnt_kafka.Producer.create({
        brokers = "kafka:9092", -- brokers for bootstrap
        options = {}, -- options for librdkafka
        error_callback = error_callback, -- optional callback for errors
        log_callback = log_callback, -- optional callback for logs and debug messages
        default_topic_options = {
            ["partitioner"] = "murmur2_random",
        }, -- optional default topic options
    })
    if err ~= nil then
        print(err)
        os.exit(1)
    end
    
    for i = 1, 1000 do
        fiber.create(function()
            local message = "test_value " .. tostring(i)
            local err = producer:produce({ -- wait until the message is delivered to Kafka (a channel is used under the hood)
                topic = "test_topic",
                key = "test_key", 
                value = message -- only strings allowed
            })
            if err ~= nil then
                print(string.format("got error '%s' while sending value '%s'", err, message))
            else
                print(string.format("successfully sent value '%s'", message))
            end
        end)
    end
    
    fiber.sleep(10)
    
    local err = producer:close() -- always close the producer to flush all pending messages before the app exits
    if err ~= nil then
        print(err)
        os.exit(1)
    end

Known issues

  • Consumer and Producer leave some non-GC'able objects in memory after they have been stopped. This is intentional because rd_kafka_destroy sometimes hangs forever.

TODO

  • Ordered storage for offsets to prevent committing offsets of unprocessed messages
  • Fix known issues
  • More examples
  • Better documentation

Benchmarks

Producer

Async

Result: over 160000 produced messages per second on a 2016 MacBook Pro

Local run in Docker:

    make docker-run-environment
    make docker-create-benchmark-async-producer-topic
    make docker-run-benchmark-async-producer-interactive

Sync

Result: over 90000 produced messages per second on a 2016 MacBook Pro

Local run in Docker:

    make docker-run-environment
    make docker-create-benchmark-sync-producer-topic
    make docker-run-benchmark-sync-producer-interactive

Consumer

Auto offset store enabled

Result: over 190000 consumed messages per second on a 2016 MacBook Pro

Local run in Docker:

    make docker-run-environment
    make docker-create-benchmark-auto-offset-store-consumer-topic
    make docker-run-benchmark-auto-offset-store-consumer-interactive

Manual offset store

Result: over 190000 consumed messages per second on a 2016 MacBook Pro

Local run in Docker:

    make docker-run-environment
    make docker-create-benchmark-manual-commit-consumer-topic
    make docker-run-benchmark-manual-commit-consumer-interactive

Developing

Tests

You can run Docker-based integration tests via the Makefile target:

    make test-run-with-docker