Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Karafka integration #4147

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open

Conversation

nvh0412
Copy link
Contributor

@nvh0412 nvh0412 commented Nov 22, 2024

What does this PR do?

Fixed #1660

In this PR, we introduce the Kafka integration for the Karafka gem. Which includes:

  1. Distributed tracing by utilizing message.metadata.headers
  2. Traces for worker.process and each message executor inside the batch, so we can link them to the origin trace if the distributed tracing is on

Motivation:

We’re integrating Karafka to implement proper distributed tracing in our system with Datadog, as it lacks an official integration. This integration will also enable distributed tracing if the message headers include distributed tracing data.

Distributed tracing will help us create a proper service map, connecting Kafka producers and consumers.

Change log entry

Yes. Add Karafka integration for distributed tracing.

(Added by @ivoanjo)

Additional Notes:

How to test the change?

Screenshot 2024-11-22 at 3 43 09 PM

@github-actions github-actions bot added integrations Involves tracing integrations tracing labels Nov 22, 2024
@nvh0412 nvh0412 marked this pull request as ready for review November 22, 2024 06:07
@nvh0412 nvh0412 requested review from a team as code owners November 22, 2024 06:07

option :service_name

option :distributed_tracing, default: false, type: :bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, for your use case, wouldn't you prefer that distributed_tracing is enabled by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually in limbo with it. The reason why I disable it by default is the same reason as the distributed_tracing of Sidekiq. The trace is actually easily blown up and getting out of control if we turn it on by default.

@drichards-87
Copy link

Created a Jira card for Docs Team editorial review.

@drichards-87 drichards-87 added the editorial review Waiting for a review from the docs team label Nov 22, 2024
Comment on lines 16 to 60
::Karafka.monitor.subscribe 'worker.process' do |event|
# Start a trace
span = Tracing.trace(Ext::SPAN_WORKER_PROCESS, **span_options)

job = event[:job]
job_type = fetch_job_type(job.class)
consumer = job.executor.topic.consumer
topic = job.executor.topic.name

action = case job_type
when 'Periodic'
'tick'
when 'PeriodicNonBlocking'
'tick'
when 'Shutdown'
'shutdown'
when 'Revoked'
'revoked'
when 'RevokedNonBlocking'
'revoked'
when 'Idle'
'idle'
when 'Eofed'
'eofed'
when 'EofedNonBlocking'
'eofed'
else
'consume'
end

span.resource = "#{consumer}##{action}"
span.set_tag(Ext::TAG_TOPIC, topic) if topic

if action == 'consume'
span.set_tag(Ext::TAG_MESSAGE_COUNT, job.messages.count)
span.set_tag(Ext::TAG_PARTITION, job.executor.partition)
span.set_tag(Ext::TAG_OFFSET, job.messages.first.metadata.offset)
end

span
end

::Karafka.monitor.subscribe 'worker.completed' do |event|
Tracing.active_span&.finish
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having separate locations for span creation Tracing.trace and span conclusion Tracing.active_span&.finish is always a possible source of hard-to-debug errors and span leakage (and thus memory leaks). We normally only do it when it's impossible to use Tracing.trace { do_work_here }.

Also, Tracing.trace { do_work_here } takes care of error handling, properly tagging the current span with error information.

In this case, we have the event worker.processed that looks like it's just what we need:
https://github.com/karafka/karafka/blob/ab4f9bcd3620f46adb8c0d158b5396b245619ed3/lib/karafka/processing/worker.rb#L58-L78

Except that it doesn't call the event listeners when an error is raised by the job. There are no error handlers in this method that call assigned_listeners: https://github.com/karafka/karafka-core/blob/a1425725d275796673424c1cd9be517d06518ec9/lib/karafka/core/monitoring/notifications.rb#L120

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library: karafka/karafka-core#145

The being said, I still lean towards using a single Tracing.trace { do_work_here } only because the safety of not having to worry about leaky spans is too advantageous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s precisely what I’d like to hear from the Datadog team regarding this. My initial intuition when using Tracing.active_span was that I couldn’t be certain whether the span I’m currently working on is the one I want to complete. Considering this and your insights, would it be feasible to remove this tracing event? At the moment, I prefer to retain the Ext::SPAN_MESSAGE_CONSUME event only for this integration, as it effectively meets my requirement: enabling distributed tracing, ensuring that each message span is linked to the root trace in the distributed tracing system.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a PR to Karafka to address this, but even if it's approved, it won't affect users of older versions of the library

This PR is not needed to achieve the expected wrapping because it is already done for example for OpenTelemetry:

ref1: https://karafka.io/docs/Monitoring-and-Logging/#opentelemetry
ref2: https://karafka.io/docs/Monitoring-and-Logging/#monitor-wrapping-and-replacement

@@ -193,6 +193,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc'
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 2.7 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

@@ -115,6 +115,7 @@
gem 'concurrent-ruby'
gem 'dalli', '>= 3.0.0'
gem 'grpc', '>= 1.38.0', platform: :ruby # Minimum version with Ruby 3.0 support
gem 'karafka'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

karafka no longer supports 3.0 based on: https://karafka.io/docs/Versions-Lifecycle-and-EOL/

include Karafka::Event

def self.subscribe!
::Karafka.monitor.subscribe 'worker.process' do |event|

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not recommend using this layer for instrumentation of that type and would advise you to reconsider something "closer" to the actual execution of work.

Copy link
Contributor Author

