Skip to content

Commit

Permalink
Add Karafka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
nvh0412 committed Nov 22, 2024
1 parent 0be95cb commit dd02868
Show file tree
Hide file tree
Showing 19 changed files with 709 additions and 192 deletions.
1 change: 1 addition & 0 deletions appraisal/ruby-2.7.rb
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc'
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.0.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
1 change: 1 addition & 0 deletions appraisal/ruby-3.4.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'
gem 'mongo', '>= 2.8.0', '< 2.15.0' # TODO: FIX TEST BREAKAGES ON >= 2.15 https://github.com/DataDog/dd-trace-rb/issues/1596
gem 'rack-test' # Dev dependencies for testing rack-based code
gem 'rake', '>= 12.3'
Expand Down
406 changes: 214 additions & 192 deletions docs/GettingStarted.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions lib/datadog/tracing/contrib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module Contrib
require_relative 'contrib/httprb/integration'
require_relative 'contrib/integration'
require_relative 'contrib/kafka/integration'
require_relative 'contrib/karafka'
require_relative 'contrib/lograge/integration'
require_relative 'contrib/mongodb/integration'
require_relative 'contrib/mysql2/integration'
Expand Down
37 changes: 37 additions & 0 deletions lib/datadog/tracing/contrib/karafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require_relative 'component'
require_relative 'karafka/integration'
require_relative 'karafka/distributed/propagation'

module Datadog
module Tracing
module Contrib
# `Karafka` integration public API
module Karafka
def self.inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

def self.extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('karafka') do |config|
tracing = config.tracing
tracing.propagation_style

@propagation = Sidekiq::Distributed::Propagation.new(
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
end
27 changes: 27 additions & 0 deletions lib/datadog/tracing/contrib/karafka/configuration/settings.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require_relative '../../configuration/settings'
require_relative '../ext'

module Datadog
module Tracing
module Contrib
module Karafka
module Configuration
# @public_api
class Settings < Contrib::Configuration::Settings
option :enabled do |o|
o.type :bool
o.env Ext::ENV_ENABLED
o.default true
end

option :service_name

option :distributed_tracing, default: false, type: :bool
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions lib/datadog/tracing/contrib/karafka/distributed/propagation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require_relative '../../../distributed/fetcher'
require_relative '../../../distributed/propagation'
require_relative '../../../distributed/b3_multi'
require_relative '../../../distributed/b3_single'
require_relative '../../../distributed/datadog'
require_relative '../../../distributed/none'
require_relative '../../../distributed/trace_context'
require_relative '../../../configuration/ext'

module Datadog
module Tracing
module Contrib
module Karafka
module Distributed
# Extracts and injects propagation through Kafka message headers.
class Propagation < Tracing::Distributed::Propagation
def initialize(
propagation_style_inject:,
propagation_style_extract:,
propagation_extract_first:
)
super(
propagation_styles: {
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
},
propagation_style_inject: propagation_style_inject,
propagation_style_extract: propagation_style_extract,
propagation_extract_first: propagation_extract_first
)
end
end
end
end
end
end
end

28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative '../analytics'
require_relative 'ext'

module Datadog
module Tracing
module Contrib
module Karafka
module Event
def self.included(base)
base.extend(ClassMethods)
end

module ClassMethods
def span_options
{ service: configuration[:service_name] }
end

def configuration
Datadog.configuration.tracing[:karafka]
end
end
end
end
end
end
end
28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative 'events/worker/process'

module Datadog
module Tracing
module Contrib
module Karafka
# Defines collection of instrumented Kafka events
module Events
ALL = [
Events::Worker::Process,
]

module_function

def all
self::ALL
end

def subscribe!
all.each(&:subscribe!)
end
end
end
end
end
end
28 changes: 28 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events/error/occur.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

require_relative '../../ext'
require_relative '../../event'

require 'byebug'

module Datadog
module Tracing
module Contrib
module Karafka
module Events
module Error
module Occur
include Karafka::Event

def self.subscribe!
::Karafka.monitor.subscribe 'error.consume' do |event|
end
end
end
end
end
end
end
end
end

77 changes: 77 additions & 0 deletions lib/datadog/tracing/contrib/karafka/events/worker/process.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# frozen_string_literal: true

require_relative '../../ext'
require_relative '../../event'

module Datadog
module Tracing
module Contrib
module Karafka
module Events
module Worker
module Process
include Karafka::Event

def self.subscribe!
::Karafka.monitor.subscribe 'worker.process' do |event|
# Start a trace
span = Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options)

job = event[:job]
job_type = fetch_job_type(job.class)
consumer = job.executor.topic.consumer
topic = job.executor.topic.name

action = case job_type
when 'Periodic'
'tick'
when 'PeriodicNonBlocking'
'tick'
when 'Shutdown'
'shutdown'
when 'Revoked'
'revoked'
when 'RevokedNonBlocking'
'revoked'
when 'Idle'
'idle'
when 'Eofed'
'eofed'
when 'EofedNonBlocking'
'eofed'
else
'consume'
end

span.resource = "#{consumer}##{action}"
span.set_tag(Ext::TAG_TOPIC, topic) if topic

if action == 'consume'
span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
end

span
end

::Karafka.monitor.subscribe 'worker.completed' do |event|
Tracing.active_span&.finish
end
end

def self.span_options
super.merge({ tags: { Tracing::Metadata::Ext::TAG_OPERATION => Ext::TAG_OPERATION_PROCESS_BATCH } })
end

def self.fetch_job_type(job_class)
@job_types_cache ||= {}
@job_types_cache[job_class] ||= job_class.to_s.split('::').last
end
end
end
end
end
end
end
end
25 changes: 25 additions & 0 deletions lib/datadog/tracing/contrib/karafka/ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module Datadog
module Tracing
module Contrib
module Karafka
module Ext
ENV_ENABLED = 'DD_TRACE_KARAFKA_ENABLED'

SPAN_MESSAGE_CONSUME = 'karafka.consume'
SPAN_WORKER_PROCESS = 'worker.process'

TAG_TOPIC = 'kafka.topic'
TAG_PARTITION = 'kafka.partition'
TAG_OFFSET = 'kafka.offset'
TAG_OFFSET_LAG = 'kafka.offset_lag'
TAG_MESSAGE_COUNT = 'kafka.message_count'
TAG_MESSAGE_KEY = 'kafka.message_key'

TAG_OPERATION_PROCESS_BATCH = 'consumer.process_batch'
end
end
end
end
end
Loading

0 comments on commit dd02868

Please sign in to comment.