Skip to content
This repository has been archived by the owner on May 23, 2023. It is now read-only.

Add ability to specify parent span in tracer_span_context in TornadoScopeManager #126

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions opentracing/scope_managers/tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import threading
import tornado.stack_context

from opentracing import Scope
from opentracing import Scope, global_tracer
from opentracing.scope_managers import ThreadLocalScopeManager


Expand Down Expand Up @@ -246,7 +246,7 @@ def __exit__(self, *_):
return False


def tracer_stack_context():
def tracer_stack_context(parent_span=None):
"""
Create a custom Tornado's :class:`StackContext` that allows
:class:`TornadoScopeManager` to store the active
Expand All @@ -269,12 +269,47 @@ def handle_request_wrapper(request, actual_handler, *args, **kwargs)
with tracer.scope_manager.activate(span, True):
return actual_handler(*args, **kwargs)

You can specify `parent_span` when schedule callback/coroutine/future in
loop or run fire & forget coroutine, to achieve parent span propagation.
Note that you cannot use one stack context for more than one
callback/coroutine/future, because this will lead to interception of
context by different tasks:

.. code-block:: python
from opentracing import global_tracer
from opentracing.scope_managers.tornado import tracer_stack_context

@tornado.gen.coroutine
def send_notification(message):
# should be used as fire & forget coroutine
with tracer.start_active_span('send_notification'):
# send.

# Ok
with tracer_stack_context(tracer.active_span):
send_notification('hello!')
with tracer_stack_context(tracer.active_span):
io_loop.add_callback(send_notification, 'foobar!')

# Incorrect
with tracer_stack_context(tracer.active_span):
send_notification('hello!')
io_loop.add_callback(send_notification, 'foobar!')

:param parent_span: the :class:`~opentracing.Span` that should be used as
parent in the context.

:return:
Return a custom :class:`StackContext` that allows
:class:`TornadoScopeManager` to activate and propagate
:class:`~opentracing.Span` instances.
"""
context = _TracerRequestContext()
if parent_span is not None:
scope = _TornadoScope(
global_tracer().scope_manager, parent_span, False)
else:
scope = None
context = _TracerRequestContext(scope)
return ThreadSafeStackContext(
lambda: _TracerRequestContextManager(context)
)
41 changes: 38 additions & 3 deletions testbed/test_late_span_finish/test_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ def setUp(self):
self.tracer = MockTracer(TornadoScopeManager())
self.loop = ioloop.IOLoop.current()

def test_main(self):
def test_active_parent_span_explicitly(self):
# Create a Span and use it as (explicit) parent of a pair of subtasks.
with tracer_stack_context():
parent_span = self.tracer.start_span('parent')
self.submit_subtasks(parent_span)
self.submit_subtasks_with_span(parent_span)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 2)
Expand All @@ -41,7 +41,7 @@ def test_main(self):

# Fire away a few subtasks, passing a parent Span whose lifetime
# is not tied at all to the children.
def submit_subtasks(self, parent_span):
def submit_subtasks_with_span(self, parent_span):
@gen.coroutine
def task(name):
logger.info('Running %s' % name)
Expand All @@ -51,3 +51,38 @@ def task(name):

self.loop.add_callback(task, 'task1')
self.loop.add_callback(task, 'task2')

def test_use_active_span_implicitly(self):
parent_span = self.tracer.start_span('parent')
# Make new stack context which scope has `parent_span` as active span.
with tracer_stack_context(parent_span):
self.submit_subtasks()

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) >= 2)
self.loop.start()

# Late-finish the parent Span now.
parent_span.finish()

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
self.assertNamesEqual(spans, ['task1', 'task2', 'parent'])

for i in range(2):
self.assertSameTrace(spans[i], spans[-1])
self.assertIsChildOf(spans[i], spans[-1])
self.assertTrue(spans[i].finish_time <= spans[-1].finish_time)

# Fire away a few subtasks, whose lifetime is not tied at all to the
# children. Parent (active) span will be taken from tornado's stack
# context implicitly.
def submit_subtasks(self):
@gen.coroutine
def task(name):
logger.info('Running %s' % name)
with self.tracer.start_active_span(name):
gen.sleep(0.1)

self.loop.add_callback(task, 'task1')
self.loop.add_callback(task, 'task2')
93 changes: 93 additions & 0 deletions testbed/test_multiple_callbacks/test_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,96 @@ def submit_callbacks(self):
tasks.append(t)

return tasks

def test_multiple_callbacks_scheduled_in_loop(self):
"""
This test emulates concurrent execution of two scheduled callbacks.
One callback has yield operation that switching context of execution to
another one callback.
When execution of first callback resumes back, it's active scope must
be the same.
"""

@gen.coroutine
def callback_1(name):
with self.tracer.start_active_span(name) as scope:
yield gen.sleep(0.1)
# Check that another concurrently executed callback doesn't
# change original scope.
self.assertEqual(self.tracer.scope_manager.active, scope)
self.assertEqual(self.tracer.active_span, scope.span)

def callback_2(name):
with self.tracer.start_active_span(name):
pass

@gen.coroutine
def main_task():
with self.tracer.start_active_span('parent') as scope:
# Each callback should be wrapped by their own stack context.
with tracer_stack_context(scope.span):
self.loop.add_callback(callback_1, 'foo')
with tracer_stack_context(scope.span):
self.loop.add_callback(callback_2, 'bar')

with tracer_stack_context():
self.loop.add_callback(main_task)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) == 3)
self.loop.start()

