diff --git a/CHANGELOG.md b/CHANGELOG.md index f3c823a..3dbdcf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on Keep a Changelog and this project adheres to Semantic Versioning. +## [0.1.82] - 2026-04-10 + +- Add relative_time_to_first_token attribute on LLM spans +- Create a common util function to centralize adding time duration attributes on a span +- Add time_to_first_token and relative_time_to_first_token for litellm instrumentation + + +## [0.1.81] - 2026-04-08 + +- Add a centralized span processor to manage root span handling + + ## [0.1.80] - 2026-04-06 + - Added input/output attributes across LLM, traceloop, and custom spans - Added utility function to explicitly set input/output attributes on the active span - Added utility function to explicitly set input/output attributes on the root span @@ -13,14 +26,17 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver ## [0.1.79] - 2026-04-02 + - Added version-safe check for _shutdown attribute in _JsonOTLPMetricExporter for compatability with opentelemetry libraries ## [0.1.78] - 2026-03-31 + - Added descriptor based binding of class methods when using decorators. ## [0.1.77] - 2026-03-27 + - Added custom-metric utility in SDK - Added support for custom-metric in dashboard utility @@ -232,4 +248,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver - Added utility to set input and output data for any active span in a trace -[0.1.80]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main +[0.1.82]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main diff --git a/netra/instrumentation/cerebras/wrappers.py b/netra/instrumentation/cerebras/wrappers.py index 66f10c8..d7c2ee2 100644 --- a/netra/instrumentation/cerebras/wrappers.py +++ b/netra/instrumentation/cerebras/wrappers.py @@ -14,11 +14,15 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) CHAT_SPAN_NAME = "cerebras.chat.completions" COMPLETION_SPAN_NAME = "cerebras.completions" +TIME_TO_FIRST_TOKEN = "gen_ai.performance.time_to_first_token" +RELATIVE_TIME_TO_FIRST_TOKEN = "gen_ai.performance.relative_time_to_first_token" +LLM_RESPONSE_DURATION = "llm.response.duration" def _detect_streaming(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> bool: @@ -36,10 +40,9 @@ def _detect_streaming(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> bool: class StreamingWrapper(ObjectProxy): # type: ignore[misc] """Wrapper for streaming responses""" - def __init__(self, span: Span, response: Iterator[Any], start_time: float, request_kwargs: Dict[str, Any]) -> None: + def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -87,8 +90,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(delta.get("content", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -98,8 +103,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(choice.get("text", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -116,10 +123,8 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -127,12 +132,9 @@ def _finalize_span(self) -> None: class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc] """Async wrapper for streaming responses""" - def __init__( - self, span: Span, response: AsyncIterator[Any], start_time: float, request_kwargs: Dict[str, Any] - ) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -181,8 +183,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(delta.get("content", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -193,8 +197,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(choice.get("text", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -211,10 +217,8 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -234,9 +238,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: ctx = context_api.attach(set_span_in_context(span)) set_request_attributes(span, LLMRequestTypeValues.CHAT, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.cerebras: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -252,13 +255,12 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, LLMRequestTypeValues.CHAT, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -281,9 +283,8 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . try: ctx = context_api.attach(set_span_in_context(span)) set_request_attributes(span, LLMRequestTypeValues.CHAT, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.cerebras: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -296,13 +297,12 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, LLMRequestTypeValues.CHAT, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -327,12 +327,10 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: ctx = context_api.attach(set_span_in_context(span)) set_request_attributes(span, LLMRequestTypeValues.COMPLETION, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) return StreamingWrapper( span=span, response=response, - start_time=start_time, request_kwargs=kwargs, ) except Exception as e: @@ -348,13 +346,12 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, LLMRequestTypeValues.COMPLETION, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -378,12 +375,10 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . try: ctx = context_api.attach(set_span_in_context(span)) set_request_attributes(span, LLMRequestTypeValues.COMPLETION, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) return AsyncStreamingWrapper( span=span, response=response, - start_time=start_time, request_kwargs=kwargs, ) except Exception as e: @@ -398,13 +393,12 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, LLMRequestTypeValues.COMPLETION, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: diff --git a/netra/instrumentation/elevenlabs/wrappers.py b/netra/instrumentation/elevenlabs/wrappers.py index 3e05c32..166f67d 100644 --- a/netra/instrumentation/elevenlabs/wrappers.py +++ b/netra/instrumentation/elevenlabs/wrappers.py @@ -11,6 +11,7 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) @@ -79,7 +80,6 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) context = context_api.attach(set_span_in_context(span)) - start_time = time.time() try: set_request_attributes(span, kwargs) @@ -87,7 +87,6 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k return ElevenLabsStreamingWrapper( span=span, response=response, - start_time=start_time, context=context, ) @@ -273,7 +272,6 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) context = context_api.attach(set_span_in_context(span)) - start_time = time.time() try: set_request_attributes(span, kwargs) @@ -281,7 +279,6 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k return ElevenLabsAsyncStreamingWrapper( span=span, response=response, - start_time=start_time, context=context, ) except Exception as e: @@ -364,12 +361,10 @@ def __init__( self, span: Span, response: Iterator[Any], - start_time: float, context: Any, ) -> None: self._span = span self._response = response - self._start_time = start_time self._context = context self._buffer: Dict[str, Any] = { @@ -414,8 +409,7 @@ def _process_chunk(self, chunk: Any) -> None: self._buffer["chunks"].append(chunk) def _finalize_span(self) -> None: - end_time = time.time() - self._span.set_attribute("gen_ai.response.duration", end_time - self._start_time) + record_span_timing(self._span, "gen_ai.response.duration") if self._buffer["duration"]: self._span.set_attribute("gen_ai.audio.duration", self._buffer["duration"]) @@ -428,17 +422,15 @@ def _finalize_span(self) -> None: class ElevenLabsAsyncStreamingWrapper: """Async wrapper for streaming responses (async generators).""" - def __init__(self, span: Span, response: AsyncIterator[Any], start_time: float, context: Any) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any], context: Any) -> None: """Initialize async streaming wrapper. Args: span: OpenTelemetry span for this operation response: Async generator/iterator from the wrapped method - start_time: Start time for duration calculation context: OpenTelemetry context token from context_api.attach() """ self._span: Span = span - self._start_time: float = start_time self._response: AsyncIterator[Any] = response self._context: Any = context self._buffer: Dict[str, Any] = {"chunks": [], "duration": 0.0, "alignment": [], "voice_segments": []} @@ -490,8 +482,7 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span after streaming completes.""" - end_time = time.time() - self._span.set_attribute("gen_ai.response.duration", end_time - self._start_time) + record_span_timing(self._span, "gen_ai.response.duration") if self._buffer["duration"]: self._span.set_attribute("gen_ai.audio.duration", self._buffer["duration"]) diff --git a/netra/instrumentation/google_genai/wrappers.py b/netra/instrumentation/google_genai/wrappers.py index 6564047..33b3e26 100644 --- a/netra/instrumentation/google_genai/wrappers.py +++ b/netra/instrumentation/google_genai/wrappers.py @@ -11,6 +11,7 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) @@ -19,6 +20,10 @@ CONTENT_STREAM_SPAN_NAME = "genai.generate_content_stream" IMAGES_SPAN_NAME = "genai.generate_images" VIDEOS_SPAN_NAME = "genai.generate_videos" +TIME_TO_FIRST_TOKEN = "gen_ai.performance.time_to_first_token" +RELATIVE_TIME_TO_FIRST_TOKEN = "gen_ai.performance.relative_time_to_first_token" +LLM_RESPONSE_DURATION = "llm.response.duration" +GEN_AI_RESPONSE_DURATION = "gen_ai.response.duration" def content_wrapper(tracer: Tracer) -> Callable[..., Any]: @@ -33,13 +38,12 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -63,13 +67,12 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() set_response_attributes(span, response) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -94,9 +97,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, args, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time) + return StreamingWrapper(span=span, response=response) except Exception as e: logger.error("netra.instrumentation.google_genai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -122,9 +124,8 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, args, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time) + return AsyncStreamingWrapper(span=span, response=response) except Exception as e: logger.error("netra.instrumentation.google_genai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -147,11 +148,9 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, GEN_AI_RESPONSE_DURATION) set_response_attributes(span, response) - span.set_attribute("gen_ai.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -173,11 +172,9 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, GEN_AI_RESPONSE_DURATION) set_response_attributes(span, response) - span.set_attribute("gen_ai.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -199,11 +196,9 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, GEN_AI_RESPONSE_DURATION) set_response_attributes(span, response) - span.set_attribute("gen_ai.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -225,11 +220,9 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, args, kwargs) - start_time = time.time() response = await wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, GEN_AI_RESPONSE_DURATION) set_response_attributes(span, response) - span.set_attribute("gen_ai.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -242,9 +235,8 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . class StreamingWrapper: - def __init__(self, span: Span, response: Iterator[Any], start_time: float) -> None: + def __init__(self, span: Span, response: Iterator[Any]) -> None: self._span = span - self._start_time = start_time self._buffer: dict[Any, Any] = {"chunk": None, "content": ""} self._chunk: Any = None self._response = response @@ -271,23 +263,22 @@ def _process_chunk(self, chunk: Any) -> None: if isinstance(text, str): if text and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute("gen_ai.performance.time_to_first_token", time.time() - self._start_time) + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) self._buffer["content"] += text self._span.add_event("llm.content.completion.chunk") def _finalize_span(self) -> None: - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._buffer) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() class AsyncStreamingWrapper: - def __init__(self, span: Span, response: AsyncIterator[Any], start_time: float) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any]) -> None: self._span = span - self._start_time = start_time self._buffer: dict[Any, Any] = {"chunk": None, "content": ""} self._response = response self._first_content_recorded: bool = False @@ -313,14 +304,14 @@ def _process_chunk(self, chunk: Any) -> None: if isinstance(text, str): if text and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute("gen_ai.performance.time_to_first_token", time.time() - self._start_time) + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) self._buffer["content"] += text self._span.add_event("llm.content.completion.chunk") def _finalize_span(self) -> None: - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._buffer) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() diff --git a/netra/instrumentation/groq/wrappers.py b/netra/instrumentation/groq/wrappers.py index 514e65a..ecfc190 100644 --- a/netra/instrumentation/groq/wrappers.py +++ b/netra/instrumentation/groq/wrappers.py @@ -13,19 +13,22 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) CHAT_SPAN_NAME = "groq.chat" +TIME_TO_FIRST_TOKEN = "gen_ai.performance.time_to_first_token" +RELATIVE_TIME_TO_FIRST_TOKEN = "gen_ai.performance.relative_time_to_first_token" +LLM_RESPONSE_DURATION = "llm.response.duration" class StreamingWrapper(ObjectProxy): # type: ignore[misc] """Wrapper for streaming responses (OpenAI-style).""" - def __init__(self, span: Span, response: Iterator[Any], start_time: float, request_kwargs: Dict[str, Any]) -> None: + def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -70,8 +73,10 @@ def _process_chunk(self, chunk: Any) -> None: if content: if not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) message = self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -89,10 +94,8 @@ def _process_chunk(self, chunk: Any) -> None: self._span.add_event("llm.content.completion.chunk") def _finalize_span(self) -> None: - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -100,12 +103,9 @@ def _finalize_span(self) -> None: class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc] """Async wrapper for streaming responses (OpenAI-style).""" - def __init__( - self, span: Span, response: AsyncIterator[Any], start_time: float, request_kwargs: Dict[str, Any] - ) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -150,8 +150,10 @@ def _process_chunk(self, chunk: Any) -> None: if content: if not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) message = self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -169,10 +171,8 @@ def _process_chunk(self, chunk: Any) -> None: self._span.add_event("llm.content.completion.chunk") def _finalize_span(self) -> None: - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -190,9 +190,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: # pylint: disable=broad-except span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) @@ -207,13 +206,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) + end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = time.time() - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) except Exception: logger.warning("Failed to set response attributes for Groq span", exc_info=True) @@ -235,9 +234,8 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: # pylint: disable=broad-except span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) @@ -252,13 +250,13 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, . ) as span: try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = await wrapped(*args, **kwargs) + end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = time.time() - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) except Exception: logger.warning("Failed to set response attributes for Groq span", exc_info=True) diff --git a/netra/instrumentation/litellm/wrappers.py b/netra/instrumentation/litellm/wrappers.py index 4a1e93e..d0d4c5a 100644 --- a/netra/instrumentation/litellm/wrappers.py +++ b/netra/instrumentation/litellm/wrappers.py @@ -14,6 +14,7 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) @@ -21,6 +22,9 @@ EMBEDDING_SPAN_NAME = "litellm.embedding" IMAGE_GENERATION_SPAN_NAME = "litellm.image_generation" RESPONSE_SPAN_NAME = "litellm.responses" +TIME_TO_FIRST_TOKEN = "gen_ai.performance.time_to_first_token" +RELATIVE_TIME_TO_FIRST_TOKEN = "gen_ai.performance.relative_time_to_first_token" +LLM_RESPONSE_DURATION = "llm.response.duration" def is_streaming_response(response: Any) -> bool: @@ -47,9 +51,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k context = context_api.attach(set_span_in_context(span)) try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) @@ -64,12 +67,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -97,9 +101,8 @@ async def wrapper( context = context_api.attach(set_span_in_context(span)) try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: span.set_status(Status(StatusCode.ERROR, str(e))) span.record_exception(e) @@ -113,12 +116,13 @@ async def wrapper( ) as span: set_request_attributes(span, kwargs, "chat") try: - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -143,9 +147,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "response") - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -160,12 +163,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "response") - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -191,9 +195,8 @@ async def wrapper(wrapped: Callable[..., Awaitable[Any]], instance: Any, args: A try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "response") - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -208,12 +211,13 @@ async def wrapper(wrapped: Callable[..., Awaitable[Any]], instance: Any, args: A ) as span: try: set_request_attributes(span, kwargs, "response") - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -236,12 +240,10 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: set_request_attributes(span, kwargs, "embedding") try: - start_time = time.time() response = wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, LLM_RESPONSE_DURATION) response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -265,12 +267,10 @@ async def wrapper( ) as span: set_request_attributes(span, kwargs, "embedding") try: - start_time = time.time() response = await wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, LLM_RESPONSE_DURATION) response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -292,12 +292,10 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: set_request_attributes(span, kwargs, "image_generation") try: - start_time = time.time() response = wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, LLM_RESPONSE_DURATION) response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -321,12 +319,10 @@ async def wrapper( ) as span: set_request_attributes(span, kwargs, "image_generation") try: - start_time = time.time() response = await wrapped(*args, **kwargs) - end_time = time.time() + record_span_timing(span, LLM_RESPONSE_DURATION) response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - span.set_attribute("llm.response.duration", end_time - start_time) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -339,12 +335,12 @@ async def wrapper( class StreamingWrapper(ObjectProxy): # type: ignore[misc] """Wrapper for streaming responses""" - def __init__(self, span: Span, response: Iterator[Any], start_time: float, request_kwargs: Dict[str, Any]) -> None: + def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} + self._first_content_recorded: bool = False def _is_chat(self) -> bool: """Determine if the request is a chat request.""" @@ -401,6 +397,13 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = None if isinstance(delta, dict) and delta.get("content"): content_piece = str(delta.get("content", "")) + if content_piece and not self._first_content_recorded: + self._first_content_recorded = True + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True + ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} ) @@ -413,6 +416,11 @@ def _process_chunk(self, chunk: Any) -> None: self._complete_response["usage"] = chunk_dict["usage"] # Response API + if chunk_dict.get("delta") and not self._first_content_recorded: + self._first_content_recorded = True + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) if chunk_dict.get("response"): response = chunk_dict.get("response", {}) if response.get("status") == "completed": @@ -432,10 +440,8 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -443,14 +449,12 @@ def _finalize_span(self) -> None: class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc] """Async wrapper for streaming responses""" - def __init__( - self, span: Span, response: AsyncIterator[Any], start_time: float, request_kwargs: Dict[str, Any] - ) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} + self._first_content_recorded: bool = False def _is_chat(self) -> bool: """Determine if the request is a chat request.""" @@ -507,6 +511,13 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = None if isinstance(delta, dict) and delta.get("content"): content_piece = str(delta.get("content", "")) + if content_piece and not self._first_content_recorded: + self._first_content_recorded = True + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True + ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} ) @@ -519,6 +530,11 @@ def _process_chunk(self, chunk: Any) -> None: self._complete_response["usage"] = chunk_dict["usage"] # Response API + if chunk_dict.get("delta") and not self._first_content_recorded: + self._first_content_recorded = True + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) if chunk_dict.get("response"): response = chunk_dict.get("response", {}) if response.get("status") == "completed": @@ -538,9 +554,7 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() diff --git a/netra/instrumentation/openai/wrappers.py b/netra/instrumentation/openai/wrappers.py index 971d98e..0c72875 100644 --- a/netra/instrumentation/openai/wrappers.py +++ b/netra/instrumentation/openai/wrappers.py @@ -14,6 +14,7 @@ set_response_attributes, should_suppress_instrumentation, ) +from netra.instrumentation.utils import record_span_timing logger = logging.getLogger(__name__) @@ -21,6 +22,9 @@ CHAT_SPAN_NAME = "openai.chat" EMBEDDING_SPAN_NAME = "openai.embedding" RESPONSE_SPAN_NAME = "openai.response" +TIME_TO_FIRST_TOKEN = "gen_ai.performance.time_to_first_token" +RELATIVE_TIME_TO_FIRST_TOKEN = "gen_ai.performance.relative_time_to_first_token" +LLM_RESPONSE_DURATION = "llm.response.duration" def chat_wrapper(tracer: Tracer) -> Callable[..., Any]: @@ -36,9 +40,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -54,14 +57,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -87,9 +89,8 @@ async def wrapper( try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -104,14 +105,13 @@ async def wrapper( ) as span: try: set_request_attributes(span, kwargs, "chat") - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -134,14 +134,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "embedding") - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -166,14 +165,13 @@ async def wrapper( ) as span: try: set_request_attributes(span, kwargs, "embedding") - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -199,9 +197,8 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "response") - start_time = time.time() response = wrapped(*args, **kwargs) - return StreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return StreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -216,14 +213,13 @@ def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], k ) as span: try: set_request_attributes(span, kwargs, "response") - start_time = time.time() response = wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -249,9 +245,8 @@ async def wrapper(wrapped: Callable[..., Awaitable[Any]], instance: Any, args: A try: context = context_api.attach(set_span_in_context(span)) set_request_attributes(span, kwargs, "response") - start_time = time.time() response = await wrapped(*args, **kwargs) - return AsyncStreamingWrapper(span=span, response=response, start_time=start_time, request_kwargs=kwargs) + return AsyncStreamingWrapper(span=span, response=response, request_kwargs=kwargs) except Exception as e: logger.error("netra.instrumentation.openai: %s", e) span.set_status(Status(StatusCode.ERROR, str(e))) @@ -266,14 +261,13 @@ async def wrapper(wrapped: Callable[..., Awaitable[Any]], instance: Any, args: A ) as span: try: set_request_attributes(span, kwargs, "response") - start_time = time.time() response = await wrapped(*args, **kwargs) end_time = time.time() response_dict = model_as_dict(response) set_response_attributes(span, response_dict) - duration = end_time - start_time - span.set_attribute("llm.response.duration", duration) - span.set_attribute("gen_ai.performance.time_to_first_token", duration) + record_span_timing(span, LLM_RESPONSE_DURATION, end_time) + record_span_timing(span, TIME_TO_FIRST_TOKEN, end_time) + record_span_timing(span, RELATIVE_TIME_TO_FIRST_TOKEN, end_time, use_root_span=True) span.set_status(Status(StatusCode.OK)) return response except Exception as e: @@ -287,10 +281,9 @@ async def wrapper(wrapped: Callable[..., Awaitable[Any]], instance: Any, args: A class StreamingWrapper(ObjectProxy): # type: ignore[misc] """Wrapper for streaming responses""" - def __init__(self, span: Span, response: Iterator[Any], start_time: float, request_kwargs: Dict[str, Any]) -> None: + def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -352,8 +345,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(delta.get("content", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -369,7 +364,9 @@ def _process_chunk(self, chunk: Any) -> None: # Response API if chunk_dict.get("delta") and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute("gen_ai.performance.time_to_first_token", time.time() - self._start_time) + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) if chunk_dict.get("response"): response = chunk_dict.get("response", {}) if response.get("status") == "completed": @@ -389,10 +386,8 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() @@ -400,12 +395,9 @@ def _finalize_span(self) -> None: class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc] """Async wrapper for streaming responses""" - def __init__( - self, span: Span, response: AsyncIterator[Any], start_time: float, request_kwargs: Dict[str, Any] - ) -> None: + def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None: super().__init__(response) self._span = span - self._start_time = start_time self._request_kwargs = request_kwargs self._complete_response: Dict[str, Any] = {"choices": [], "model": ""} self._first_content_recorded: bool = False @@ -467,8 +459,10 @@ def _process_chunk(self, chunk: Any) -> None: content_piece = str(delta.get("content", "")) if content_piece and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute( - "gen_ai.performance.time_to_first_token", time.time() - self._start_time + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing( + self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True ) self._complete_response["choices"][index].setdefault( "message", {"role": "assistant", "content": ""} @@ -484,7 +478,9 @@ def _process_chunk(self, chunk: Any) -> None: # Response API if chunk_dict.get("delta") and not self._first_content_recorded: self._first_content_recorded = True - self._span.set_attribute("gen_ai.performance.time_to_first_token", time.time() - self._start_time) + first_token_time = time.time() + record_span_timing(self._span, TIME_TO_FIRST_TOKEN, first_token_time) + record_span_timing(self._span, RELATIVE_TIME_TO_FIRST_TOKEN, first_token_time, use_root_span=True) if chunk_dict.get("response"): response = chunk_dict.get("response", {}) if response.get("status") == "completed": @@ -504,9 +500,7 @@ def _process_chunk(self, chunk: Any) -> None: def _finalize_span(self) -> None: """Finalize span when streaming is complete""" - end_time = time.time() - duration = end_time - self._start_time + record_span_timing(self._span, LLM_RESPONSE_DURATION) set_response_attributes(self._span, self._complete_response) - self._span.set_attribute("llm.response.duration", duration) self._span.set_status(Status(StatusCode.OK)) self._span.end() diff --git a/netra/instrumentation/pydantic_ai/wrappers.py b/netra/instrumentation/pydantic_ai/wrappers.py index 83db59e..14d2db6 100644 --- a/netra/instrumentation/pydantic_ai/wrappers.py +++ b/netra/instrumentation/pydantic_ai/wrappers.py @@ -573,11 +573,8 @@ def _finalize_parent_span(self) -> None: if not self._span or not self._span.is_recording(): return - # Calculate duration - end_time = time.time() - end_time - self._start_time - # Set timing attributes + end_time = time.time() _set_timing_attributes(self._span, self._start_time, end_time) # Set response attributes if we have access to the final result diff --git a/netra/instrumentation/pydantic_ai_slim/wrappers.py b/netra/instrumentation/pydantic_ai_slim/wrappers.py index 83db59e..14d2db6 100644 --- a/netra/instrumentation/pydantic_ai_slim/wrappers.py +++ b/netra/instrumentation/pydantic_ai_slim/wrappers.py @@ -573,11 +573,8 @@ def _finalize_parent_span(self) -> None: if not self._span or not self._span.is_recording(): return - # Calculate duration - end_time = time.time() - end_time - self._start_time - # Set timing attributes + end_time = time.time() _set_timing_attributes(self._span, self._start_time, end_time) # Set response attributes if we have access to the final result diff --git a/netra/instrumentation/utils.py b/netra/instrumentation/utils.py new file mode 100644 index 0000000..c96b895 --- /dev/null +++ b/netra/instrumentation/utils.py @@ -0,0 +1,83 @@ +import logging +import time +from typing import Any, Optional + +logger = logging.getLogger(__name__) + +from opentelemetry.trace import Span + +from netra.processors.root_span_processor import RootSpanProcessor + + +def _safe_set_attribute(span: Span, key: str, value: Any, max_length: Optional[int] = None) -> bool: + """Safely set a span attribute with optional truncation and null checks. + + Args: + span: The OpenTelemetry span on which to set the attribute. + key: The attribute key. + value: The attribute value. If None, the attribute is not set. + max_length: If provided, the string representation of value is truncated + to this length before being set. + + Returns: + True if the attribute was successfully set, False otherwise. + """ + if not span.is_recording() or value is None: + return False + + try: + str_value = str(value) + if max_length and len(str_value) > max_length: + str_value = str_value[:max_length] + except Exception: + logger.warning("Failed to convert value to string for attribute '%s'", key, exc_info=True) + return False + + try: + span.set_attribute(key, str_value) + except Exception: + logger.warning("Failed to set span attribute '%s'", key, exc_info=True) + return False + return True + + +def record_span_timing( + span: Span, + attribute: str, + event_time: Optional[float] = None, + use_root_span: bool = False, +) -> bool: + """Compute elapsed time for an event and set it as a span attribute. + + Elapsed time is measured from: + - ``use_root_span=False`` (default): the start time of the given span. + - ``use_root_span=True``: the start time of the root span of the given span. + + Args: + span: The OpenTelemetry span on which to record the timing attribute. + attribute: The attribute key under which the elapsed time is stored. + event_time: The event timestamp in seconds since epoch. Defaults to + ``time.time()`` if not provided. + use_root_span: If True, elapsed time is measured from the root span's + start time instead of the given span's start time. + + Returns: + True if the timing attribute was successfully set, False if the elapsed + time could not be computed (e.g. missing start time or root span). + """ + t = event_time if event_time is not None else time.time() + start_time = None + + if not use_root_span: + start_time = getattr(span, "start_time", None) + else: + root_span = RootSpanProcessor.get_root_span(span) + if not root_span: + return False + start_time = getattr(root_span, "start_time", None) + + if not start_time: + return False + + elapsed = t - start_time / 1e9 # Convert nanoseconds to seconds + return _safe_set_attribute(span, attribute, elapsed) diff --git a/netra/processors/__init__.py b/netra/processors/__init__.py index a247ead..8891e11 100644 --- a/netra/processors/__init__.py +++ b/netra/processors/__init__.py @@ -2,6 +2,7 @@ from netra.processors.llm_trace_identifier_span_processor import LlmTraceIdentifierSpanProcessor from netra.processors.local_filtering_span_processor import LocalFilteringSpanProcessor from netra.processors.root_instrument_filter_processor import RootInstrumentFilterProcessor +from netra.processors.root_span_processor import RootSpanProcessor from netra.processors.scrubbing_span_processor import ScrubbingSpanProcessor from netra.processors.session_span_processor import SessionSpanProcessor from netra.processors.span_io_processor import SpanIOProcessor @@ -14,4 +15,5 @@ "ScrubbingSpanProcessor", "LocalFilteringSpanProcessor", "RootInstrumentFilterProcessor", + "RootSpanProcessor", ] diff --git a/netra/processors/llm_trace_identifier_span_processor.py b/netra/processors/llm_trace_identifier_span_processor.py index b29ca2c..50b4d51 100644 --- a/netra/processors/llm_trace_identifier_span_processor.py +++ b/netra/processors/llm_trace_identifier_span_processor.py @@ -1,9 +1,12 @@ import logging import threading -from typing import Dict, Optional, Set +from typing import Optional, Set -from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor -from opentelemetry.trace import SpanContext +from opentelemetry import context as context_api +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import Span, SpanContext + +from netra.processors.root_span_processor import RootSpanProcessor logger = logging.getLogger(__name__) @@ -44,7 +47,12 @@ def __init__( Raises: ValueError: If any attribute key is empty. + + NOTE: This processor must be registered before RootSpanProcessor so that + ``_is_root_span_ending`` can query the mapping before it is cleaned up by + RootSpanProcessor.on_end. """ + self._request_model_key = request_model_attribute_key self._response_model_key = response_model_attribute_key self._root_marker_key = root_marker_attribute_key @@ -53,47 +61,26 @@ def __init__( self._lock = threading.Lock() # Trace state tracking - self._root_spans_by_trace: Dict[int, Span] = {} - self._root_span_ids_by_trace: Dict[int, int] = {} self._marked_traces: Set[int] = set() - def on_start(self, span: Span, parent_context: Optional[object] = None) -> None: - """ - Handle span start events. - - Registers "local root spans" for trace tracking. A local root is either: - - A true root span (no parent), OR - - The first span seen for a trace in this process (handles remote parent case) + def on_start( + self, + span: Span, + parent_context: Optional[context_api.Context] = None, + ) -> None: + """No-op. LLM attributes are only available once the span completes, so all processing is deferred to on_end. Args: - span: The span that has started. - parent_context: Optional parent context (unused). + span: The span that was started. + parent_context: The parent context. """ - try: - span_context = self._get_span_context(span) - if span_context is None: - return - - trace_id = span_context.trace_id - span_id = span_context.span_id - - with self._lock: - # Register the first span we see for this trace as the local root. - # Using setdefault ensures we only register the first span, handling - # both true roots and distributed tracing (remote parent) cases. - if trace_id not in self._root_spans_by_trace: - self._root_spans_by_trace[trace_id] = span - self._root_span_ids_by_trace[trace_id] = span_id - - except Exception as e: - logger.warning("Error processing span start: %s", e, exc_info=True) def on_end(self, span: ReadableSpan) -> None: """ Handle span end events. Checks if the span contains LLM call attributes and marks the root span - if found. Cleans up trace state when root spans complete. + if found. Cleans up marked-trace state when the root span completes. Args: span: The span that has ended. @@ -138,34 +125,11 @@ def shutdown(self) -> None: """ Shutdown the processor and release all resources. - Clears all internal state and releases locks. + Clears all internal state """ with self._lock: - self._root_spans_by_trace.clear() - self._root_span_ids_by_trace.clear() self._marked_traces.clear() - @staticmethod - def _is_root_span(span: Span) -> bool: - """ - Determine if a span is a true root span (no valid parent). - - Args: - span: The span to check. - - Returns: - True if the span has no valid parent, False otherwise. - """ - parent = getattr(span, "parent", None) - if parent is None: - return True - - # is_valid can be either a property (bool) or a method (callable) - is_valid = getattr(parent, "is_valid", False) - if callable(is_valid): - return not is_valid() - return not is_valid - @staticmethod def _get_span_context(span: ReadableSpan) -> Optional[SpanContext]: """ @@ -194,9 +158,7 @@ def _is_root_span_ending(self, trace_id: int, span_id: int) -> bool: Returns: True if this is the root span ending, False otherwise. """ - with self._lock: - root_span_id = self._root_span_ids_by_trace.get(trace_id) - return root_span_id is not None and span_id == root_span_id + return RootSpanProcessor.is_root_span_for_trace(trace_id, span_id) def _is_trace_marked(self, trace_id: int) -> bool: """ @@ -237,27 +199,27 @@ def _mark_root_span(self, trace_id: int) -> None: Args: trace_id: The trace ID whose root span should be marked. """ - with self._lock: - root_span = self._root_spans_by_trace.get(trace_id) - self._marked_traces.add(trace_id) + root_span = RootSpanProcessor.get_root_span_by_trace_id(trace_id) + if root_span is None: + return - if root_span is None: - return + is_recording = getattr(root_span, "is_recording", lambda: False)() + if not is_recording: + logger.debug("Root span not recording for trace_id=%s", trace_id) + return - is_recording = getattr(root_span, "is_recording", lambda: False)() - if not is_recording: - logger.debug("Root span not recording for trace_id=%s", trace_id) - return + try: + root_span.set_attribute(self._root_marker_key, True) + except Exception as e: + logger.warning( + "Failed to mark root span for trace_id=%s: %s", + trace_id, + e, + exc_info=True, + ) - try: - root_span.set_attribute(self._root_marker_key, True) - except Exception as e: - logger.warning( - "Failed to mark root span for trace_id=%s: %s", - trace_id, - e, - exc_info=True, - ) + with self._lock: + self._marked_traces.add(trace_id) def _cleanup_trace(self, trace_id: int) -> None: """ @@ -267,6 +229,4 @@ def _cleanup_trace(self, trace_id: int) -> None: trace_id: The trace ID to clean up. """ with self._lock: - self._root_spans_by_trace.pop(trace_id, None) - self._root_span_ids_by_trace.pop(trace_id, None) self._marked_traces.discard(trace_id) diff --git a/netra/processors/root_span_processor.py b/netra/processors/root_span_processor.py new file mode 100644 index 0000000..690ea68 --- /dev/null +++ b/netra/processors/root_span_processor.py @@ -0,0 +1,193 @@ +import logging +import threading +from typing import Dict, Optional + +from opentelemetry import context as context_api +from opentelemetry import trace +from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor +from opentelemetry.trace import Span + +logger = logging.getLogger(__name__) + + +class RootSpanProcessor(SpanProcessor): # type: ignore[misc] + """ + A SpanProcessor that tracks root spans using an in-memory dictionary + keyed by trace_id. + + This implementation identifies root spans at span start and stores them + in a global mapping. The mapping is cleaned up when the corresponding + root span ends. + + The dict is a class-level variable so that static lookup helpers can + resolve root spans without requiring a processor instance. All mutations + are protected by the class-level ``_lock``. + """ + + _root_spans: Dict[int, Span] = {} + _lock: threading.Lock = threading.Lock() + + @staticmethod + def get_root_span_by_trace_id(trace_id: int) -> Optional[Span]: + """ + Retrieve the root span associated with a given trace ID. + + Args: + trace_id: The trace identifier. + + Returns: + The root Span if present, otherwise None. + """ + try: + with RootSpanProcessor._lock: + return RootSpanProcessor._root_spans.get(trace_id) + except Exception: + logger.debug("Failed to get root span", exc_info=True) + return None + + @staticmethod + def is_root_span_for_trace(trace_id: int, span_id: int) -> bool: + """ + Check whether the given span_id is the root span for the given trace_id. + + Args: + trace_id: The trace identifier. + span_id: The span identifier to test. + + Returns: + True if the span is the recorded root span for this trace, False otherwise. + """ + try: + with RootSpanProcessor._lock: + root = RootSpanProcessor._root_spans.get(trace_id) + if root is None: + return False + root_ctx = root.get_span_context() + return root_ctx is not None and root_ctx.span_id == span_id + except Exception: + logger.debug("RootSpanProcessor: Failed to check root span", exc_info=True) + return False + + @staticmethod + def get_root_span(span: Span) -> Optional[Span]: + """ + Resolve the root span for a given span. + + Args: + span: The span whose root span is to be determined. + + Returns: + The root Span if available, otherwise None. + """ + try: + span_ctx = span.get_span_context() + if not span_ctx or not span_ctx.is_valid: + return None + + return RootSpanProcessor.get_root_span_by_trace_id(span_ctx.trace_id) + except Exception: + logger.debug("RootSpanProcessor: Failed to resolve root span", exc_info=True) + return None + + def _is_root_span( + self, + parent_context: Optional[context_api.Context], + ) -> bool: + """ + Determine whether a span is a root span. + + A span is considered root if: + - There is no parent span, or + - The parent span context is invalid. + + Args: + parent_context: The parent context passed to on_start. + + Returns: + True if the span is a root span, False otherwise. + """ + parent_span = trace.get_current_span(parent_context) + if parent_span is None: + return True + + parent_span_ctx = parent_span.get_span_context() + return parent_span_ctx is None or not parent_span_ctx.is_valid + + def on_start( + self, + span: Span, + parent_context: Optional[context_api.Context], + ) -> None: + """ + Hook executed when a span starts. + + If the span is identified as a root span, it is stored in the + internal mapping using its trace_id. + + Args: + span: The span being started. + parent_context: The parent context of the span. + """ + try: + span_ctx = span.get_span_context() + if span_ctx is None or not span_ctx.is_valid: + return + + if not self._is_root_span(parent_context): + return + + with self._lock: + self._root_spans.setdefault(span_ctx.trace_id, span) + + except Exception: + logger.debug("RootSpanProcessor: error in on_start", exc_info=True) + + def on_end(self, span: ReadableSpan) -> None: + """ + Hook executed when a span ends. + + If the ending span is the root span for its trace, it is removed + from the internal mapping to prevent stale entries. + + Args: + span: The span being ended. + """ + try: + span_ctx = span.get_span_context() + if span_ctx is None or not span_ctx.is_valid: + return + + with self._lock: + root = self._root_spans.get(span_ctx.trace_id) + if root is not None: + root_ctx = root.get_span_context() + if root_ctx is not None and root_ctx.span_id == span_ctx.span_id: + self._root_spans.pop(span_ctx.trace_id, None) + + except Exception: + logger.debug("RootSpanProcessor: error in on_end", exc_info=True) + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """ + Force flush any pending data. + + This processor holds no buffered export data, so this is a no-op + that always returns True. + + Args: + timeout_millis: Maximum time to wait in milliseconds (unused). + + Returns: + Always True. + """ + return True + + def shutdown(self) -> None: + """ + Shutdown the processor and release all tracked root spans. + + Clears the class-level mapping so that stale entries do not leak + across test runs or provider resets. + """ + with self._lock: + self._root_spans.clear() diff --git a/netra/processors/span_io_processor.py b/netra/processors/span_io_processor.py index ddca677..c192825 100644 --- a/netra/processors/span_io_processor.py +++ b/netra/processors/span_io_processor.py @@ -1,7 +1,6 @@ import json import logging import re -import threading from typing import Any, Callable, Dict, Optional from opentelemetry import context as otel_context @@ -82,62 +81,23 @@ class SpanIOProcessor(SpanProcessor): # type: ignore[misc] """Normalises ``input`` / ``output`` attributes and remaps ``traceloop.*`` keys to ``netra.*`` on all spans. - Also tracks the root span per trace (the first span seen for each trace_id) - so that callers can set input/output attributes directly on the trace root. - All interception is done in ``on_start`` via a per-span closure that wraps ``span.set_attribute``, following the same pattern as ``InstrumentationSpanProcessor``. """ - def __init__(self) -> None: - self._lock = threading.Lock() - self._root_spans: Dict[int, Span] = {} - self._root_span_ids: Dict[int, int] = {} - - def set_root_attribute(self, trace_id: int, key: str, value: Any) -> None: - """Set an attribute on the root span for the given trace. - - Args: - trace_id: The trace ID whose root span should be updated. - key: Attribute key to set. - value: Attribute value to set. - """ - with self._lock: - span = self._root_spans.get(trace_id) - if span is not None and not getattr(span, "is_recording", lambda: False)(): - span = None - - if span is None: - logger.warning("No root span found for trace_id=%s; cannot set '%s'", trace_id, key) - return - - try: - span.set_attribute(key, value) - except Exception: - logger.exception("Failed to set attribute '%s' on root span", key) - def on_start( self, span: Span, parent_context: Optional[otel_context.Context] = None, ) -> None: """Wrap the span's ``set_attribute`` to intercept and normalise writes. - Also registers the first span seen for each trace as the root span. Args: span: The span that was started. parent_context: The parent context (unused). """ try: - span_context = span.get_span_context() - if span_context is not None and span_context.is_valid: - trace_id = span_context.trace_id - with self._lock: - if trace_id not in self._root_spans: - self._root_spans[trace_id] = span - self._root_span_ids[trace_id] = span_context.span_id - attrs = span.attributes or {} if "input" not in attrs: span.set_attribute("input", "") @@ -148,18 +108,11 @@ def on_start( logger.exception("SpanIOProcessor.on_start failed") def on_end(self, span: ReadableSpan) -> None: - """Clean up root span tracking when the root span ends.""" - try: - span_context = span.get_span_context() - if span_context is None: - return - trace_id = span_context.trace_id - with self._lock: - if self._root_span_ids.get(trace_id) == span_context.span_id: - self._root_spans.pop(trace_id, None) - self._root_span_ids.pop(trace_id, None) - except Exception: - logger.exception("SpanIOProcessor.on_end failed") + """No-op. All attribute normalisation is applied eagerly via the set_attribute wrapper installed in on_start. + + Args: + span: The span that has ended. + """ def force_flush(self, timeout_millis: int = 30000) -> bool: """No-op flush. diff --git a/netra/session_manager.py b/netra/session_manager.py index 5c37828..9522016 100644 --- a/netra/session_manager.py +++ b/netra/session_manager.py @@ -38,9 +38,6 @@ class SessionManager: # Maintained for spans registered via SessionManager (e.g., SpanWrapper) _active_spans: List[trace.Span] = [] - # Reference to SpanIOProcessor for root span lookup keyed by trace_id - _span_io_processor: Optional[Any] = None - @classmethod def set_current_span(cls, span: Optional[trace.Span]) -> None: """ @@ -468,35 +465,30 @@ def set_root_output(cls, value: Any) -> None: except Exception: logger.exception("SessionManager.set_root_output: failed to set output attribute") - @classmethod - def set_span_io_processor(cls, processor: Any) -> None: - """Store a reference to the SpanIOProcessor for root span lookup. - - Args: - processor: The SpanIOProcessor instance. - """ - cls._span_io_processor = processor - @classmethod def set_attribute_on_root_span(cls, attr_key: str, attr_value: Any) -> None: """Set an attribute on the root span of the current trace. - Resolves the root span via SpanIOProcessor (keyed by trace_id of the - current active span), which is correct under concurrent traces. Args: attr_key: Key for the attribute to set attr_value: Value for the attribute to set """ try: - if cls._span_io_processor is None: - logger.warning("SpanIOProcessor not initialised; cannot set root attribute '%s'", attr_key) - return + from netra.processors.root_span_processor import RootSpanProcessor + span_ctx = trace.get_current_span().get_span_context() if not span_ctx.is_valid: logger.warning("set_attribute_on_root_span called outside any active span context") return - cls._span_io_processor.set_root_attribute(span_ctx.trace_id, attr_key, attr_value) + + trace_id = span_ctx.trace_id + root_span = RootSpanProcessor.get_root_span_by_trace_id(trace_id) + if not root_span: + # Format as 32-character zero-padded lowercase hex + logger.warning(f"Cannot find root span for trace_id: {trace_id:032x}") + return + root_span.set_attribute(attr_key, attr_value) except Exception: logger.exception("Failed to set attribute '%s' on root span", attr_key) diff --git a/netra/tracer.py b/netra/tracer.py index 8f4d7ca..f7d59e0 100644 --- a/netra/tracer.py +++ b/netra/tracer.py @@ -99,6 +99,7 @@ def _setup_tracer(self) -> None: LlmTraceIdentifierSpanProcessor, LocalFilteringSpanProcessor, RootInstrumentFilterProcessor, + RootSpanProcessor, ScrubbingSpanProcessor, SessionSpanProcessor, SpanIOProcessor, @@ -110,13 +111,13 @@ def _setup_tracer(self) -> None: provider.add_span_processor(LocalFilteringSpanProcessor()) provider.add_span_processor(InstrumentationSpanProcessor()) provider.add_span_processor(SessionSpanProcessor()) - span_io_processor = SpanIOProcessor() - provider.add_span_processor(span_io_processor) + provider.add_span_processor(SpanIOProcessor()) provider.add_span_processor(LlmTraceIdentifierSpanProcessor()) - from netra.session_manager import SessionManager - - SessionManager.set_span_io_processor(span_io_processor) + # Adding RootSpanProcessor after LlmTraceIdentifierSpanProcessor + # to ensure the on_end function in LlmTraceIdentifierSpanProcessor + # can get root_span correctly using RootSpanProcessor + provider.add_span_processor(RootSpanProcessor()) if self.cfg.enable_scrubbing: provider.add_span_processor(ScrubbingSpanProcessor()) # type: ignore[no-untyped-call] diff --git a/netra/version.py b/netra/version.py index 939e02b..2db3392 100644 --- a/netra/version.py +++ b/netra/version.py @@ -1 +1 @@ -__version__ = "0.1.80" +__version__ = "0.1.82" diff --git a/pyproject.toml b/pyproject.toml index c6cf0b4..2bebdcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "netra-sdk" -version = "0.1.80" +version = "0.1.82" description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments." authors = [ {name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"}