diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md index e54407c9309c..e0a79e9d5e80 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/CHANGELOG.md @@ -3,7 +3,9 @@ ## 1.0.0b54 (Unreleased) ### Features Added - +- Add `StatsbeatManager.add_metric_callback` to let SDKs/distros add their own metric + observations to built-in statsbeat metrics + ([#47363](https://github.com/Azure/azure-sdk-for-python/pull/47363)) ### Breaking Changes - Customer Facing SDKStats: Renamed metric dimension attributes from snake_case/dotted to camelCase (`compute_type` -> `computeType`, `telemetry_type` -> `telemetryType`, `telemetry_success` -> `telemetrySuccess`, diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md new file mode 100644 index 000000000000..1c14aeda42dd --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.md @@ -0,0 +1,189 @@ +```py +namespace azure.monitor.opentelemetry.exporter + + class azure.monitor.opentelemetry.exporter.ApplicationInsightsSampler(Sampler): + + def __init__(self, sampling_ratio: float = 1.0): ... + + def get_description(self) -> str: ... + + def should_sample( + self, + parent_context: Optional[Context], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence[Link]] = None, + trace_state: Optional[TraceState] = None + ) -> SamplingResult: ... + + + class azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter(BaseExporter, LogRecordExporter): + + def __init__( + self, + *, + api_version: Optional[str] = ..., + connection_string: Optional[str] = ..., + credential: Optional[ManagedIdentityCredential/ClientSecretCredential] = ..., + disable_offline_storage: Optional[bool] = ..., + max_envelopes_per_second: Optional[int] = ..., + storage_directory: Optional[str] = ..., + **kwargs: Any + ) -> None: ... + + @classmethod + def from_connection_string( + cls, + conn_str: str, + *, + api_version: Optional[str] = ..., + **kwargs: Any + ) -> AzureMonitorLogExporter: ... + + def export( + self, + batch: Sequence[ReadableLogRecord], + **kwargs: Any + ) -> LogRecordExportResult: ... + + def shutdown(self) -> None: ... + + + class azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter(BaseExporter, MetricExporter): + + def __init__(self, **kwargs: Any) -> None: ... + + @classmethod + def from_connection_string( + cls, + conn_str: str, + *, + api_version: Optional[str] = ..., + **kwargs: Any + ) -> AzureMonitorMetricExporter: ... + + def export( + self, + metrics_data: OTMetricsData, + timeout_millis: float = 10000, + **kwargs: Any + ) -> MetricExportResult: ... + + def force_flush(self, timeout_millis: float = 10000) -> bool: ... + + def shutdown( + self, + timeout_millis: float = 30000, + **kwargs: Any + ) -> None: ... + + + class azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter(BaseExporter, SpanExporter): + + def __init__(self, **kwargs: Any): ... + + @classmethod + def from_connection_string( + cls, + conn_str: str, + *, + api_version: Optional[str] = ..., + **kwargs: Any + ) -> AzureMonitorTraceExporter: ... + + def export( + self, + spans: Sequence[ReadableSpan], + **_kwargs: Any + ) -> SpanExportResult: ... + + def shutdown(self) -> None: ... + + + class azure.monitor.opentelemetry.exporter.RateLimitedSampler(Sampler): + + def __init__(self, target_spans_per_second_limit: float): ... + + def get_description(self) -> str: ... + + def should_sample( + self, + parent_context: Optional[Context], + trace_id: int, + name: str, + kind: Optional[SpanKind] = None, + attributes: Attributes = None, + links: Optional[Sequence[Link]] = None, + trace_state: Optional[TraceState] = None + ) -> SamplingResult: ... + + +namespace azure.monitor.opentelemetry.exporter.statsbeat + + def azure.monitor.opentelemetry.exporter.statsbeat.collect_statsbeat_metrics(exporter: BaseExporter) -> None: ... + + + def azure.monitor.opentelemetry.exporter.statsbeat.shutdown_statsbeat_metrics() -> bool: ... + + + class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatConfig: + + def __eq__(self, other: object) -> bool: ... + + def __hash__(self) -> int: ... + + def __init__( + self, + endpoint: str, + region: str, + instrumentation_key: str, + disable_offline_storage: bool = False, + credential: Optional[Any] = None, + distro_version: Optional[str] = None, + connection_string: Optional[str] = None + ) -> None: ... + + @classmethod + def from_config( + cls, + base_config: StatsbeatConfig, + config_dict: Dict[str, str] + ) -> Optional[StatsbeatConfig]: ... + + @classmethod + def from_exporter(cls, exporter: Any) -> Optional[StatsbeatConfig]: ... + + + class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatManager(metaclass=Singleton): + + def __init__(self) -> None: ... + + def add_metric_callback( + self, + metric_name: str, + callback: Callable[[CallbackOptions], Iterable[Observation]] + ) -> bool: ... + + def get_current_config(self) -> Optional[StatsbeatConfig]: ... + + def initialize(self, config: StatsbeatConfig) -> bool: ... + + def is_initialized(self) -> bool: ... + + def shutdown(self) -> bool: ... + + +namespace azure.monitor.opentelemetry.exporter.statsbeat.customer + + def azure.monitor.opentelemetry.exporter.statsbeat.customer.collect_customer_sdkstats(exporter: BaseExporter) -> None: ... + + + def azure.monitor.opentelemetry.exporter.statsbeat.customer.get_customer_stats_manager() -> CustomerSdkStatsManager: ... + + + def azure.monitor.opentelemetry.exporter.statsbeat.customer.shutdown_customer_sdkstats_metrics() -> None: ... + + +``` \ No newline at end of file diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml new file mode 100644 index 000000000000..b5e35ca355b1 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/api.metadata.yml @@ -0,0 +1,3 @@ +apiMdSha256: aae91abb52f0850d0b137b43fb90124f5da502f9500a8276b28731bf9041fda6 +parserVersion: 0.3.28 +pythonVersion: 3.13.14 diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py index b435a95064db..b5b6351eddfa 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_manager.py @@ -2,8 +2,9 @@ # Licensed under the MIT License. import logging import threading -from typing import Optional, Any, Dict +from typing import Callable, Iterable, Optional, Any, Dict +from opentelemetry.metrics import CallbackOptions, Observation from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -19,6 +20,8 @@ _get_stats_long_export_interval, _get_stats_short_export_interval, _get_connection_string_for_region_from_config, + _ADDITIONAL_CALLBACKS, + _ADDITIONAL_CALLBACKS_LOCK, ) from azure.monitor.opentelemetry.exporter._utils import Singleton @@ -377,3 +380,25 @@ def is_initialized(self) -> bool: """ with self._lock: return self._initialized + + def add_metric_callback( + self, + metric_name: str, + callback: Callable[[CallbackOptions], Iterable[Observation]], + ) -> bool: + """Register an extra observation callback that an SDK/Distro with its own network sdkstats metric can use to + contribute rows to a built-in statsbeat metric. + + :param metric_name: Name of the built-in statsbeat metric to extend. + :type metric_name: str + :param callback: OpenTelemetry observable-gauge callback ``(CallbackOptions) -> Iterable[Observation]``. + :type callback: Callable[[CallbackOptions], Iterable[Observation]] + :returns: ``True`` if newly registered, ``False`` if already registered. + :rtype: bool + """ + with _ADDITIONAL_CALLBACKS_LOCK: + callbacks = _ADDITIONAL_CALLBACKS.setdefault(metric_name, []) + if callback in callbacks: + return False + callbacks.append(callback) + return True diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py index d0f20511aadf..66afe56aa15c 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_statsbeat_metrics.py @@ -40,6 +40,9 @@ get_statsbeat_customer_sdkstats_feature_set, get_statsbeat_browser_sdk_loader_feature_set, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + _iter_additional_observations, +) from azure.monitor.opentelemetry.exporter import _utils @@ -379,6 +382,7 @@ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]: if count != 0: observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0 + observations.extend(_iter_additional_observations(_REQ_SUCCESS_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -393,6 +397,7 @@ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]: attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore + observations.extend(_iter_additional_observations(_REQ_FAILURE_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -409,6 +414,7 @@ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observatio observations.append(Observation(result * 1000, dict(attributes))) _REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0 _REQUESTS_MAP["count"] = 0 + observations.extend(_iter_additional_observations(_REQ_DURATION_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -423,6 +429,7 @@ def _get_retry_count(self, options: CallbackOptions) -> Iterable[Observation]: attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore + observations.extend(_iter_additional_observations(_REQ_RETRY_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -437,6 +444,7 @@ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation] attributes["statusCode"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore + observations.extend(_iter_additional_observations(_REQ_THROTTLE_NAME[0], options)) return observations # pylint: disable=unused-argument @@ -451,6 +459,7 @@ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation attributes["exceptionType"] = code observations.append(Observation(int(count), dict(attributes))) _REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore + observations.extend(_iter_additional_observations(_REQ_EXCEPTION_NAME[0], options)) return observations diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py index 07ed150cd33d..3c45d7f20f61 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/statsbeat/_utils.py @@ -3,8 +3,10 @@ import os import logging import json -from collections.abc import Iterable # pylint: disable=import-error -from typing import Optional, Dict +import threading +from collections.abc import Iterable, Callable, Iterator # pylint: disable=import-error +from typing import Optional, Dict, List +from opentelemetry.metrics import CallbackOptions, Observation from azure.monitor.opentelemetry.exporter._constants import ( _APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME, @@ -26,6 +28,9 @@ _REQUESTS_MAP_LOCK, ) +_ADDITIONAL_CALLBACKS: Dict[str, List[Callable[[CallbackOptions], Iterable[Observation]]]] = {} +_ADDITIONAL_CALLBACKS_LOCK = threading.Lock() + def _get_stats_connection_string(endpoint: str) -> str: cs_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME) @@ -165,3 +170,36 @@ def _get_connection_string_for_region_from_config(target_region: str, settings: "Unexpected error getting stats connection string for region '%s': %s", target_region, str(ex) ) return None + + +def _iter_additional_observations(metric_name: str, options: CallbackOptions) -> Iterator[Observation]: + """Yield observations contributed via :func:`add_metric_callback`. + + Invoked by the built-in ``_StatsbeatMetrics`` callbacks at collection time. + Snapshots the registered callbacks under the registry lock to avoid + mutation during iteration, then releases the lock before invoking user + callbacks (so they cannot deadlock against the registry). Exceptions raised + by individual callbacks are caught, logged, and skipped. + + :param metric_name: Name of the built-in statsbeat metric being collected. + :type metric_name: str + :param options: OpenTelemetry callback options forwarded to each registered callback. + :type options: ~opentelemetry.metrics.CallbackOptions + :returns: Iterator over observations contributed by registered callbacks. + :rtype: Iterator[~opentelemetry.metrics.Observation] + """ + + with _ADDITIONAL_CALLBACKS_LOCK: + callbacks = tuple(_ADDITIONAL_CALLBACKS.get(metric_name, ())) + + iter_logger = logging.getLogger(__name__) + for cb in callbacks: + try: + yield from cb(options) + except Exception: # pylint: disable=broad-except + iter_logger.debug( + "Extra statsbeat callback %r for %r raised; skipping.", + cb, + metric_name, + exc_info=True, + ) diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py index 572074e32b31..daf9491f5547 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/tests/statsbeat/test_metrics.py @@ -20,6 +20,9 @@ _REQ_SUCCESS_NAME, _REQ_THROTTLE_NAME, ) +from opentelemetry.metrics import Observation +from azure.monitor.opentelemetry.exporter.statsbeat import _utils as statsbeat_utils +from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager from azure.monitor.opentelemetry.exporter.statsbeat._state import ( _REQUESTS_MAP, _STATSBEAT_STATE, @@ -35,6 +38,9 @@ _AttachTypes, _RP_Names, ) +from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( + _iter_additional_observations, +) class MockResponse(object): @@ -967,4 +973,132 @@ def test_shorten_host(self): self.assertEqual(_shorten_host(url), "fakehost-5") +# pylint: disable=protected-access +class TestAdditionalObservationCallbacks(unittest.TestCase): + """Tests for StatsbeatManager.add_metric_callback and the _iter_additional_observations helper.""" + + def setUp(self): + statsbeat_utils._ADDITIONAL_CALLBACKS.clear() + _REQUESTS_MAP.clear() + + def tearDown(self): + statsbeat_utils._ADDITIONAL_CALLBACKS.clear() + _REQUESTS_MAP.clear() + + def _make_metric(self): + return _StatsbeatMetrics( + MeterProvider(), + "1aa11111-bbbb-1ccc-8ddd-eeeeffff3334", + "https://westus-1.in.applicationinsights.azure.com/", + False, + 0, + False, + ) + + # ---- StatsbeatManager.add_metric_callback ---- + + def test_add_returns_true_first_time(self): + cb = lambda options: [] # noqa: E731 + self.assertTrue(StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb)) + self.assertEqual(statsbeat_utils._ADDITIONAL_CALLBACKS[_REQ_SUCCESS_NAME[0]], [cb]) + + def test_add_is_idempotent_on_same_callback(self): + cb = lambda options: [] # noqa: E731 + self.assertTrue(StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb)) + self.assertFalse(StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb)) + self.assertEqual(statsbeat_utils._ADDITIONAL_CALLBACKS[_REQ_SUCCESS_NAME[0]], [cb]) + + def test_add_supports_multiple_distinct_callbacks(self): + cb1 = lambda options: [] # noqa: E731 + cb2 = lambda options: [] # noqa: E731 + self.assertTrue(StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb1)) + self.assertTrue(StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb2)) + self.assertEqual( + statsbeat_utils._ADDITIONAL_CALLBACKS[_REQ_SUCCESS_NAME[0]], + [cb1, cb2], + ) + + # ---- _iter_additional_observations ---- + + def test_iter_unregistered_name_yields_nothing(self): + self.assertEqual(list(_iter_additional_observations(_REQ_SUCCESS_NAME[0], None)), []) + + def test_iter_yields_observations_from_registered_callback(self): + obs = Observation(7, {"endpoint": "ep1"}) + + def cb(_options): + yield obs + + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], cb) + self.assertEqual(list(_iter_additional_observations(_REQ_SUCCESS_NAME[0], None)), [obs]) + + def test_iter_aggregates_across_multiple_callbacks(self): + obs1 = Observation(1, {"endpoint": "ep1"}) + obs2 = Observation(2, {"endpoint": "ep2"}) + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], lambda _options: [obs1]) + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], lambda _options: [obs2]) + self.assertEqual( + list(_iter_additional_observations(_REQ_SUCCESS_NAME[0], None)), + [obs1, obs2], + ) + + def test_iter_swallows_callback_exception_and_continues(self): + good_obs = Observation(42, {"endpoint": "ok"}) + + def bad_cb(_options): + raise RuntimeError("boom") + + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], bad_cb) + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], lambda _options: [good_obs]) + # Should not raise; should still emit the good observation. + self.assertEqual( + list(_iter_additional_observations(_REQ_SUCCESS_NAME[0], None)), + [good_obs], + ) + + def test_iter_callbacks_for_other_metrics_not_invoked(self): + called = [] + StatsbeatManager().add_metric_callback(_REQ_FAILURE_NAME[0], lambda _options: called.append("failure") or []) + list(_iter_additional_observations(_REQ_SUCCESS_NAME[0], None)) + self.assertEqual(called, []) + + # ---- integration with built-in callbacks ---- + + def test_success_count_callback_emits_extras(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 5 + + extra = Observation(99, {"endpoint": "extra-ep", "statusCode": 200}) + StatsbeatManager().add_metric_callback(_REQ_SUCCESS_NAME[0], lambda _options: [extra]) + + observations = metric._get_success_count(options=None) + + # Built-in observation followed by the extra one. + self.assertEqual(len(observations), 2) + self.assertEqual(observations[0].value, 5) + self.assertIs(observations[-1], extra) + + def test_success_count_callback_unchanged_without_extras(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 3 + + observations = metric._get_success_count(options=None) + + self.assertEqual(len(observations), 1) + self.assertEqual(observations[0].value, 3) + + def test_extras_for_other_metric_do_not_leak_into_success(self): + metric = self._make_metric() + _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 1 + + unrelated = Observation(123, {"endpoint": "other"}) + StatsbeatManager().add_metric_callback(_REQ_FAILURE_NAME[0], lambda _options: [unrelated]) + + observations = metric._get_success_count(options=None) + + self.assertEqual(len(observations), 1) + self.assertEqual(observations[0].value, 1) + self.assertNotIn(unrelated, observations) + + # cSpell:enable