Skip to content

Commit

Permalink
Serialize span events via a dedicated field
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Jan 14, 2025
1 parent 3dddec7 commit 33a7fc9
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 47 deletions.
13 changes: 12 additions & 1 deletion .github/forced-tests-list.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
{

"DEFAULT":
[
"tests/test_span_events.py::Test_SpanEvents_WithAgentSupport::test_v04_v07_default_format"
],
"AGENT_NOT_SUPPORTING_SPAN_EVENTS":
[
"tests/test_span_events.py"
],
"PARAMETRIC":
[
"tests/parametric/test_span_events.py"
]
}
2 changes: 0 additions & 2 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ target :datadog do
ignore 'lib/datadog/tracing/transport/http/traces.rb'
ignore 'lib/datadog/tracing/transport/io/client.rb'
ignore 'lib/datadog/tracing/transport/io/traces.rb'
ignore 'lib/datadog/tracing/transport/serializable_trace.rb'
ignore 'lib/datadog/tracing/transport/statistics.rb'
ignore 'lib/datadog/tracing/transport/trace_formatter.rb'
ignore 'lib/datadog/tracing/transport/traces.rb'
ignore 'lib/datadog/tracing/workers.rb'
ignore 'lib/datadog/tracing/workers/trace_writer.rb'
ignore 'lib/datadog/tracing/writer.rb'
Expand Down
8 changes: 7 additions & 1 deletion lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
require_relative '../../di/component'
require_relative '../crashtracking/component'

require_relative '../environment/agent_info'

module Datadog
module Core
module Configuration
Expand Down Expand Up @@ -85,7 +87,8 @@ def build_crashtracker(settings, agent_settings, logger:)
:tracer,
:crashtracker,
:dynamic_instrumentation,
:appsec
:appsec,
:agent_info

def initialize(settings)
@logger = self.class.build_logger(settings)
Expand All @@ -96,6 +99,9 @@ def initialize(settings)
# the Core resolver from within your product/component's namespace.
agent_settings = AgentSettingsResolver.call(settings, logger: @logger)

# Exposes agent capability information for detection by any components
@agent_info = Core::Environment::AgentInfo.new(agent_settings)

@telemetry = self.class.build_telemetry(settings, agent_settings, @logger)

@remote = Remote::Component.build(settings, agent_settings, telemetry: telemetry)
Expand Down
16 changes: 16 additions & 0 deletions lib/datadog/core/encoding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Encoding
# Encoder interface that provides the logic to encode traces and service
# @abstract
module Encoder
# :nocov:
def content_type
raise NotImplementedError
end
Expand All @@ -23,6 +24,13 @@ def join(encoded_elements)
def encode(_)
raise NotImplementedError
end

# Deserializes a value serialized with {#encode}.
# This method is used for debugging purposes.
def decode(_)
raise NotImplementedError
end
# :nocov:
end

# Encoder for the JSON format
Expand All @@ -41,6 +49,10 @@ def encode(obj)
JSON.dump(obj)
end

def decode(obj)
JSON.parse(obj)
end

def join(encoded_data)
"[#{encoded_data.join(',')}]"
end
Expand All @@ -62,6 +74,10 @@ def encode(obj)
MessagePack.pack(obj)
end

def decode(obj)
MessagePack.unpack(obj)
end

def join(encoded_data)
packer = MessagePack::Packer.new
packer.write_array_header(encoded_data.size)
Expand Down
1 change: 1 addition & 0 deletions lib/datadog/core/remote/transport/http/negotiation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def initialize(http_response, options = {})
@version = options[:version]
@endpoints = options[:endpoints]
@config = options[:config]
@span_events = options[:span_events]
end
end

Expand Down
14 changes: 13 additions & 1 deletion lib/datadog/core/remote/transport/negotiation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,19 @@ class Request < Datadog::Core::Transport::Request