@nvh0412 nvh0412 Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible for you to explain what "closer" means here :)? I utilised what you did on instrumentation/vendors/datadog/logger_listener.rb here, but what I'm reconsidering here that we don't actually need to subscribe to this event anymore, because our goal is to have message traces for each message inside messages enumerator, so we can link them to the distributed traces. So I can remove this even if everything is getting out of control here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Karafka has several layers of reporting when a "job" is executed. The worker level is the highest, and I think of it as conceptually "distant" from the end user code execution layer. In between those, there are coordinators, executors, and more. The closest to the user code is the one that has consumer. events, and while I myself use the worker level once in a while, I, in general, do not recommend it and recommend using the one mentioned above. At some point, I will probably migrate the once that I wrote myself. There is nothing fundamentally wrong about using the worker one but as mentioned, there's a lot in between.

because our goal is to have message traces for each message inside messages enumerator

But this is not the only way users process code. You need to keep in mind users that do batch operations as well, thus you want to trace around all the operational code also.

@codecov-commenter
Copy link

codecov-commenter commented Nov 22, 2024

Codecov Report

Attention: Patch coverage is 80.58252% with 20 lines in your changes missing coverage. Please review.

Project coverage is 97.75%. Comparing base (ce4393e) to head (27e5711).
Report is 41 commits behind head on master.

Files with missing lines Patch % Lines
lib/datadog/tracing/contrib/karafka/patcher.rb 51.61% 15 Missing ⚠️
lib/datadog/tracing/contrib/karafka.rb 80.00% 4 Missing ⚠️
lib/datadog/tracing/contrib/karafka/integration.rb 95.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4147      +/-   ##
==========================================
- Coverage   97.78%   97.75%   -0.03%     
==========================================
  Files        1353     1358       +5     
  Lines       81817    81920     +103     
  Branches     4145     4150       +5     
==========================================
+ Hits        80001    80081      +80     
- Misses       1816     1839      +23     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mensfeld
Copy link

FYI feel free to ping me once remarks are done. I will be happy to help and maybe in the future retire my own instrumentation in favour of the DD one ;)

SPAN_MESSAGE_CONSUME = 'karafka.consume'
SPAN_WORKER_PROCESS = 'worker.process'

TAG_TOPIC = 'kafka.topic'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a side note because maybe it is done: please keep in mind that karafka supports multi-consumer group operations in one karafka process thus CG (consumer group) always needs to be reported alongside metrics.

end

def each(&block)
@messages_array.each do |message|

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned that once but I will do it again here: this is not the only way users process data. Several high scale users use batch operations. Karafka messages API allows you to also fetch deserializers payloads and more making this implementation only partial (not including batch processing users)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding what you wrote here @mensfeld https://karafka.io/docs/Consuming-Messages/#in-batches, this messages enumerator is supposed to be the only API interface that our users use for batch operations, am I right? Seems I can't see any alternative ways in this doc, except you're saying these high-scale users delegate the whole messages to another process by saving them to a storage (e.g., like Event.insert_all messages) without calling .each ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this messages enumerator is supposed to be the only API interface that our users use for batch operations, am I right?

No.

except you're saying these high-scale users delegate the whole messages to another process by saving them to a storage (e.g., like Event.insert_all messages) without calling .each ?

Yes. Alongside you can use Messages#payloads directly omitting the headers when not needed. Messages#each is a popular use-case but not the only one. While Karafka provides primitives for both, it my itself makes no assumptions about that nature of the processing, that's why #each is not instrumented on my side. The moment I yield control to the user, it is on the user to define the nature of the processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mensfeld, I see your point, but honestly, this seems like the most suitable place for me to patch and enable distributed tracing, if the users (who also use Datadog like me) want to use #payloads to do their batching implementation, then loosing distributed tracing, in that case, would be reasonable.

I understand that the #each method in the Karafka gem isn’t instrumented since neither you nor the gem itself needs to expose it. However, this distributed tracing is specific to the datadog gem and aligns perfectly with my motivation (in my PR description). So would love to hear thoughts on this from the Datadog team as well. @marcotc

willing to address if there is another place that we should patch to have our distributed tracing instead.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

willing to address if there is another place that we should patch to have our distributed tracing instead.

The only thing here is a philosophy of tracing. Ideally if we could have both and allow users to go with the approach they are taking for processing their data, then it would be ideal.

FYI I'm not sure if I mentioned this but feel free to ping me if you need any more help. Also happy to help with this work via Slack/etc if needed.

Copy link

@drichards-87 drichards-87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple of very small suggestions from Docs and approved the PR.

docs/GettingStarted.md Outdated Show resolved Hide resolved
docs/GettingStarted.md Outdated Show resolved Hide resolved
Use Tracing.trace wrapper to add consumer trace
Use Instrumentation::Monitor to instrument and have a proper trace wrapper
@nvh0412
Copy link
Contributor Author

nvh0412 commented Nov 27, 2024

Hi team and @marcotc

Let's settle this. My gut feeling about this PR is that we do have some limitations in how tracing is integrated into the Karafka gem. I’ve switched to using Tracing.trace with a block rather than starting and finishing traces manually. However, the core of this PR is the distributed tracing with the message block.

This implementation fits perfectly with my system and has been working well with an internal patch. That said, let me know your thoughts on this integration. I’m fine if we can't settle on it, as I can always maintain the internal patch if needed.

@marcotc
Copy link
Member

marcotc commented Jan 13, 2025

Sorry for the delay, I'll circle back here next week to get this PR unblocked and hopefully merged.

@mensfeld
Copy link

@marcotc feel free to ping me as well prior to merging/deciding what to do with it. I will give it a second look.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
editorial review Waiting for a review from the docs team integrations Involves tracing integrations tracing
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Distributed tracing support through Kafka integration
5 participants