parent, bar, foo = self.tracer.finished_spans()
# Callbacks will be finished later than their parent.
self.assertNamesEqual([parent, bar, foo], ['parent', 'bar', 'foo', ])

self.assertSameTrace(parent, bar)
self.assertSameTrace(parent, foo)

self.assertIsChildOf(foo, parent)
self.assertIsChildOf(bar, parent)

def test_concurrent_fire_and_forget_coroutines(self):
"""
This test emulates concurrent execution of fire & forget coroutines.
Each coroutine has two yield operation that switching context of
execution to another one coroutine.
When execution of a coroutine resumes back, it's active scope must be
the same.
"""

@gen.coroutine
def coro(name):
with self.tracer.start_active_span(name) as scope:
yield gen.sleep(0.1)
# Check that another concurrently executed coroutine doesn't
# change original scope.
self.assertEqual(self.tracer.scope_manager.active, scope)
self.assertEqual(self.tracer.active_span, scope.span)
yield gen.sleep(0.1)

@gen.coroutine
def main_task():
with self.tracer.start_active_span('parent') as scope:
# Each coroutine should be wrapped by their own stack context.
with tracer_stack_context(scope.span):
coro('foo')
with tracer_stack_context(scope.span):
coro('bar')

with tracer_stack_context():
main_task()

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) == 3)
self.loop.start()

parent, foo, bar = self.tracer.finished_spans()
# Coroutines will be finished later than their parent.
self.assertNamesEqual([parent, foo, bar], ['parent', 'foo', 'bar', ])

self.assertSameTrace(parent, foo)
self.assertSameTrace(parent, bar)

self.assertIsChildOf(foo, parent)
self.assertIsChildOf(bar, parent)
101 changes: 89 additions & 12 deletions testbed/test_subtask_span_propagation/test_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from opentracing.scope_managers.tornado import TornadoScopeManager, \
tracer_stack_context
from ..testcase import OpenTracingTestCase
from ..utils import stop_loop_when


class TestTornado(OpenTracingTestCase):
Expand All @@ -16,7 +17,22 @@ def setUp(self):
self.loop = ioloop.IOLoop.current()

def test_main(self):
parent_task = functools.partial(self.parent_task, 'message')

@gen.coroutine
def child_task(message):
# No need to pass/activate the parent Span, as
# it stays in the context.
with self.tracer.start_active_span('child'):
raise gen.Return('%s::response' % message)

@gen.coroutine
def parent_task(message):
with self.tracer.start_active_span('parent'):
res = yield child_task(message)

raise gen.Return(res)

parent_task = functools.partial(parent_task, 'message')
with tracer_stack_context():
res = self.loop.run_sync(parent_task)
self.assertEqual(res, 'message::response')
Expand All @@ -26,16 +42,77 @@ def test_main(self):
self.assertNamesEqual(spans, ['child', 'parent'])
self.assertIsChildOf(spans[0], spans[1])

@gen.coroutine
def parent_task(self, message):
with self.tracer.start_active_span('parent'):
res = yield self.child_task(message)
def test_callbacks(self):

def child_callback_2():
with self.tracer.start_active_span('child_2'):
pass

def child_callback_1():
with self.tracer.start_active_span('child_1') as scope:
# Should be wrapped by `tracer_stack_context` to store right
# context in scheduled callback.
with tracer_stack_context(scope.span):
self.loop.add_callback(child_callback_2)

def parent_callback():
with self.tracer.start_active_span('parent') as scope:
with tracer_stack_context(scope.span):
self.loop.add_callback(child_callback_1)

with tracer_stack_context():
self.loop.add_callback(parent_callback)

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) == 3)
self.loop.start()

# Callback will be finished later than their parent.
parent, child_1, child_2 = self.tracer.finished_spans()
self.assertNamesEqual(
[parent, child_1, child_2], ['parent', 'child_1', 'child_2'])
self.assertSameTrace(child_1, parent)
self.assertSameTrace(child_2, parent)

self.assertIsChildOf(child_1, parent)
self.assertIsChildOf(child_2, child_1)

def test_fire_and_forget_coroutines(self):

@gen.coroutine
def child_coro_2():
yield gen.sleep(0.1)
with self.tracer.start_active_span('child_2'):
pass

@gen.coroutine
def child_coro_1():
with self.tracer.start_active_span('child_1') as scope:
# Should be wrapped by `tracer_stack_context` to store right
# context in scheduled callback.
yield gen.sleep(0.1)
with tracer_stack_context(scope.span):
child_coro_2()

@gen.coroutine
def parent_coro():
with self.tracer.start_active_span('parent') as scope:
with tracer_stack_context(scope.span):
child_coro_1()

with tracer_stack_context():
parent_coro()

stop_loop_when(self.loop,
lambda: len(self.tracer.finished_spans()) == 3)
self.loop.start()

raise gen.Return(res)
# Callback will be finished later than their parent.
parent, child_1, child_2 = self.tracer.finished_spans()
self.assertNamesEqual(
[parent, child_1, child_2], ['parent', 'child_1', 'child_2'])
self.assertSameTrace(child_1, parent)
self.assertSameTrace(child_2, parent)

@gen.coroutine
def child_task(self, message):
# No need to pass/activate the parent Span, as
# it stays in the context.
with self.tracer.start_active_span('child'):
raise gen.Return('%s::response' % message)
self.assertIsChildOf(child_1, parent)
self.assertIsChildOf(child_2, child_1)