# Negotiation response
module Response
attr_reader :version, :endpoints, :config
# @!attribute [r] version
# The version of the agent.
# @return [String]
# @!attribute [r] endpoints
# The HTTP endpoints the agent supports.
# @return [Array<String>]
# @!attribute [r] config
# The agent configuration. These are configured by the user when starting the agent, as well as any defaults.
# @return [Hash]
# @!attribute [r] span_events
# Whether the agent supports the top-level span events field in flushed spans.
# @return [Boolean,nil]
attr_reader :version, :endpoints, :config, :span_events
end

# Negotiation transport
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/transport/serializable_trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class SerializableTrace

# @param trace [Datadog::Trace] the trace to serialize
# @param native_events_supported [Boolean] whether the agent supports span events as a top-level field
def initialize(trace, native_events_supported = false)
def initialize(trace, native_events_supported)
@trace = trace
@native_events_supported = native_events_supported
end
Expand Down
30 changes: 22 additions & 8 deletions lib/datadog/tracing/transport/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class Chunker
#
# @param encoder [Datadog::Core::Encoding::Encoder]
# @param max_size [String] maximum acceptable payload size
def initialize(encoder, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
def initialize(encoder, native_events_supported, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
@encoder = encoder
@native_events_supported = native_events_supported
@max_size = max_size
end

Expand All @@ -77,7 +78,7 @@ def encode_in_chunks(traces)
private

def encode_one(trace)
encoded = Encoder.encode_trace(encoder, trace)
encoded = Encoder.encode_trace(encoder, trace, @native_events_supported)

if encoded.size > max_size
# This single trace is too large, we can't flush it
Expand All @@ -95,17 +96,18 @@ def encode_one(trace)
module Encoder
module_function

def encode_trace(encoder, trace)
def encode_trace(encoder, trace, native_events_supported)
# Format the trace for transport
TraceFormatter.format!(trace)

# Make the trace serializable
serializable_trace = SerializableTrace.new(trace)

Datadog.logger.debug { "Flushing trace: #{JSON.dump(serializable_trace)}" }
serializable_trace = SerializableTrace.new(trace, native_events_supported)

# Encode the trace
encoder.encode(serializable_trace)
encoder.encode(serializable_trace).tap do |encoded|
# Print the actual serialized trace, since the encoder can change make non-trivial changes
Datadog.logger.debug { "Flushing trace: #{encoder.decode(encoded)}" }
end
end
end

Expand All @@ -126,7 +128,7 @@ def initialize(apis, default_api)

def send_traces(traces)
encoder = current_api.encoder
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(encoder)
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(encoder, native_events_supported?)

responses = chunker.encode_in_chunks(traces.lazy).map do |encoded_traces, trace_count|
request = Request.new(EncodedParcel.new(encoded_traces, trace_count))
Expand Down Expand Up @@ -188,6 +190,18 @@ def change_api!(api_id)
@client = HTTP::Client.new(current_api)
end

# Queries the agent for native span events serialization support.
# This changes how the serialization of span events performed.
def native_events_supported?
return @native_events_supported if defined?(@native_events_supported)

if (res = Datadog.send(:components).agent_info.fetch)
@native_events_supported = res.span_events == true
else
false
end
end

# Raised when configured with an unknown API version
class UnknownApiVersionError < StandardError
attr_reader :version
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/configuration/components.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ module Datadog

attr_reader remote: Datadog::Core::Remote::Component

attr_reader agent_info: Datadog::Core::Environment::AgentInfo

def initialize: (untyped settings) -> untyped

def startup!: (untyped settings) -> untyped
Expand Down
30 changes: 17 additions & 13 deletions sig/datadog/core/encoding.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,43 @@ module Datadog
# Encoder interface that provides the logic to encode traces and service
# @abstract
module Encoder
def content_type: () -> untyped
def content_type: () -> String

# Concatenates a list of elements previously encoded by +#encode+.
def join: (untyped encoded_elements) -> untyped
def encode: (untyped obj) -> String

# Serializes a single trace into a String suitable for network transmission.
def encode: (untyped _) -> untyped
def join: (Array[untyped] encoded_data) -> String

def decode: (String obj)-> untyped
end

# Encoder for the JSON format
module JSONEncoder
extend Encoder

CONTENT_TYPE: "application/json"
CONTENT_TYPE: String

def self?.content_type: () -> String

def self?.content_type: () -> untyped
def self?.encode: (untyped obj) -> String

def self?.encode: (untyped obj) -> untyped
def self?.join: (Array[untyped] encoded_data) -> String

def self?.join: (untyped encoded_data) -> ::String
def self?.decode: (String obj)-> untyped
end

# Encoder for the Msgpack format
module MsgpackEncoder
extend Encoder

CONTENT_TYPE: "application/msgpack"
CONTENT_TYPE: String

def self?.content_type: () -> String

def self?.content_type: () -> untyped
def self?.encode: (untyped obj) -> String

def self?.encode: (untyped obj) -> untyped
def self?.join: (Array[untyped] encoded_data) -> String

def self?.join: (untyped encoded_data) -> untyped
def self?.decode: (String obj)-> untyped
end
end
end
Expand Down
12 changes: 8 additions & 4 deletions sig/datadog/core/remote/transport/negotiation.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ module Datadog
end

module Response
attr_reader version: untyped
attr_reader version: String

attr_reader endpoints: untyped
attr_reader endpoints: Array[String]

attr_reader config: untyped
attr_reader config: Hash[String,untyped]

attr_reader span_events: bool
end

class Transport
Expand All @@ -25,7 +27,9 @@ module Datadog

def initialize: (untyped apis, untyped default_api) -> void

def send_info: () -> untyped
type send_info_return = HTTP::Negotiation::Response & Core::Transport::InternalErrorResponse

def send_info: () -> send_info_return

def current_api: () -> untyped
end
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/transport/response.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Datadog
class InternalErrorResponse
include Response

attr_reader error: untyped
attr_reader error: Exception

def initialize: (untyped error) -> void

Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/tracing/transport/serializable_trace.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Datadog
class SerializableTrace
@native_events_supported: bool

attr_reader trace: Span
attr_reader trace: TraceSegment

def initialize: (untyped trace, bool native_events_supported) -> void

Expand Down
10 changes: 7 additions & 3 deletions sig/datadog/tracing/transport/traces.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module Datadog

attr_reader max_size: untyped

def initialize: (untyped encoder, ?max_size: untyped) -> void
def initialize: (untyped encoder, bool native_events_supported, ?max_size: untyped) -> void

def encode_in_chunks: (untyped traces) -> untyped

Expand All @@ -38,10 +38,12 @@ module Datadog
end

module Encoder
def self?.encode_trace: (untyped encoder, untyped trace) -> untyped
def self?.encode_trace: (untyped encoder, untyped trace, bool native_events_supported) -> untyped
end

class Transport
@native_events_supported: bool

attr_reader client: untyped

attr_reader apis: untyped
Expand All @@ -52,7 +54,7 @@ module Datadog

def initialize: (untyped apis, untyped default_api) -> void

def send_traces: (untyped traces) -> untyped
def send_traces: (Array[Tracing::TraceOperation] traces) -> untyped

def stats: () -> untyped

Expand Down Expand Up @@ -81,6 +83,8 @@ module Datadog

def message: () -> ::String
end

def native_events_supported?: -> bool
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions spec/datadog/core/configuration/components_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
let(:logger) { instance_double(Datadog::Core::Logger) }
let(:settings) { Datadog::Core::Configuration::Settings.new }
let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }
let(:agent_info) { Datadog::Core::Environment::AgentInfo.new(agent_settings) }

let(:profiler_setup_task) { Datadog::Profiling.supported? ? instance_double(Datadog::Profiling::Tasks::Setup) : nil }
let(:remote) { instance_double(Datadog::Core::Remote::Component, start: nil, shutdown!: nil) }
Expand Down Expand Up @@ -95,6 +96,7 @@
expect(components.profiler).to be profiler
expect(components.runtime_metrics).to be runtime_metrics
expect(components.health_metrics).to be health_metrics
expect(components.agent_info).to eq agent_info
end

describe '@environment_logger_extra' do
Expand Down
Loading

0 comments on commit 33a7fc9

Please sign in to comment.