diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dbdcf0..6c134bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,24 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on Keep a Changelog and this project adheres to Semantic Versioning. -## [0.1.82] - 2026-04-10 - -- Add relative_time_to_first_token attribute on LLM spans -- Create a common util function to centralize adding time duration attributes on a span -- Add time_to_first_token and relative_time_to_first_token for litellm instrumentation - - -## [0.1.81] - 2026-04-08 - -- Add a centralized span processor to manage root span handling - - -## [0.1.80] - 2026-04-06 +## [0.1.80] - 2026-04-10 - Added input/output attributes across LLM, traceloop, and custom spans - Added utility function to explicitly set input/output attributes on the active span - Added utility function to explicitly set input/output attributes on the root span - Move serialization logic to SessionManager +- Added custom instrumentation for httpx and request libraries +- Add a centralized span processor to manage root span handling +- Add relative_time_to_first_token attribute on LLM spans +- Create a common util function to centralize adding time duration attributes on a span +- Add time_to_first_token and relative_time_to_first_token for litellm instrumentation ## [0.1.79] - 2026-04-02 @@ -248,4 +241,4 @@ The format is based on Keep a Changelog and this project adheres to Semantic Ver - Added utility to set input and output data for any active span in a trace -[0.1.82]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main +[0.1.80]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main diff --git a/netra/instrumentation/__init__.py b/netra/instrumentation/__init__.py index d55eea3..c5e70cc 100644 --- a/netra/instrumentation/__init__.py +++ b/netra/instrumentation/__init__.py @@ -460,9 +460,9 @@ def init_httpx_instrumentation() -> bool: """ try: if is_package_installed("httpx"): - from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor + from netra.instrumentation.httpx import HTTPXInstrumentor - instrumentor = HTTPXClientInstrumentor() + instrumentor = HTTPXInstrumentor() if not instrumentor.is_instrumented_by_opentelemetry: instrumentor.instrument() return True @@ -1111,7 +1111,7 @@ def init_requests_instrumentation() -> bool: """Initialize requests instrumentation.""" try: if is_package_installed("requests"): - from opentelemetry.instrumentation.requests import RequestsInstrumentor + from netra.instrumentation.requests import RequestsInstrumentor instrumentor = RequestsInstrumentor() if not instrumentor.is_instrumented_by_opentelemetry: diff --git a/netra/instrumentation/httpx/__init__.py b/netra/instrumentation/httpx/__init__.py index 84603aa..d1537b0 100644 --- a/netra/instrumentation/httpx/__init__.py +++ b/netra/instrumentation/httpx/__init__.py @@ -1,545 +1,57 @@ -from __future__ import annotations - -import functools import logging -import types -from timeit import default_timer -from typing import Any, Callable, Collection, Dict, Optional, Union -from urllib.parse import urlparse +from typing import Any, Collection -import httpx -from opentelemetry.instrumentation._semconv import ( - HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, - HTTP_DURATION_HISTOGRAM_BUCKETS_OLD, - _client_duration_attrs_new, - _client_duration_attrs_old, - _filter_semconv_duration_attrs, - _get_schema_url, - _OpenTelemetrySemanticConventionStability, - _OpenTelemetryStabilitySignalType, - _report_new, - _report_old, - _set_http_host_client, - _set_http_method, - _set_http_net_peer_name_client, - _set_http_network_protocol_version, - _set_http_peer_port_client, - _set_http_scheme, - _set_http_url, - _set_status, - _StabilityMode, -) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.instrumentation.utils import ( - is_http_instrumentation_enabled, - suppress_http_instrumentation, -) -from opentelemetry.metrics import Histogram, get_meter -from opentelemetry.propagate import inject -from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE -from opentelemetry.semconv.attributes.network_attributes import ( - NETWORK_PEER_ADDRESS, - NETWORK_PEER_PORT, -) -from opentelemetry.semconv.metrics import MetricInstruments -from opentelemetry.semconv.metrics.http_metrics import ( - HTTP_CLIENT_REQUEST_DURATION, -) -from opentelemetry.trace import SpanKind, Tracer, get_tracer -from opentelemetry.trace.span import Span -from opentelemetry.util.http import ( - ExcludeList, - get_excluded_urls, - parse_excluded_urls, - remove_url_credentials, - sanitize_method, -) -from opentelemetry.util.http.httplib import set_ip_on_next_http_connection +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper +from netra.instrumentation.httpx.utils import get_default_span_name from netra.instrumentation.httpx.version import __version__ +from netra.instrumentation.httpx.wrappers import async_send_wrapper, send_wrapper logger = logging.getLogger(__name__) -# Package info for httpx instrumentation _instruments = ("httpx >= 0.18.0",) -_excluded_urls_from_env = get_excluded_urls("HTTPX") - -_RequestHookT = Optional[Callable[[Span, httpx.Request], None]] -_ResponseHookT = Optional[Callable[[Span, httpx.Request, httpx.Response], None]] - - -def _set_http_status_code_attribute( - span: Span, - status_code: Union[int, str], - metric_attributes: Optional[Dict[str, Any]] = None, - sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT, -) -> None: - status_code_str = str(status_code) - try: - status_code_int = int(status_code) - except ValueError: - status_code_int = -1 - if metric_attributes is None: - metric_attributes = {} - _set_status( - span, - metric_attributes, - status_code_int, - status_code_str, - server_span=False, - sem_conv_opt_in_mode=sem_conv_opt_in_mode, - ) - - -def _instrument( - tracer: Tracer, - duration_histogram_old: Optional[Histogram], - duration_histogram_new: Optional[Histogram], - request_hook: _RequestHookT = None, - response_hook: _ResponseHookT = None, - excluded_urls: Optional[ExcludeList] = None, - sem_conv_opt_in_mode: _StabilityMode = _StabilityMode.DEFAULT, -) -> None: - """Enables tracing of all httpx calls that go through - :code:`httpx.Client.send` and :code:`httpx.AsyncClient.send`.""" - - # Instrument sync client - wrapped_send = httpx.Client.send - - @functools.wraps(wrapped_send) - def instrumented_send(self: httpx.Client, request: httpx.Request, **kwargs: Any) -> httpx.Response: - if excluded_urls and excluded_urls.url_disabled(str(request.url)): - return wrapped_send(self, request, **kwargs) - - if not is_http_instrumentation_enabled(): - return wrapped_send(self, request, **kwargs) - - return _trace_request( - tracer, - duration_histogram_old, - duration_histogram_new, - request, - wrapped_send, - self, - request_hook, - response_hook, - sem_conv_opt_in_mode, - **kwargs, - ) - - # Type ignore for dynamic attribute assignment - instrumented_send.opentelemetry_instrumentation_httpx_applied = True # type: ignore - httpx.Client.send = instrumented_send - - # Instrument async client - wrapped_async_send = httpx.AsyncClient.send - - @functools.wraps(wrapped_async_send) - async def instrumented_async_send(self: httpx.AsyncClient, request: httpx.Request, **kwargs: Any) -> httpx.Response: - if excluded_urls and excluded_urls.url_disabled(str(request.url)): - return await wrapped_async_send(self, request, **kwargs) - - if not is_http_instrumentation_enabled(): - return await wrapped_async_send(self, request, **kwargs) - - return await _trace_async_request( - tracer, - duration_histogram_old, - duration_histogram_new, - request, - wrapped_async_send, - self, - request_hook, - response_hook, - sem_conv_opt_in_mode, - **kwargs, - ) - - # Type ignore for dynamic attribute assignment - instrumented_async_send.opentelemetry_instrumentation_httpx_applied = True # type: ignore - httpx.AsyncClient.send = instrumented_async_send - - -def _trace_request( - tracer: Tracer, - duration_histogram_old: Optional[Histogram], - duration_histogram_new: Optional[Histogram], - request: httpx.Request, - send_func: Callable[..., httpx.Response], - client: httpx.Client, - request_hook: _RequestHookT, - response_hook: _ResponseHookT, - sem_conv_opt_in_mode: _StabilityMode, - **kwargs: Any, -) -> httpx.Response: - """Trace a synchronous HTTP request.""" - method = request.method - span_name = get_default_span_name(method) - url = remove_url_credentials(str(request.url)) - - span_attributes: Dict[str, Any] = {} - _set_http_method( - span_attributes, - method, - sanitize_method(method), - sem_conv_opt_in_mode, - ) - _set_http_url(span_attributes, url, sem_conv_opt_in_mode) - - metric_labels: Dict[str, Any] = {} - _set_http_method( - metric_labels, - method, - sanitize_method(method), - sem_conv_opt_in_mode, - ) - - try: - parsed_url = urlparse(url) - if parsed_url.scheme: - if _report_old(sem_conv_opt_in_mode): - _set_http_scheme(metric_labels, parsed_url.scheme, sem_conv_opt_in_mode) - if parsed_url.hostname: - _set_http_host_client(metric_labels, parsed_url.hostname, sem_conv_opt_in_mode) - _set_http_net_peer_name_client(metric_labels, parsed_url.hostname, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_host_client( - span_attributes, - parsed_url.hostname, - sem_conv_opt_in_mode, - ) - span_attributes[NETWORK_PEER_ADDRESS] = parsed_url.hostname - if parsed_url.port: - _set_http_peer_port_client(metric_labels, parsed_url.port, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_peer_port_client(span_attributes, parsed_url.port, sem_conv_opt_in_mode) - span_attributes[NETWORK_PEER_PORT] = parsed_url.port - except ValueError as error: - logger.error(error) - - with ( - tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT, attributes=span_attributes) as span, - set_ip_on_next_http_connection(span), - ): - exception = None - if callable(request_hook): - request_hook(span, request) - - headers = dict(request.headers) - inject(headers) - request.headers.update(headers) - - with suppress_http_instrumentation(): - start_time = default_timer() - try: - result = send_func(client, request, **kwargs) - except Exception as exc: - exception = exc - result = getattr(exc, "response", None) - finally: - elapsed_time = max(default_timer() - start_time, 0) - - if isinstance(result, httpx.Response): - span_attributes_response: Dict[str, Any] = {} - _set_http_status_code_attribute( - span, - result.status_code, - metric_labels, - sem_conv_opt_in_mode, - ) - - if hasattr(result, "http_version"): - version_text = result.http_version - _set_http_network_protocol_version(metric_labels, version_text, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_network_protocol_version( - span_attributes_response, - version_text, - sem_conv_opt_in_mode, - ) - - for key, val in span_attributes_response.items(): - span.set_attribute(key, val) - - if callable(response_hook): - response_hook(span, request, result) - - if exception is not None and _report_new(sem_conv_opt_in_mode): - span.set_attribute(ERROR_TYPE, type(exception).__qualname__) - metric_labels[ERROR_TYPE] = type(exception).__qualname__ - - _record_duration_metrics( - duration_histogram_old, - duration_histogram_new, - elapsed_time, - metric_labels, - sem_conv_opt_in_mode, - ) - - if exception is not None: - raise exception.with_traceback(exception.__traceback__) - - return result - -async def _trace_async_request( - tracer: Tracer, - duration_histogram_old: Optional[Histogram], - duration_histogram_new: Optional[Histogram], - request: httpx.Request, - send_func: Callable[..., Any], - client: httpx.AsyncClient, - request_hook: _RequestHookT, - response_hook: _ResponseHookT, - sem_conv_opt_in_mode: _StabilityMode, - **kwargs: Any, -) -> httpx.Response: - """Trace an asynchronous HTTP request.""" - method = request.method - span_name = get_default_span_name(method) - url = remove_url_credentials(str(request.url)) - - span_attributes: Dict[str, Any] = {} - _set_http_method( - span_attributes, - method, - sanitize_method(method), - sem_conv_opt_in_mode, - ) - _set_http_url(span_attributes, url, sem_conv_opt_in_mode) - - metric_labels: Dict[str, Any] = {} - _set_http_method( - metric_labels, - method, - sanitize_method(method), - sem_conv_opt_in_mode, - ) - - try: - parsed_url = urlparse(url) - if parsed_url.scheme: - if _report_old(sem_conv_opt_in_mode): - _set_http_scheme(metric_labels, parsed_url.scheme, sem_conv_opt_in_mode) - if parsed_url.hostname: - _set_http_host_client(metric_labels, parsed_url.hostname, sem_conv_opt_in_mode) - _set_http_net_peer_name_client(metric_labels, parsed_url.hostname, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_host_client( - span_attributes, - parsed_url.hostname, - sem_conv_opt_in_mode, - ) - span_attributes[NETWORK_PEER_ADDRESS] = parsed_url.hostname - if parsed_url.port: - _set_http_peer_port_client(metric_labels, parsed_url.port, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_peer_port_client(span_attributes, parsed_url.port, sem_conv_opt_in_mode) - span_attributes[NETWORK_PEER_PORT] = parsed_url.port - except ValueError as error: - logger.error(error) - - with ( - tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT, attributes=span_attributes) as span, - set_ip_on_next_http_connection(span), - ): - exception = None - if callable(request_hook): - request_hook(span, request) - - headers = dict(request.headers) - inject(headers) - request.headers.update(headers) - - with suppress_http_instrumentation(): - start_time = default_timer() - try: - result = await send_func(client, request, **kwargs) - except Exception as exc: - exception = exc - result = getattr(exc, "response", None) - finally: - elapsed_time = max(default_timer() - start_time, 0) - - if isinstance(result, httpx.Response): - span_attributes_response: Dict[str, Any] = {} - _set_http_status_code_attribute( - span, - result.status_code, - metric_labels, - sem_conv_opt_in_mode, - ) - - if hasattr(result, "http_version"): - version_text = result.http_version - _set_http_network_protocol_version(metric_labels, version_text, sem_conv_opt_in_mode) - if _report_new(sem_conv_opt_in_mode): - _set_http_network_protocol_version( - span_attributes_response, - version_text, - sem_conv_opt_in_mode, - ) - - for key, val in span_attributes_response.items(): - span.set_attribute(key, val) - - if callable(response_hook): - response_hook(span, request, result) - - if exception is not None and _report_new(sem_conv_opt_in_mode): - span.set_attribute(ERROR_TYPE, type(exception).__qualname__) - metric_labels[ERROR_TYPE] = type(exception).__qualname__ - - _record_duration_metrics( - duration_histogram_old, - duration_histogram_new, - elapsed_time, - metric_labels, - sem_conv_opt_in_mode, - ) - - if exception is not None: - raise exception.with_traceback(exception.__traceback__) - - return result - - -def _record_duration_metrics( - duration_histogram_old: Optional[Histogram], - duration_histogram_new: Optional[Histogram], - elapsed_time: float, - metric_labels: Dict[str, Any], - sem_conv_opt_in_mode: _StabilityMode, -) -> None: - """Record duration metrics for HTTP requests.""" - if duration_histogram_old is not None: - duration_attrs_old = _filter_semconv_duration_attrs( - metric_labels, - _client_duration_attrs_old, - _client_duration_attrs_new, - _StabilityMode.DEFAULT, - ) - duration_histogram_old.record( - max(round(elapsed_time * 1000), 0), - attributes=duration_attrs_old, - ) - if duration_histogram_new is not None: - duration_attrs_new = _filter_semconv_duration_attrs( - metric_labels, - _client_duration_attrs_old, - _client_duration_attrs_new, - _StabilityMode.HTTP, - ) - duration_histogram_new.record(elapsed_time, attributes=duration_attrs_new) - - -def _uninstrument() -> None: - """Disables instrumentation of :code:`httpx` through this module. - - Note that this only works if no other module also patches httpx.""" - _uninstrument_from(httpx.Client) - _uninstrument_from(httpx.AsyncClient) - - -def _uninstrument_from(instr_root: Union[type, object], restore_as_bound_func: bool = False) -> None: - for instr_func_name in ("send",): - instr_func = getattr(instr_root, instr_func_name) - if not getattr( - instr_func, - "opentelemetry_instrumentation_httpx_applied", - False, - ): - continue - - original = instr_func.__wrapped__ - if restore_as_bound_func: - original = types.MethodType(original, instr_root) - setattr(instr_root, instr_func_name, original) - - -def get_default_span_name(method: str) -> str: - """ - Default implementation for name_callback, returns HTTP {method_name}. - https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/http/#name - - Args: - method: string representing HTTP method - Returns: - span name - """ - method = sanitize_method(method.strip()) - if method == "_OTHER": - return "HTTP" - return method - - -class HTTPXInstrumentor(BaseInstrumentor): # type: ignore - """An instrumentor for httpx - See `BaseInstrumentor` - """ +class HTTPXInstrumentor(BaseInstrumentor): # type: ignore[misc] + """Custom HTTPX instrumentor for Netra SDK.""" def instrumentation_dependencies(self) -> Collection[str]: + """Return the list of required instrumentation dependencies.""" return _instruments def _instrument(self, **kwargs: Any) -> None: - """Instruments httpx module + """Instrument httpx.Client.send and httpx.AsyncClient.send. Args: - **kwargs: Optional arguments - ``tracer_provider``: a TracerProvider, defaults to global - ``request_hook``: An optional callback that is invoked right after a span is created. - ``response_hook``: An optional callback which is invoked right before the span is finished processing a response. - ``excluded_urls``: A string containing a comma-delimited list of regexes used to exclude URLs from tracking - ``duration_histogram_boundaries``: A list of float values representing the explicit bucket boundaries for the duration histogram. + **kwargs: Keyword arguments passed by the instrumentation framework. + tracer_provider: Optional TracerProvider to use for creating spans. """ - semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( - _OpenTelemetryStabilitySignalType.HTTP, - ) - schema_url = _get_schema_url(semconv_opt_in_mode) - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer( - __name__, - __version__, - tracer_provider, - schema_url=schema_url, - ) - excluded_urls = kwargs.get("excluded_urls") - meter_provider = kwargs.get("meter_provider") - duration_histogram_boundaries = kwargs.get("duration_histogram_boundaries") - meter = get_meter( - __name__, - __version__, - meter_provider, - schema_url=schema_url, - ) - duration_histogram_old = None - if _report_old(semconv_opt_in_mode): - duration_histogram_old = meter.create_histogram( - name=MetricInstruments.HTTP_CLIENT_DURATION, - unit="ms", - description="measures the duration of the outbound HTTP request", - explicit_bucket_boundaries_advisory=duration_histogram_boundaries - or HTTP_DURATION_HISTOGRAM_BUCKETS_OLD, - ) - duration_histogram_new = None - if _report_new(semconv_opt_in_mode): - duration_histogram_new = meter.create_histogram( - name=HTTP_CLIENT_REQUEST_DURATION, - unit="s", - description="Duration of HTTP client requests.", - explicit_bucket_boundaries_advisory=duration_histogram_boundaries - or HTTP_DURATION_HISTOGRAM_BUCKETS_NEW, - ) - _instrument( - tracer, - duration_histogram_old, - duration_histogram_new, - request_hook=kwargs.get("request_hook"), - response_hook=kwargs.get("response_hook"), - excluded_urls=(_excluded_urls_from_env if excluded_urls is None else parse_excluded_urls(excluded_urls)), - sem_conv_opt_in_mode=semconv_opt_in_mode, - ) + try: + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + except Exception as e: + logger.error(f"Failed to initialize tracer: {e}") + return + + try: + wrap_function_wrapper("httpx", "Client.send", send_wrapper(tracer)) + wrap_function_wrapper("httpx", "AsyncClient.send", async_send_wrapper(tracer)) + except Exception as e: + logger.error(f"Failed to instrument httpx: {e}") def _uninstrument(self, **kwargs: Any) -> None: - _uninstrument() + """Uninstrument httpx.Client.send and httpx.AsyncClient.send. + + Args: + **kwargs: Keyword arguments passed by the instrumentation framework. + """ + try: + import httpx + + unwrap(httpx.Client, "send") + unwrap(httpx.AsyncClient, "send") + except (AttributeError, ModuleNotFoundError): + logger.error("Failed to uninstrument httpx") diff --git a/netra/instrumentation/httpx/utils.py b/netra/instrumentation/httpx/utils.py new file mode 100644 index 0000000..70627c4 --- /dev/null +++ b/netra/instrumentation/httpx/utils.py @@ -0,0 +1,224 @@ +import json +import logging +from typing import Any, Dict, List + +import httpx +from opentelemetry import context as context_api +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.trace import Span +from opentelemetry.util.http import remove_url_credentials, sanitize_method + +logger = logging.getLogger(__name__) + +_SENSITIVE_HEADERS = frozenset( + { + "authorization", + "cookie", + "set-cookie", + "x-api-key", + "api-key", + "x-auth-token", + "proxy-authorization", + } +) + + +def should_suppress_instrumentation() -> bool: + """Check if instrumentation should be suppressed. + + Returns: + True if the OpenTelemetry suppression key is active in the current + context, False otherwise. + """ + return context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) is True + + +def get_default_span_name(method: str) -> str: + """Derive a span name from the HTTP method. + + Args: + method: The raw HTTP method string. + + Returns: + The sanitized method (e.g. "GET") or "HTTP" for non-standard methods. + """ + method = sanitize_method(method.strip()) + if method == "_OTHER": + return "HTTP" + return method + + +def _sanitize_headers(headers: httpx.Headers) -> Dict[str, str]: + """Redact sensitive header values. + + Args: + headers: The httpx Headers mapping. + + Returns: + A new dict with sensitive values replaced by "[REDACTED]". + """ + return {k: "[REDACTED]" if k.lower() in _SENSITIVE_HEADERS else v for k, v in headers.items()} + + +def _get_request_body(request: httpx.Request) -> Any: + """Extract and deserialize the request body. + + Args: + request: The httpx Request object. + + Returns: + The parsed JSON, decoded string, binary placeholder, or None. + """ + content = request.content + if not content: + return None + try: + return json.loads(content) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + logger.debug(f"Request body is not JSON, falling back to text: {e}") + try: + return content.decode("utf-8") + except UnicodeDecodeError: + return f"" + + +def _get_response_body(response: httpx.Response) -> Any: + """Extract and deserialize the response body. + + Args: + response: The httpx Response object. + + Returns: + The parsed JSON, text content, or None. + """ + try: + return response.json() + except Exception as e: + logger.debug(f"Failed to parse response body: {e}") + try: + text = response.text + if text: + return text + except Exception as e: + logger.debug(f"Failed to parse response body: {e}") + return None + + +def set_span_input(span: Span, request: httpx.Request) -> None: + """Serialize request data and set it as the span ``input`` attribute. + + Args: + span: The active OpenTelemetry span. + request: The outgoing httpx Request. + """ + if not span.is_recording(): + return + try: + input_data: Dict[str, Any] = { + "url": remove_url_credentials(str(request.url)), + "headers": _sanitize_headers(request.headers), + } + body = _get_request_body(request) + if body is not None: + input_data["body"] = body + span.set_attribute("input", json.dumps(input_data)) + except Exception as e: + logger.error(f"Failed to set input attribute on httpx span: {e}") + + +def set_span_output(span: Span, response: httpx.Response) -> None: + """Serialize response data and set it as the span ``output`` attribute. + + Args: + span: The active OpenTelemetry span. + response: The received httpx Response. + """ + if not span.is_recording(): + return + try: + output_data: Dict[str, Any] = { + "status_code": response.status_code, + "headers": _sanitize_headers(response.headers), + } + body = _get_response_body(response) + if body is not None: + output_data["body"] = body + span.set_attribute("output", json.dumps(output_data)) + except Exception as e: + logger.error(f"Failed to set output attribute on httpx span: {e}") + + +def _parse_streaming_body(accumulated: bytes) -> Any: + """Parse accumulated streaming response bytes into a structured value. + + Handles SSE (``data: {...}``), NDJSON, plain concatenated JSON objects, + and falls back to a decoded string or a binary placeholder. + + Args: + accumulated: Raw bytes collected from the streaming response chunks. + + Returns: + A parsed JSON object or list, a plain string, or a binary-size + placeholder string if the bytes cannot be decoded as UTF-8. + """ + try: + text = accumulated.decode("utf-8") + except UnicodeDecodeError: + return f"" + + # SSE: any line starts with "data:" + lines = [ln.strip() for ln in text.splitlines() if ln.strip()] + if any(ln.startswith("data:") for ln in lines): + parsed: List[Any] = [] + for ln in lines: + if ln.startswith("data:"): + data = ln[5:].strip() + if data == "[DONE]": + continue + try: + parsed.append(json.loads(data)) + except json.JSONDecodeError: + parsed.append(data) + if parsed: + return parsed[0] if len(parsed) == 1 else parsed + + # Sequential JSON decoding: handles single JSON, NDJSON, and bare concatenated objects + decoder = json.JSONDecoder() + results: List[Any] = [] + idx = 0 + stripped = text.strip() + try: + while idx < len(stripped): + obj, end_idx = decoder.raw_decode(stripped, idx) + results.append(obj) + idx = end_idx + while idx < len(stripped) and stripped[idx] in " \t\n\r": + idx += 1 + if results and idx == len(stripped): + return results[0] if len(results) == 1 else results + except json.JSONDecodeError: + pass + + return text + + +def set_streaming_span_output(span: Span, response: httpx.Response, chunks: List[bytes]) -> None: + """Serialize accumulated streaming chunks and set them as the span ``output`` attribute. + + Args: + span: The active OpenTelemetry span. + response: The httpx Response whose headers/status are used. + chunks: Raw bytes chunks accumulated during iteration. + """ + if not span.is_recording(): + return + try: + output_data: Dict[str, Any] = { + "status_code": response.status_code, + "headers": _sanitize_headers(response.headers), + } + if chunks: + output_data["body"] = _parse_streaming_body(b"".join(chunks)) + span.set_attribute("output", json.dumps(output_data)) + except Exception as e: + logger.error(f"Failed to set streaming output attribute on httpx span: {e}") diff --git a/netra/instrumentation/httpx/version.py b/netra/instrumentation/httpx/version.py index 67612e9..d1d9f95 100644 --- a/netra/instrumentation/httpx/version.py +++ b/netra/instrumentation/httpx/version.py @@ -1 +1 @@ -__version__ = "0.28.1" +__version__ = "0.29.1" diff --git a/netra/instrumentation/httpx/wrappers.py b/netra/instrumentation/httpx/wrappers.py new file mode 100644 index 0000000..d9bc470 --- /dev/null +++ b/netra/instrumentation/httpx/wrappers.py @@ -0,0 +1,515 @@ +import logging +from collections.abc import AsyncIterator, Awaitable, Iterator +from typing import Any, Callable, Dict, List, Tuple + +from opentelemetry import context as context_api +from opentelemetry.instrumentation.utils import suppress_http_instrumentation +from opentelemetry.propagate import inject +from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.http import remove_url_credentials +from wrapt import ObjectProxy + +from netra.instrumentation.httpx.utils import ( + get_default_span_name, + set_span_input, + set_span_output, + set_streaming_span_output, + should_suppress_instrumentation, +) + +logger = logging.getLogger(__name__) + + +class _BaseStreamingWrapper(ObjectProxy): # type: ignore[misc] + """Base proxy for streaming httpx responses; finalizes the span when the stream ends.""" + + def __init__(self, response: Any, span: Span) -> None: + """Initialize the base streaming wrapper. + + Args: + response: The streaming httpx response to wrap. + span: The open OpenTelemetry span to keep alive during streaming. + """ + super().__init__(response) + self._span = span + self._chunks: List[bytes] = [] + self._finalized = False + + def _finalize_span(self) -> None: + """Write accumulated chunk data to the span output attribute and end the span. + + Idempotent — subsequent calls after the first are no-ops. + """ + if self._finalized: + return + self._finalized = True + try: + set_streaming_span_output(self._span, self.__wrapped__, self._chunks) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to finalize streaming span: %s", e) + finally: + self._span.end() + + def __del__(self) -> None: + """Finalize the span on garbage collection as a last-resort safety net.""" + self._finalize_span() + + +class StreamingWrapper(_BaseStreamingWrapper): + """Wraps a streaming httpx.Response, keeping the span open until the stream is closed. + + httpx streaming responses are consumed via ``iter_bytes()``, ``iter_text()``, + ``iter_lines()``, or ``iter_raw()`` — not via ``iter(response)``. This wrapper + proxies those methods so that each yielded chunk is captured for the span + output attribute, and overrides ``close()`` to finalize the span. + """ + + def _wrap_iter(self, inner: Iterator[Any]) -> Iterator[Any]: + """Proxy a synchronous iterator, accumulating raw bytes for the span. + + Args: + inner: The underlying iterator to wrap. + + Yields: + Each chunk from *inner* unchanged. + """ + try: + for chunk in inner: + if isinstance(chunk, bytes): + self._chunks.append(chunk) + elif isinstance(chunk, str): + self._chunks.append(chunk.encode("utf-8")) + yield chunk + except GeneratorExit: + return + except Exception as e: + self._span.set_status(Status(StatusCode.ERROR, str(e))) + self._span.record_exception(e) + raise + + def iter_bytes(self, *args: Any, **kwargs: Any) -> Iterator[bytes]: + """Proxy ``Response.iter_bytes``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_bytes``. + **kwargs: Keyword arguments forwarded to ``Response.iter_bytes``. + + Returns: + An iterator that yields raw bytes chunks and accumulates them for + the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_bytes(*args, **kwargs)) + + def iter_text(self, *args: Any, **kwargs: Any) -> Iterator[str]: + """Proxy ``Response.iter_text``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_text``. + **kwargs: Keyword arguments forwarded to ``Response.iter_text``. + + Returns: + An iterator that yields decoded text chunks and accumulates the + encoded bytes for the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_text(*args, **kwargs)) + + def iter_lines(self, *args: Any, **kwargs: Any) -> Iterator[str]: + """Proxy ``Response.iter_lines``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_lines``. + **kwargs: Keyword arguments forwarded to ``Response.iter_lines``. + + Returns: + An iterator that yields line strings and accumulates the encoded + bytes for the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_lines(*args, **kwargs)) + + def iter_raw(self, *args: Any, **kwargs: Any) -> Iterator[bytes]: + """Proxy ``Response.iter_raw``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_raw``. + **kwargs: Keyword arguments forwarded to ``Response.iter_raw``. + + Returns: + An iterator that yields raw (un-decoded) bytes chunks and + accumulates them for the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_raw(*args, **kwargs)) + + def __enter__(self) -> "StreamingWrapper": + """Return the wrapper itself so iteration methods are captured inside a with-block. + + Returns: + This StreamingWrapper instance. + """ + self.__wrapped__.__enter__() + return self + + def __exit__(self, *args: Any) -> None: + """Close via the wrapper so the span is finalized. + + Args: + *args: Exception info tuple (exc_type, exc_val, exc_tb) forwarded + from the context manager protocol. + """ + self.close() + + def close(self) -> None: + """Close the underlying response and finalize the span. + + Calls ``Response.close()`` on the wrapped response, then invokes + :meth:`_finalize_span` to record the accumulated output and end the span. + """ + try: + self.__wrapped__.close() + finally: + self._finalize_span() + + +class AsyncStreamingWrapper(_BaseStreamingWrapper): + """Wraps a streaming httpx.Response from an AsyncClient, keeping the span open until closed. + + Mirrors :class:`StreamingWrapper` for the async iteration methods + (``aiter_bytes``, ``aiter_text``, ``aiter_lines``, ``aiter_raw``) and + overrides ``aclose()`` to finalize the span. + """ + + async def _wrap_aiter(self, inner: AsyncIterator[Any]) -> AsyncIterator[Any]: + """Proxy an asynchronous iterator, accumulating raw bytes for the span. + + Args: + inner: The underlying async iterator to wrap. + + Yields: + Each chunk from *inner* unchanged. + """ + try: + async for chunk in inner: + if isinstance(chunk, bytes): + self._chunks.append(chunk) + elif isinstance(chunk, str): + self._chunks.append(chunk.encode("utf-8")) + yield chunk + except GeneratorExit: + return + except Exception as e: + self._span.set_status(Status(StatusCode.ERROR, str(e))) + self._span.record_exception(e) + raise + + def aiter_bytes(self, *args: Any, **kwargs: Any) -> AsyncIterator[bytes]: + """Proxy ``Response.aiter_bytes``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.aiter_bytes``. + **kwargs: Keyword arguments forwarded to ``Response.aiter_bytes``. + + Returns: + An async iterator that yields raw bytes chunks and accumulates + them for the span output attribute. + """ + return self._wrap_aiter(self.__wrapped__.aiter_bytes(*args, **kwargs)) + + def aiter_text(self, *args: Any, **kwargs: Any) -> AsyncIterator[str]: + """Proxy ``Response.aiter_text``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.aiter_text``. + **kwargs: Keyword arguments forwarded to ``Response.aiter_text``. + + Returns: + An async iterator that yields decoded text chunks and accumulates + the encoded bytes for the span output attribute. + """ + return self._wrap_aiter(self.__wrapped__.aiter_text(*args, **kwargs)) + + def aiter_lines(self, *args: Any, **kwargs: Any) -> AsyncIterator[str]: + """Proxy ``Response.aiter_lines``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.aiter_lines``. + **kwargs: Keyword arguments forwarded to ``Response.aiter_lines``. + + Returns: + An async iterator that yields line strings and accumulates the + encoded bytes for the span output attribute. + """ + return self._wrap_aiter(self.__wrapped__.aiter_lines(*args, **kwargs)) + + def aiter_raw(self, *args: Any, **kwargs: Any) -> AsyncIterator[bytes]: + """Proxy ``Response.aiter_raw``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.aiter_raw``. + **kwargs: Keyword arguments forwarded to ``Response.aiter_raw``. + + Returns: + An async iterator that yields raw (un-decoded) bytes chunks and + accumulates them for the span output attribute. + """ + return self._wrap_aiter(self.__wrapped__.aiter_raw(*args, **kwargs)) + + async def __aenter__(self) -> "AsyncStreamingWrapper": + """Return the wrapper itself so iteration methods are captured inside an async with-block. + + Returns: + This AsyncStreamingWrapper instance. + """ + await self.__wrapped__.__aenter__() + return self + + async def __aexit__(self, *args: Any) -> None: + """Close via the wrapper so the span is finalized. + + Args: + *args: Exception info tuple (exc_type, exc_val, exc_tb) forwarded + from the async context manager protocol. + """ + await self.aclose() + + async def aclose(self) -> None: + """Close the underlying response and finalize the span. + + Awaits ``Response.aclose()`` on the wrapped response, then invokes + :meth:`_finalize_span` to record the accumulated output and end the span. + """ + try: + await self.__wrapped__.aclose() + finally: + self._finalize_span() + + +def send_wrapper(tracer: Tracer) -> Callable[..., Any]: + """Return a wrapt-compatible wrapper for ``httpx.Client.send``. + + Args: + tracer: The OpenTelemetry Tracer used to create spans. + + Returns: + A callable suitable for use with ``wrap_function_wrapper``. + """ + + def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Any: + """Intercept ``Client.send``, create a span, and capture request/response data. + + Args: + wrapped: The original ``Client.send`` method. + instance: The ``Client`` instance on which the method is called. + args: Positional arguments passed to ``Client.send``; the first + element is the ``httpx.Request``. + kwargs: Keyword arguments passed to ``Client.send``. + + Returns: + The original ``httpx.Response`` for non-streaming requests, or a + :class:`StreamingWrapper` that keeps the span open while the + caller iterates over a streaming response. + """ + if should_suppress_instrumentation(): + return wrapped(*args, **kwargs) + + try: + request = args[0] if args else kwargs.get("request") + if request is None: + raise ValueError("No request object found in arguments") + method = request.method + url = remove_url_credentials(str(request.url)) + span_name = get_default_span_name(method) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to extract request metadata: %s", e) + return wrapped(*args, **kwargs) + + is_streaming = kwargs.get("stream", False) + if not is_streaming: + with tracer.start_as_current_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) as span: + try: + set_span_input(span, request) + headers = dict(request.headers) + inject(headers) + request.headers.update(headers) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = wrapped(*args, **kwargs) + except Exception as e: + logger.error("netra.instrumentation.httpx: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + set_span_output(span, response) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to process response span: %s", e) + + return response + + span = tracer.start_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) + try: + context = context_api.attach(set_span_in_context(span)) + try: + set_span_input(span, request) + headers = dict(request.headers) + inject(headers) + request.headers.update(headers) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = wrapped(*args, **kwargs) + except Exception as e: + logger.error("netra.instrumentation.httpx: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set response status on span: %s", e) + + return StreamingWrapper(response=response, span=span) + finally: + context_api.detach(context) + + return wrapper + + +def async_send_wrapper(tracer: Tracer) -> Callable[..., Awaitable[Any]]: + """Return a wrapt-compatible async wrapper for ``httpx.AsyncClient.send``. + + Args: + tracer: The OpenTelemetry Tracer used to create spans. + + Returns: + An async callable suitable for use with ``wrap_function_wrapper``. + """ + + async def wrapper( + wrapped: Callable[..., Awaitable[Any]], instance: Any, args: Tuple[Any, ...], kwargs: Dict[str, Any] + ) -> Any: + """Intercept ``AsyncClient.send``, create a span, and capture request/response data. + + Args: + wrapped: The original ``AsyncClient.send`` coroutine. + instance: The ``AsyncClient`` instance on which the method is called. + args: Positional arguments passed to ``AsyncClient.send``; the first + element is the ``httpx.Request``. + kwargs: Keyword arguments passed to ``AsyncClient.send``. + + Returns: + The original ``httpx.Response`` for non-streaming requests, or an + :class:`AsyncStreamingWrapper` that keeps the span open while the + caller iterates over a streaming response. + """ + if should_suppress_instrumentation(): + return await wrapped(*args, **kwargs) + + try: + request = args[0] if args else kwargs.get("request") + if request is None: + raise ValueError("No request object found in arguments") + method = request.method + url = remove_url_credentials(str(request.url)) + span_name = get_default_span_name(method) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to extract request metadata: %s", e) + return await wrapped(*args, **kwargs) + + is_streaming = kwargs.get("stream", False) + if not is_streaming: + with tracer.start_as_current_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) as span: + try: + set_span_input(span, request) + headers = dict(request.headers) + inject(headers) + request.headers.update(headers) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = await wrapped(*args, **kwargs) + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + set_span_output(span, response) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to process response span: %s", e) + + return response + + span = tracer.start_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) + try: + context = context_api.attach(set_span_in_context(span)) + try: + set_span_input(span, request) + headers = dict(request.headers) + inject(headers) + request.headers.update(headers) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = await wrapped(*args, **kwargs) + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.httpx: failed to set response status on span: %s", e) + + return AsyncStreamingWrapper(response=response, span=span) + finally: + context_api.detach(context) + + return wrapper diff --git a/netra/instrumentation/requests/__init__.py b/netra/instrumentation/requests/__init__.py new file mode 100644 index 0000000..1da4038 --- /dev/null +++ b/netra/instrumentation/requests/__init__.py @@ -0,0 +1,60 @@ +import logging +from typing import Any, Collection + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import get_tracer +from wrapt import wrap_function_wrapper + +from netra.instrumentation.requests.version import __version__ +from netra.instrumentation.requests.wrappers import send_wrapper + +logger = logging.getLogger(__name__) + +_instruments = ("requests >= 2.0.0",) + + +class RequestsInstrumentor(BaseInstrumentor): # type: ignore[misc] + """Custom requests instrumentor for Netra SDK.""" + + def instrumentation_dependencies(self) -> Collection[str]: + """Return the list of required instrumentation dependencies. + + Returns: + A collection of package requirement strings that must be satisfied + for this instrumentor to function. + """ + return _instruments + + def _instrument(self, **kwargs: Any) -> None: + """Instrument requests.Session.send. + + Args: + **kwargs: Keyword arguments passed by the instrumentation framework. + tracer_provider: Optional TracerProvider to use for creating spans. + """ + try: + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(__name__, __version__, tracer_provider) + except Exception as e: + logger.error(f"Failed to initialize tracer: {e}") + return + + try: + wrap_function_wrapper("requests", "Session.send", send_wrapper(tracer)) + except Exception as e: + logger.error(f"Failed to instrument requests: {e}") + + def _uninstrument(self, **kwargs: Any) -> None: + """Uninstrument requests.Session.send. + + Args: + **kwargs: Keyword arguments passed by the instrumentation framework + (unused but required by the base class interface). + """ + try: + import requests as requests_lib # type:ignore[import-untyped] + + unwrap(requests_lib.Session, "send") + except Exception as e: + logger.error(f"Failed to uninstrument requests: {e}") diff --git a/netra/instrumentation/requests/utils.py b/netra/instrumentation/requests/utils.py new file mode 100644 index 0000000..484a4a9 --- /dev/null +++ b/netra/instrumentation/requests/utils.py @@ -0,0 +1,247 @@ +import json +import logging +from typing import Any, Dict, List + +import requests as requests_lib # type: ignore[import-untyped] +from opentelemetry import context as context_api +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.trace import Span +from opentelemetry.util.http import remove_url_credentials, sanitize_method + +logger = logging.getLogger(__name__) + +_SENSITIVE_HEADERS = frozenset( + { + "authorization", + "cookie", + "set-cookie", + "x-api-key", + "api-key", + "x-auth-token", + "proxy-authorization", + } +) + + +def should_suppress_instrumentation() -> bool: + """Check if instrumentation should be suppressed. + + Returns: + True if the OpenTelemetry suppression key is active in the current + context, False otherwise. + """ + return context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) is True + + +def get_default_span_name(method: str) -> str: + """Derive a span name from the HTTP method. + + Args: + method: The raw HTTP method string. + + Returns: + The sanitized method (e.g. "GET") or "HTTP" for non-standard methods. + """ + if not method: + return "HTTP" + method = sanitize_method(method.strip()) + if method == "_OTHER": + return "HTTP" + return method + + +def _sanitize_headers(headers: Any) -> Dict[str, str]: + """Redact sensitive header values. + + Args: + headers: A mapping of header names to values. + + Returns: + A new dict with sensitive values replaced by "[REDACTED]". + """ + return {k: "[REDACTED]" if k.lower() in _SENSITIVE_HEADERS else v for k, v in headers.items()} + + +def _get_request_body(request: requests_lib.PreparedRequest) -> Any: + """Extract and deserialize the request body. + + Args: + request: The requests PreparedRequest object. + + Returns: + The parsed JSON, decoded string, streaming placeholder, or None. + """ + body = request.body + if body is None: + return None + if isinstance(body, bytes): + if not body: + return None + try: + return json.loads(body) + except (json.JSONDecodeError, UnicodeDecodeError): + pass + try: + return body.decode("utf-8") + except UnicodeDecodeError: + return f"" + if isinstance(body, str): + if not body: + return None + try: + return json.loads(body) + except json.JSONDecodeError: + return body + return "" + + +def _get_response_body(response: requests_lib.Response) -> Any: + """Extract and deserialize the response body. + + Skips body capture for streaming responses whose content has not yet been + consumed, to avoid forcing a full download and breaking downstream readers. + + Args: + response: The requests Response object. + + Returns: + The parsed JSON, text content, or None. + """ + if not getattr(response, "_content_consumed", True): + return "" + try: + return response.json() + except Exception: + pass + try: + text = response.text + if text: + return text + except Exception: + pass + return None + + +def set_span_input(span: Span, request: requests_lib.PreparedRequest) -> None: + """Serialize request data and set it as the span ``input`` attribute. + + Args: + span: The active OpenTelemetry span. + request: The outgoing PreparedRequest. + """ + if not span.is_recording(): + return + try: + input_data: Dict[str, Any] = { + "url": remove_url_credentials(request.url or ""), + "headers": _sanitize_headers(request.headers), + } + body = _get_request_body(request) + if body is not None: + input_data["body"] = body + span.set_attribute("input", json.dumps(input_data)) + except Exception: + logger.debug("Failed to set input attribute on requests span", exc_info=True) + + +def set_span_output(span: Span, response: requests_lib.Response) -> None: + """Serialize response data and set it as the span ``output`` attribute. + + Args: + span: The active OpenTelemetry span. + response: The received Response. + """ + if not span.is_recording(): + return + try: + output_data: Dict[str, Any] = { + "status_code": response.status_code, + "headers": _sanitize_headers(response.headers), + } + body = _get_response_body(response) + if body is not None: + output_data["body"] = body + span.set_attribute("output", json.dumps(output_data)) + except Exception: + logger.debug("Failed to set output attribute on requests span", exc_info=True) + + +def _parse_streaming_body(accumulated: bytes) -> Any: + """Parse accumulated streaming response bytes into a structured value. + + Handles SSE (``data: {...}``), NDJSON, plain concatenated JSON objects, + and falls back to a decoded string or a binary placeholder. + + Args: + accumulated: Raw bytes collected from the streaming response chunks. + + Returns: + A parsed JSON object or list, a plain string, or a binary-size + placeholder string if the bytes cannot be decoded as UTF-8. + """ + try: + text = accumulated.decode("utf-8") + except UnicodeDecodeError: + return f"" + + # SSE: any line starts with "data:" + lines = [ln.strip() for ln in text.splitlines() if ln.strip()] + if any(ln.startswith("data:") for ln in lines): + parsed: List[Any] = [] + for ln in lines: + if ln.startswith("data:"): + data = ln[5:].strip() + if data == "[DONE]": + continue + try: + parsed.append(json.loads(data)) + except json.JSONDecodeError: + parsed.append(data) + if parsed: + return parsed[0] if len(parsed) == 1 else parsed + + # Sequential JSON decoding: handles single JSON, NDJSON, and bare concatenated objects + decoder = json.JSONDecoder() + results: List[Any] = [] + idx = 0 + stripped = text.strip() + try: + while idx < len(stripped): + obj, end_idx = decoder.raw_decode(stripped, idx) + results.append(obj) + idx = end_idx + while idx < len(stripped) and stripped[idx] in " \t\n\r": + idx += 1 + if results and idx == len(stripped): + return results[0] if len(results) == 1 else results + except json.JSONDecodeError: + pass + + return text + + +def set_streaming_span_output(span: Span, response: requests_lib.Response, chunks: List[bytes]) -> None: + """Serialize accumulated streaming chunks and set them as the span ``output`` attribute. + + Args: + span: The active OpenTelemetry span. + response: The requests Response whose headers/status are used. + chunks: Raw bytes chunks accumulated during iteration. + """ + if not span.is_recording(): + return + try: + output_data: Dict[str, Any] = { + "status_code": response.status_code, + "headers": _sanitize_headers(response.headers), + } + if chunks: + output_data["body"] = _parse_streaming_body(b"".join(chunks)) + else: + # Fallback: body was accessed via .content/.text rather than iterators + body = _get_response_body(response) + if body is not None: + output_data["body"] = body + span.set_attribute("output", json.dumps(output_data)) + except Exception: + logger.debug("Failed to set streaming output attribute on requests span", exc_info=True) diff --git a/netra/instrumentation/requests/version.py b/netra/instrumentation/requests/version.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/netra/instrumentation/requests/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/netra/instrumentation/requests/wrappers.py b/netra/instrumentation/requests/wrappers.py new file mode 100644 index 0000000..e8f6652 --- /dev/null +++ b/netra/instrumentation/requests/wrappers.py @@ -0,0 +1,255 @@ +import logging +from collections.abc import Iterator +from typing import Any, Callable, Dict, List, Tuple + +from opentelemetry import context as context_api +from opentelemetry.instrumentation.utils import suppress_http_instrumentation +from opentelemetry.propagate import inject +from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.http import remove_url_credentials +from wrapt import ObjectProxy + +from netra.instrumentation.requests.utils import ( + get_default_span_name, + set_span_input, + set_span_output, + set_streaming_span_output, + should_suppress_instrumentation, +) + +logger = logging.getLogger(__name__) + + +class StreamingWrapper(ObjectProxy): # type: ignore[misc] + """Wraps a streaming requests.Response, keeping the span open until the stream is closed.""" + + def __init__(self, response: Any, span: Span) -> None: + """Initialize the streaming wrapper. + + Args: + response: The streaming requests.Response to wrap. + span: The open OpenTelemetry span to keep alive during streaming. + """ + super().__init__(response) + self._span = span + self._chunks: List[bytes] = [] + self._finalized = False + + def _wrap_iter(self, inner: Iterator[Any]) -> Iterator[Any]: + """Proxy a synchronous iterator, accumulating raw bytes for the span. + + Args: + inner: The underlying iterator to wrap. + + Yields: + Each chunk from *inner* unchanged. + """ + try: + for chunk in inner: + if isinstance(chunk, bytes): + self._chunks.append(chunk) + elif isinstance(chunk, str): + self._chunks.append(chunk.encode("utf-8")) + yield chunk + except GeneratorExit: + return + except Exception as e: + self._span.set_status(Status(StatusCode.ERROR, str(e))) + self._span.record_exception(e) + raise + + def __iter__(self) -> Iterator[Any]: + """Proxy direct iteration over the response, capturing chunks for span output. + + Returns: + An iterator that yields response chunks and accumulates them for + the span output attribute. + """ + return self._wrap_iter(iter(self.__wrapped__)) + + def iter_content(self, *args: Any, **kwargs: Any) -> Iterator[bytes]: + """Proxy ``Response.iter_content``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_content``. + **kwargs: Keyword arguments forwarded to ``Response.iter_content``. + + Returns: + An iterator that yields raw bytes chunks and accumulates them for + the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_content(*args, **kwargs)) + + def iter_lines(self, *args: Any, **kwargs: Any) -> Iterator[Any]: + """Proxy ``Response.iter_lines``, capturing chunks for span output. + + Args: + *args: Positional arguments forwarded to ``Response.iter_lines``. + **kwargs: Keyword arguments forwarded to ``Response.iter_lines``. + + Returns: + An iterator that yields decoded line strings and accumulates the + raw bytes for the span output attribute. + """ + return self._wrap_iter(self.__wrapped__.iter_lines(*args, **kwargs)) + + def _finalize_span(self) -> None: + """Write accumulated chunk data to the span output attribute and end the span. + + Idempotent — subsequent calls after the first are no-ops. + """ + if self._finalized: + return + self._finalized = True + try: + set_streaming_span_output(self._span, self.__wrapped__, self._chunks) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to finalize streaming span: %s", e) + finally: + self._span.end() + + def __enter__(self) -> "StreamingWrapper": + """Return the wrapper itself so iteration methods are captured inside a with-block. + + Returns: + This StreamingWrapper instance. + """ + self.__wrapped__.__enter__() + return self + + def __exit__(self, *args: Any) -> None: + """Close via the wrapper so the span is finalized. + + Args: + *args: Exception info tuple (exc_type, exc_val, exc_tb) forwarded + from the context manager protocol. + """ + self.close() + + def close(self) -> None: + """Close the underlying response and finalize the span. + + Calls ``Response.close()`` on the wrapped response, then invokes + :meth:`_finalize_span` to record the accumulated output and end the span. + """ + try: + self.__wrapped__.close() + finally: + self._finalize_span() + + def __del__(self) -> None: + """Finalize the span on garbage collection as a last-resort safety net.""" + self._finalize_span() + + +def send_wrapper(tracer: Tracer) -> Callable[..., Any]: + """Return a wrapt-compatible wrapper for ``requests.Session.send``. + + Args: + tracer: The OpenTelemetry Tracer used to create spans. + + Returns: + A callable suitable for use with ``wrap_function_wrapper``. + """ + + def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Any: + """Intercept ``Session.send``, create a span, and capture request/response data. + + Args: + wrapped: The original ``Session.send`` method. + instance: The ``Session`` instance on which the method is called. + args: Positional arguments passed to ``Session.send``; the first + element is the ``PreparedRequest``. + kwargs: Keyword arguments passed to ``Session.send``. + + Returns: + The original ``Response`` for non-streaming requests, or a + :class:`StreamingWrapper` that keeps the span open while the + caller iterates over a streaming response. + """ + if should_suppress_instrumentation(): + return wrapped(*args, **kwargs) + + try: + request = args[0] if args else kwargs.get("request") + if request is None: + raise ValueError("No request object found in arguments") + method = (request.method or "").upper() + url = remove_url_credentials(request.url or "") + span_name = get_default_span_name(method) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to extract request metadata: %s", e) + return wrapped(*args, **kwargs) + + is_streaming = kwargs.get("stream", False) + if not is_streaming: + with tracer.start_as_current_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) as span: + try: + set_span_input(span, request) + inject(request.headers) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = wrapped(*args, **kwargs) + except Exception as e: + logger.error("netra.instrumentation.requests: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + set_span_output(span, response) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to process response span: %s", e) + + return response + + span = tracer.start_span( + span_name, + kind=SpanKind.CLIENT, + attributes={"http.request.method": method, "url.full": url}, + ) + try: + context = context_api.attach(set_span_in_context(span)) + try: + set_span_input(span, request) + inject(request.headers) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to set span input: %s", e) + + try: + with suppress_http_instrumentation(): + response = wrapped(*args, **kwargs) + except Exception as e: + logger.error("netra.instrumentation.requests: %s", e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + span.end() + raise + + try: + span.set_attribute("http.response.status_code", response.status_code) + if response.status_code >= 500: + span.set_status(Status(StatusCode.ERROR, f"HTTP {response.status_code}")) + else: + span.set_status(Status(StatusCode.OK)) + except Exception as e: + logger.debug("netra.instrumentation.requests: failed to set response status on span: %s", e) + + return StreamingWrapper(response=response, span=span) + finally: + context_api.detach(context) + + return wrapper diff --git a/netra/version.py b/netra/version.py index 2db3392..3025238 100644 --- a/netra/version.py +++ b/netra/version.py @@ -1 +1 @@ -__version__ = "0.1.82" +__version__ = "0.1.80dev0" diff --git a/pyproject.toml b/pyproject.toml index 2bebdcd..e832ce3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [project] name = "netra-sdk" -version = "0.1.82" +version = "0.1.80dev0" description = "A Python SDK for AI application observability that provides OpenTelemetry-based monitoring, tracing, and PII protection for LLM and vector database applications. Enables easy instrumentation, session tracking, and privacy-focused data collection for AI systems in production environments." authors = [ {name = "Sooraj Thomas",email = "sooraj@keyvalue.systems"} diff --git a/tests/test_httpx_instrumentation.py b/tests/test_httpx_instrumentation.py index e4d9be9..6fa20a3 100644 --- a/tests/test_httpx_instrumentation.py +++ b/tests/test_httpx_instrumentation.py @@ -6,7 +6,8 @@ from typing import Collection from unittest.mock import Mock, patch -from netra.instrumentation.httpx import HTTPXInstrumentor, get_default_span_name +from netra.instrumentation.httpx import HTTPXInstrumentor +from netra.instrumentation.httpx.utils import get_default_span_name class TestHTTPXInstrumentor: @@ -14,10 +15,8 @@ class TestHTTPXInstrumentor: def test_initialization(self): """Test HTTPXInstrumentor initialization.""" - # Act instrumentor = HTTPXInstrumentor() - # Assert assert instrumentor is not None assert hasattr(instrumentor, "_instrument") assert hasattr(instrumentor, "_uninstrument") @@ -25,109 +24,48 @@ def test_initialization(self): def test_instrumentation_dependencies(self): """Test instrumentation_dependencies returns correct packages.""" - # Arrange instrumentor = HTTPXInstrumentor() - # Act dependencies = instrumentor.instrumentation_dependencies() - # Assert assert isinstance(dependencies, Collection) assert "httpx >= 0.18.0" in dependencies @patch("netra.instrumentation.httpx.get_tracer") - @patch("netra.instrumentation.httpx.get_meter") - @patch("netra.instrumentation.httpx._instrument") - def test_instrument_with_default_parameters(self, mock_instrument, mock_get_meter, mock_get_tracer): + @patch("netra.instrumentation.httpx.wrap_function_wrapper") + def test_instrument_with_default_parameters(self, mock_wrap, mock_get_tracer): """Test _instrument method with default parameters.""" - # Arrange instrumentor = HTTPXInstrumentor() mock_tracer = Mock() - mock_meter = Mock() - mock_histogram = Mock() mock_get_tracer.return_value = mock_tracer - mock_get_meter.return_value = mock_meter - mock_meter.create_histogram.return_value = mock_histogram - # Act instrumentor._instrument() - # Assert mock_get_tracer.assert_called_once() - mock_get_meter.assert_called_once() - mock_instrument.assert_called_once() + assert mock_wrap.call_count == 2 @patch("netra.instrumentation.httpx.get_tracer") - @patch("netra.instrumentation.httpx.get_meter") - @patch("netra.instrumentation.httpx._instrument") - def test_instrument_with_custom_parameters(self, mock_instrument, mock_get_meter, mock_get_tracer): - """Test _instrument method with custom parameters.""" - # Arrange + @patch("netra.instrumentation.httpx.wrap_function_wrapper") + def test_instrument_with_custom_tracer_provider(self, mock_wrap, mock_get_tracer): + """Test _instrument method with custom tracer provider.""" instrumentor = HTTPXInstrumentor() mock_tracer_provider = Mock() - mock_meter_provider = Mock() - mock_request_hook = Mock() - mock_response_hook = Mock() mock_tracer = Mock() - mock_meter = Mock() - mock_histogram = Mock() mock_get_tracer.return_value = mock_tracer - mock_get_meter.return_value = mock_meter - mock_meter.create_histogram.return_value = mock_histogram - - # Act - instrumentor._instrument( - tracer_provider=mock_tracer_provider, - meter_provider=mock_meter_provider, - request_hook=mock_request_hook, - response_hook=mock_response_hook, - excluded_urls="http://example.com", - ) - - # Assert - mock_get_tracer.assert_called_once() - mock_get_meter.assert_called_once() - mock_instrument.assert_called_once() - - # Verify the _instrument call includes the hooks - call_args = mock_instrument.call_args - assert call_args[1]["request_hook"] == mock_request_hook - assert call_args[1]["response_hook"] == mock_response_hook - - @patch("netra.instrumentation.httpx._uninstrument") - def test_uninstrument(self, mock_uninstrument): - """Test _uninstrument method calls the underlying uninstrument function.""" - # Arrange - instrumentor = HTTPXInstrumentor() - # Act - instrumentor._uninstrument() + instrumentor._instrument(tracer_provider=mock_tracer_provider) - # Assert - mock_uninstrument.assert_called_once() + mock_get_tracer.assert_called_once() + assert mock_wrap.call_count == 2 - @patch("netra.instrumentation.httpx.get_tracer") - @patch("netra.instrumentation.httpx.get_meter") - @patch("netra.instrumentation.httpx._instrument") - def test_instrument_with_duration_histogram_boundaries(self, mock_instrument, mock_get_meter, mock_get_tracer): - """Test _instrument method with custom duration histogram boundaries.""" - # Arrange + @patch("netra.instrumentation.httpx.unwrap") + def test_uninstrument(self, mock_unwrap): + """Test _uninstrument method calls unwrap for both sync and async clients.""" instrumentor = HTTPXInstrumentor() - custom_boundaries = [0.1, 0.5, 1.0, 2.0, 5.0] - mock_tracer = Mock() - mock_meter = Mock() - mock_histogram = Mock() - mock_get_tracer.return_value = mock_tracer - mock_get_meter.return_value = mock_meter - mock_meter.create_histogram.return_value = mock_histogram - # Act - instrumentor._instrument(duration_histogram_boundaries=custom_boundaries) + instrumentor._uninstrument() - # Assert - mock_get_tracer.assert_called_once() - mock_get_meter.assert_called_once() - mock_instrument.assert_called_once() + assert mock_unwrap.call_count == 2 class TestUtilityFunctions: @@ -135,32 +73,24 @@ class TestUtilityFunctions: def test_get_default_span_name_with_standard_method(self): """Test get_default_span_name with standard HTTP method.""" - # Act result = get_default_span_name("GET") - # Assert assert result == "GET" def test_get_default_span_name_with_lowercase_method(self): """Test get_default_span_name with lowercase HTTP method.""" - # Act result = get_default_span_name("post") - # Assert assert result == "POST" def test_get_default_span_name_with_custom_method(self): """Test get_default_span_name with custom HTTP method.""" - # Act result = get_default_span_name("PATCH") - # Assert assert result == "PATCH" def test_get_default_span_name_with_empty_method(self): """Test get_default_span_name with empty method.""" - # Act result = get_default_span_name("") - # Assert assert result == "HTTP"