Add Karafka integration #4147

Open: wants to merge 9 commits into base: master
Changes from 2 commits
1 change: 1 addition & 0 deletions appraisal/ruby-2.7.rb
@@ -193,6 +193,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc'
gem 'karafka'

Review comment: Karafka no longer supports Ruby 2.7; see https://karafka.io/docs/Versions-Lifecycle-and-EOL/

gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
1 change: 1 addition & 0 deletions appraisal/ruby-3.0.rb
@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'

Review comment: Karafka no longer supports Ruby 3.0; see https://karafka.io/docs/Versions-Lifecycle-and-EOL/

gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
1 change: 1 addition & 0 deletions appraisal/ruby-3.1.rb
@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
1 change: 1 addition & 0 deletions appraisal/ruby-3.2.rb
@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
1 change: 1 addition & 0 deletions appraisal/ruby-3.3.rb
@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
1 change: 1 addition & 0 deletions appraisal/ruby-3.4.rb
@@ -122,6 +122,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
406 changes: 214 additions & 192 deletions docs/GettingStarted.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions lib/datadog/tracing/contrib.rb
@@ -55,6 +55,7 @@ module Contrib
require_relative 'contrib/httprb/integration'
require_relative 'contrib/integration'
require_relative 'contrib/kafka/integration'
require_relative 'contrib/karafka'
require_relative 'contrib/lograge/integration'
require_relative 'contrib/mongodb/integration'
require_relative 'contrib/mysql2/integration'
37 changes: 37 additions & 0 deletions lib/datadog/tracing/contrib/karafka.rb
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require_relative 'component'
require_relative 'karafka/integration'
require_relative 'karafka/distributed/propagation'

module Datadog
  module Tracing
    module Contrib
      # `Karafka` integration public API
      module Karafka
        def self.inject(digest, data)
          raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

          @propagation.inject!(digest, data)
        end

        def self.extract(data)
          raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

          @propagation.extract(data)
        end

        Contrib::Component.register('karafka') do |config|
          tracing = config.tracing
          tracing.propagation_style

          @propagation = Karafka::Distributed::Propagation.new(
            propagation_style_inject: tracing.propagation_style_inject,
            propagation_style_extract: tracing.propagation_style_extract,
            propagation_extract_first: tracing.propagation_extract_first
          )
        end
      end
    end
  end
end
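The inject/extract pair above delegates to a propagation object that writes and reads trace context through Kafka message headers. A self-contained sketch of that round-trip, with a hypothetical Context struct and made-up header names (not the dd-trace-rb implementation, which supports several header formats):

```ruby
# Illustration only: a toy version of header-based context propagation.
# The header names 'x-trace-id' / 'x-span-id' are invented for this sketch.
Context = Struct.new(:trace_id, :span_id)

# Write the context into a mutable headers hash (as `inject!` would).
def inject!(context, headers)
  headers['x-trace-id'] = context.trace_id.to_s
  headers['x-span-id']  = context.span_id.to_s
  headers
end

# Rebuild a context from headers on the consumer side (as `extract` would).
def extract(headers)
  return nil unless headers['x-trace-id']

  Context.new(headers['x-trace-id'].to_i, headers['x-span-id'].to_i)
end

headers = {}
inject!(Context.new(123, 456), headers)
restored = extract(headers)
```

The producer injects before publishing; the consumer extracts from `message.headers` and continues the trace from the restored context.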
27 changes: 27 additions & 0 deletions lib/datadog/tracing/contrib/karafka/configuration/settings.rb
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require_relative '../../configuration/settings'
require_relative '../ext'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        module Configuration
          # @public_api
          class Settings < Contrib::Configuration::Settings
            option :enabled do |o|
              o.type :bool
              o.env Ext::ENV_ENABLED
              o.default true
            end

            option :service_name

            option :distributed_tracing, default: false, type: :bool
Review comment (Member): I'm curious: for your use case, wouldn't you prefer that distributed_tracing is enabled by default?

Reply (Contributor Author): I'm actually in limbo on it. I disabled it by default for the same reason Sidekiq's distributed_tracing is disabled: when it's on by default, the trace easily blows up and gets out of control.

          end
        end
      end
    end
  end
end
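Given the Settings class above, enabling the integration would look like the usual dd-trace-rb configure block. A sketch: the `:karafka` instrument key and option names follow the settings file above, while the service name is purely illustrative:

```ruby
require 'datadog'

Datadog.configure do |c|
  # Options defined in Settings above; distributed_tracing defaults to
  # false, as discussed in the review thread.
  c.tracing.instrument :karafka,
                       service_name: 'my-karafka-consumer',
                       distributed_tracing: true
end
```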
47 changes: 47 additions & 0 deletions lib/datadog/tracing/contrib/karafka/distributed/propagation.rb
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require_relative '../../../distributed/fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'
require_relative '../../../configuration/ext'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        module Distributed
          # Extracts and injects propagation through Kafka message headers.
          class Propagation < Tracing::Distributed::Propagation
            def initialize(
              propagation_style_inject:,
              propagation_style_extract:,
              propagation_extract_first:
            )
              super(
                propagation_styles: {
                  Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
                    Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher),
                  Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
                    Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher),
                  Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
                    Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher),
                  Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
                    Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher),
                  Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
                },
                propagation_style_inject: propagation_style_inject,
                propagation_style_extract: propagation_style_extract,
                propagation_extract_first: propagation_extract_first
              )
            end
          end
        end
      end
    end
  end
end

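The propagation_styles hash above is a registry keyed by style name; inject and extract then pick codecs according to the configured style lists. A toy illustration of the registry pattern (the codec classes here are invented for the sketch; only the `x-datadog-trace-id` and `traceparent` header names correspond to the real Datadog and W3C styles):

```ruby
# Toy codecs standing in for Tracing::Distributed::Datadog / TraceContext.
class DatadogCodec
  def inject!(trace_id, headers)
    headers['x-datadog-trace-id'] = trace_id.to_s
  end
end

class TraceContextCodec
  def inject!(trace_id, headers)
    # W3C traceparent: version-traceid(32 hex)-spanid(16 hex)-flags
    headers['traceparent'] = format('00-%032x-0000000000000001-01', trace_id)
  end
end

# Registry keyed by style name, as in propagation_styles above.
STYLES = {
  'datadog' => DatadogCodec.new,
  'tracecontext' => TraceContextCodec.new
}.freeze

# Inject with every configured style, mirroring propagation_style_inject.
def inject_all(style_names, trace_id, headers)
  style_names.each { |name| STYLES.fetch(name).inject!(trace_id, headers) }
  headers
end

headers = inject_all(%w[datadog tracecontext], 42, {})
```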
28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/event.rb
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative '../analytics'
require_relative 'ext'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        # Common behavior shared by the instrumented Karafka events
        module Event
          def self.included(base)
            base.extend(ClassMethods)
          end

          module ClassMethods
            def span_options
              { service: configuration[:service_name] }
            end

            def configuration
              Datadog.configuration.tracing[:karafka]
            end
          end
        end
      end
    end
  end
end
28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events.rb
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative 'events/worker/process'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        # Defines collection of instrumented Karafka events
        module Events
          ALL = [
            Events::Worker::Process,
          ]

          module_function

          def all
            self::ALL
          end

          def subscribe!
            all.each(&:subscribe!)
          end
        end
      end
    end
  end
end
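The Events module is a small fan-out: each event module implements subscribe!, and the collection simply iterates over them. A runnable miniature of the same pattern, with stand-in modules and an array replacing ::Karafka.monitor:

```ruby
# Records which events were subscribed (stand-in for ::Karafka.monitor).
SUBSCRIBED = []

# Each event module knows how to subscribe itself.
module WorkerProcess
  def self.subscribe!
    SUBSCRIBED << 'worker.process'
  end
end

module WorkerCompleted
  def self.subscribe!
    SUBSCRIBED << 'worker.completed'
  end
end

module Events
  ALL = [WorkerProcess, WorkerCompleted].freeze

  module_function

  def all
    ALL
  end

  # Fan out: subscribe every registered event module.
  def subscribe!
    all.each(&:subscribe!)
  end
end

Events.subscribe!
```

Adding a new instrumented event is then just a matter of appending its module to ALL.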
28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events/error/occur.rb
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative '../../ext'
require_relative '../../event'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        module Events
          module Error
            module Occur
              include Karafka::Event

              def self.subscribe!
                # NOTE: handler body not yet implemented in this commit
                ::Karafka.monitor.subscribe 'error.consume' do |event|
                end
              end
            end
          end
        end
      end
    end
  end
end

77 changes: 77 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events/worker/process.rb
@@ -0,0 +1,77 @@
# frozen_string_literal: true

require_relative '../../ext'
require_relative '../../event'

module Datadog
  module Tracing
    module Contrib
      module Karafka
        module Events
          module Worker
            module Process
              include Karafka::Event

              def self.subscribe!
                ::Karafka.monitor.subscribe 'worker.process' do |event|

Review comment (Member): I would not recommend using this layer for instrumentation of that type, and would advise you to reconsider something "closer" to the actual execution of work.

Reply (@nvh0412, Contributor Author, Nov 22, 2024): Would it be possible for you to explain what "closer" means here? I modelled this on what you did in instrumentation/vendors/datadog/logger_listener.rb, but I'm reconsidering whether we need to subscribe to this event at all: our goal is to have message traces for each message inside the messages enumerator, so we can link them to the distributed traces. I can remove this event if things are getting out of control here.

Reply: Sure. Karafka has several layers of reporting when a "job" is executed. The worker level is the highest, and I think of it as conceptually "distant" from the end-user code execution layer. In between, there are coordinators, executors, and more. The closest to the user code is the one that has the consumer.* events; while I use the worker level myself once in a while, I generally do not recommend it and suggest the one mentioned above. At some point, I will probably migrate the ones I wrote myself. There is nothing fundamentally wrong with using the worker one, but as mentioned, there is a lot in between.

Regarding "because our goal is to have message traces for each message inside messages enumerator": this is not the only way users process code. You need to keep in mind users that do batch operations as well, so you want to trace around all the operational code too.

                  # Start a trace. Note: the matching finish happens in the
                  # 'worker.completed' handler below, not in this block.
                  span = Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options)

                  job = event[:job]
                  job_type = fetch_job_type(job.class)
                  consumer = job.executor.topic.consumer
                  topic = job.executor.topic.name

                  action = case job_type
                           when 'Periodic', 'PeriodicNonBlocking' then 'tick'
                           when 'Shutdown' then 'shutdown'
                           when 'Revoked', 'RevokedNonBlocking' then 'revoked'
                           when 'Idle' then 'idle'
                           when 'Eofed', 'EofedNonBlocking' then 'eofed'
                           else 'consume'
                           end

                  span.resource = "#{consumer}##{action}"
                  span.set_tag(Ext::TAG_TOPIC, topic) if topic

                  if action == 'consume'
                    span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
                    span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
                    span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
                  end

                  span
                end

                ::Karafka.monitor.subscribe 'worker.completed' do |event|
                  Tracing.active_span&.finish
                end
Review comment (Member): Having separate locations for span creation (Tracing.trace) and span conclusion (Tracing.active_span&.finish) is always a possible source of hard-to-debug errors and span leakage (and thus memory leaks). We normally only do it when it's impossible to use Tracing.trace { do_work_here }.

Also, Tracing.trace { do_work_here } takes care of error handling, properly tagging the current span with error information.

In this case, we have the event worker.processed that looks like it's just what we need:
https://github.com/karafka/karafka/blob/ab4f9bcd3620f46adb8c0d158b5396b245619ed3/lib/karafka/processing/worker.rb#L58-L78

Except that it doesn't call the event listeners when an error is raised by the job. There are no error handlers in this method that call assigned_listeners: https://github.com/karafka/karafka-core/blob/a1425725d275796673424c1cd9be517d06518ec9/lib/karafka/core/monitoring/notifications.rb#L120

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library: karafka/karafka-core#145

That being said, I still lean towards using a single Tracing.trace { do_work_here }, if only because the safety of not having to worry about leaky spans is too advantageous.

Reply (Contributor Author): That's precisely what I'd like to hear from the Datadog team. My initial intuition when using Tracing.active_span was that I couldn't be certain whether the span I'm currently working on is the one I want to complete. Considering this and your insights, would it be feasible to remove this tracing event? At the moment, I prefer to retain only the Ext::SPAN_MESSAGE_CONSUME event for this integration, as it effectively meets my requirement: enabling distributed tracing and ensuring that each message span is linked to the root trace in the distributed tracing system.

Reply: Regarding "I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library": this PR is not needed to achieve the expected wrapping, because it is already done, for example, for OpenTelemetry:

ref1: https://karafka.io/docs/Monitoring-and-Logging/#opentelemetry
ref2: https://karafka.io/docs/Monitoring-and-Logging/#monitor-wrapping-and-replacement

              end

              def self.span_options
                super.merge({ tags: { Tracing::Metadata::Ext::TAG_OPERATION => Ext::TAG_OPERATION_PROCESS_BATCH } })
              end

              def self.fetch_job_type(job_class)
                @job_types_cache ||= {}
                @job_types_cache[job_class] ||= job_class.to_s.split('::').last
              end
            end
          end
        end
      end
    end
  end
end
25 changes: 25 additions & 0 deletions lib/datadog/tracing/contrib/karafka/ext.rb
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module Datadog
  module Tracing
    module Contrib
      module Karafka
        module Ext
          ENV_ENABLED = 'DD_TRACE_KARAFKA_ENABLED'

          SPAN_MESSAGE_CONSUME = 'karafka.consume'
          SPAN_WORKER_PROCESS = 'worker.process'

          TAG_TOPIC = 'kafka.topic'

Review comment: A side note, because maybe it is already handled: please keep in mind that Karafka supports multi-consumer-group operations in one Karafka process, so the CG (consumer group) always needs to be reported alongside metrics.

          TAG_PARTITION = 'kafka.partition'
          TAG_OFFSET = 'kafka.offset'
          TAG_OFFSET_LAG = 'kafka.offset_lag'
          TAG_MESSAGE_COUNT = 'kafka.message_count'
          TAG_MESSAGE_KEY = 'kafka.message_key'

          TAG_OPERATION_PROCESS_BATCH = 'consumer.process_batch'
        end
      end
    end
  end
end