Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog and this project adheres to Semantic Versioning.


## [0.1.81] - 2026-04-08
- Add a centralized span processor to manage root span handling


## [0.1.80] - 2026-04-06
- Added input/output attributes across LLM, traceloop, and custom spans
- Added utility function to explicitly set input/output attributes on the active span
Expand Down Expand Up @@ -232,4 +236,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver

- Added utility to set input and output data for any active span in a trace

[0.1.80]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
[0.1.81]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
2 changes: 2 additions & 0 deletions netra/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from netra.processors.llm_trace_identifier_span_processor import LlmTraceIdentifierSpanProcessor
from netra.processors.local_filtering_span_processor import LocalFilteringSpanProcessor
from netra.processors.root_instrument_filter_processor import RootInstrumentFilterProcessor
from netra.processors.root_span_processor import RootSpanProcessor
from netra.processors.scrubbing_span_processor import ScrubbingSpanProcessor
from netra.processors.session_span_processor import SessionSpanProcessor
from netra.processors.span_io_processor import SpanIOProcessor
Expand All @@ -14,4 +15,5 @@
"ScrubbingSpanProcessor",
"LocalFilteringSpanProcessor",
"RootInstrumentFilterProcessor",
"RootSpanProcessor",
]
116 changes: 38 additions & 78 deletions netra/processors/llm_trace_identifier_span_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import threading
from typing import Dict, Optional, Set
from typing import Optional, Set

from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.trace import SpanContext
from opentelemetry import context as context_api
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor
from opentelemetry.trace import Span, SpanContext

from netra.processors.root_span_processor import RootSpanProcessor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +47,12 @@ def __init__(

Raises:
ValueError: If any attribute key is empty.

NOTE: This processor must be registered before RootSpanProcessor so that
``_is_root_span_ending`` can query the mapping before it is cleaned up by
RootSpanProcessor.on_end.
"""

self._request_model_key = request_model_attribute_key
self._response_model_key = response_model_attribute_key
self._root_marker_key = root_marker_attribute_key
Expand All @@ -53,47 +61,26 @@ def __init__(
self._lock = threading.Lock()

# Trace state tracking
self._root_spans_by_trace: Dict[int, Span] = {}
self._root_span_ids_by_trace: Dict[int, int] = {}
self._marked_traces: Set[int] = set()

def on_start(self, span: Span, parent_context: Optional[object] = None) -> None:
"""
Handle span start events.

Registers "local root spans" for trace tracking. A local root is either:
- A true root span (no parent), OR
- The first span seen for a trace in this process (handles remote parent case)
def on_start(
self,
span: Span,
parent_context: Optional[context_api.Context] = None,
) -> None:
"""No-op. LLM attributes are only available once the span completes, so all processing is deferred to on_end.

Args:
span: The span that has started.
parent_context: Optional parent context (unused).
span: The span that was started.
parent_context: The parent context.
"""
try:
span_context = self._get_span_context(span)
if span_context is None:
return

trace_id = span_context.trace_id
span_id = span_context.span_id

with self._lock:
# Register the first span we see for this trace as the local root.
# Using setdefault ensures we only register the first span, handling
# both true roots and distributed tracing (remote parent) cases.
if trace_id not in self._root_spans_by_trace:
self._root_spans_by_trace[trace_id] = span
self._root_span_ids_by_trace[trace_id] = span_id

except Exception as e:
logger.warning("Error processing span start: %s", e, exc_info=True)

def on_end(self, span: ReadableSpan) -> None:
"""
Handle span end events.

Checks if the span contains LLM call attributes and marks the root span
if found. Cleans up trace state when root spans complete.
if found. Cleans up marked-trace state when the root span completes.

Args:
span: The span that has ended.
Expand Down Expand Up @@ -138,34 +125,11 @@ def shutdown(self) -> None:
"""
Shutdown the processor and release all resources.

Clears all internal state and releases locks.
Clears all internal state
"""
with self._lock:
self._root_spans_by_trace.clear()
self._root_span_ids_by_trace.clear()
self._marked_traces.clear()

@staticmethod
def _is_root_span(span: Span) -> bool:
"""
Determine if a span is a true root span (no valid parent).

Args:
span: The span to check.

Returns:
True if the span has no valid parent, False otherwise.
"""
parent = getattr(span, "parent", None)
if parent is None:
return True

# is_valid can be either a property (bool) or a method (callable)
is_valid = getattr(parent, "is_valid", False)
if callable(is_valid):
return not is_valid()
return not is_valid

@staticmethod
def _get_span_context(span: ReadableSpan) -> Optional[SpanContext]:
"""
Expand Down Expand Up @@ -194,9 +158,7 @@ def _is_root_span_ending(self, trace_id: int, span_id: int) -> bool:
Returns:
True if this is the root span ending, False otherwise.
"""
with self._lock:
root_span_id = self._root_span_ids_by_trace.get(trace_id)
return root_span_id is not None and span_id == root_span_id
return RootSpanProcessor.is_root_span_for_trace(trace_id, span_id)

def _is_trace_marked(self, trace_id: int) -> bool:
"""
Expand Down Expand Up @@ -238,26 +200,26 @@ def _mark_root_span(self, trace_id: int) -> None:
trace_id: The trace ID whose root span should be marked.
"""
with self._lock:
root_span = self._root_spans_by_trace.get(trace_id)
self._marked_traces.add(trace_id)

if root_span is None:
return
root_span = RootSpanProcessor.get_root_span_by_trace_id(trace_id)
if root_span is None:
return

is_recording = getattr(root_span, "is_recording", lambda: False)()
if not is_recording:
logger.debug("Root span not recording for trace_id=%s", trace_id)
return
is_recording = getattr(root_span, "is_recording", lambda: False)()
if not is_recording:
logger.debug("Root span not recording for trace_id=%s", trace_id)
return

try:
root_span.set_attribute(self._root_marker_key, True)
except Exception as e:
logger.warning(
"Failed to mark root span for trace_id=%s: %s",
trace_id,
e,
exc_info=True,
)
try:
root_span.set_attribute(self._root_marker_key, True)
except Exception as e:
logger.warning(
"Failed to mark root span for trace_id=%s: %s",
trace_id,
e,
exc_info=True,
)

def _cleanup_trace(self, trace_id: int) -> None:
"""
Expand All @@ -267,6 +229,4 @@ def _cleanup_trace(self, trace_id: int) -> None:
trace_id: The trace ID to clean up.
"""
with self._lock:
self._root_spans_by_trace.pop(trace_id, None)
self._root_span_ids_by_trace.pop(trace_id, None)
self._marked_traces.discard(trace_id)
Loading