Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions src/strands/telemetry/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,31 @@ class Tracer:
or "gen_ai_use_latest_invocation_tokens", respectively, in the OTEL_SEMCONV_STABILITY_OPT_IN environment variable.
"""

def __init__(self) -> None:
"""Initialize the tracer."""
def __init__(self, instrument_threading: bool | None = None) -> None:
"""Initialize the tracer.

Args:
instrument_threading: When True, install OpenTelemetry's
``ThreadingInstrumentor`` so spans propagate across
``concurrent.futures.ThreadPoolExecutor``/``threading.Thread``
boundaries. When False, skip it. When None (default), honor
the ``STRANDS_INSTRUMENT_THREADING`` environment variable
(``true``/``1``/``yes`` enables, anything else disables).
Threading instrumentation is off by default because it
mutates global Python state (monkey-patches
``ThreadPoolExecutor.submit``) and can conflict with other
OpenTelemetry setups in the host application.

The standard ``OTEL_PYTHON_DISABLED_INSTRUMENTATIONS``
env var is honored regardless — if ``threading`` appears in
that comma-separated list, instrumentation is skipped even
when explicitly opted in.
"""
self.service_name = __name__
self.tracer_provider: trace_api.TracerProvider | None = None
self.tracer_provider = trace_api.get_tracer_provider()
self.tracer = self.tracer_provider.get_tracer(self.service_name)
ThreadingInstrumentor().instrument()
self._maybe_instrument_threading(instrument_threading)

# Read OTEL_SEMCONV_STABILITY_OPT_IN environment variable
opt_in_values = self._parse_semconv_opt_in()
Expand All @@ -102,6 +120,69 @@ def __init__(self) -> None:
self._include_tool_definitions = "gen_ai_tool_definitions" in opt_in_values
self._use_latest_invocation_tokens = "gen_ai_use_latest_invocation_tokens" in opt_in_values

@staticmethod
def _threading_opt_in(explicit: bool | None) -> tuple[bool, bool]:
"""Resolve whether to install ThreadingInstrumentor, and whether the user asked.

Precedence (highest to lowest):

1. ``OTEL_PYTHON_DISABLED_INSTRUMENTATIONS`` containing ``threading`` →
always disabled (matches OpenTelemetry's auto-loader semantics).
2. Explicit ``instrument_threading`` kwarg on ``Tracer()``.
3. ``STRANDS_INSTRUMENT_THREADING`` env var.
4. Default: disabled.

Returns:
A tuple ``(enabled, user_requested)``. ``user_requested`` is True
when the caller explicitly opted in via kwarg=True or via the
``STRANDS_INSTRUMENT_THREADING`` env var — i.e. a failure at that
point is a broken feature the user asked for (logged at ERROR).
False when instrumentation is enabled only by default/auto behavior
(logged at WARNING). Currently default is off, so enabled ⇒
user_requested ⇒ ERROR on failure, but we keep the distinction so
a future "enabled by default" flip doesn't silently escalate every
log.
"""
disabled_env = os.getenv("OTEL_PYTHON_DISABLED_INSTRUMENTATIONS", "")
disabled = {value.strip().lower() for value in disabled_env.split(",") if value.strip()}
if "threading" in disabled:
return False, False
if explicit is not None:
# An explicit kwarg (True or False) is always a user intent signal.
return explicit, explicit
env_opt_in = os.getenv("STRANDS_INSTRUMENT_THREADING", "").strip().lower() in {"1", "true", "yes"}
return env_opt_in, env_opt_in

def _maybe_instrument_threading(self, instrument_threading: bool | None) -> None:
"""Install ``ThreadingInstrumentor`` if requested and not already active."""
enabled, user_requested = self._threading_opt_in(instrument_threading)
if not enabled:
return
instrumentor = ThreadingInstrumentor()
# Skip if this process already has OTel threading instrumentation — prevents
# wrapper stacking if the host application (e.g. ``opentelemetry-distro``,
# AWS OTel Distro, ``opentelemetry-instrument`` CLI, Azure Monitor's
# distro) installed it first. Read the documented underscore-prefixed
# attribute directly rather than the ``is_instrumented_by_opentelemetry``
# property, so the guard state is unambiguous.
if getattr(instrumentor, "_is_instrumented_by_opentelemetry", False):
return
# Telemetry is ancillary — a failure inside the instrumentor must not
# crash the host application. Mirror the log+continue pattern used
# elsewhere in this module (see ``_end_span``). Use ERROR when the
# user explicitly asked for threading instrumentation (they asked for
# a feature and it silently didn't work); WARNING when it was only
# auto-enabled.
try:
instrumentor.instrument()
except Exception as e:
log = logger.error if user_requested else logger.warning
log(
"error=<%s> | ThreadingInstrumentor.instrument() failed; continuing without threading span propagation",
e,
exc_info=True,
)

def _parse_semconv_opt_in(self) -> set[str]:
"""Parse the OTEL_SEMCONV_STABILITY_OPT_IN environment variable.

Expand Down
Loading