Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
## 1.0.0b54 (Unreleased)

### Features Added

- Add `StatsbeatManager.add_metric_callback` to let SDKs/distros add their own metric
observations to built-in statsbeat metrics
([#47363](https://github.com/Azure/azure-sdk-for-python/pull/47363))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: blank line

### Breaking Changes
- Customer Facing SDKStats: Renamed metric dimension attributes from snake_case/dotted to camelCase
(`compute_type` -> `computeType`, `telemetry_type` -> `telemetryType`, `telemetry_success` -> `telemetrySuccess`,
Expand Down
189 changes: 189 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry-exporter/api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
```py
namespace azure.monitor.opentelemetry.exporter

class azure.monitor.opentelemetry.exporter.ApplicationInsightsSampler(Sampler):

def __init__(self, sampling_ratio: float = 1.0): ...

def get_description(self) -> str: ...

def should_sample(
self,
parent_context: Optional[Context],
trace_id: int,
name: str,
kind: Optional[SpanKind] = None,
attributes: Attributes = None,
links: Optional[Sequence[Link]] = None,
trace_state: Optional[TraceState] = None
) -> SamplingResult: ...


class azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter(BaseExporter, LogRecordExporter):

def __init__(
self,
*,
api_version: Optional[str] = ...,
connection_string: Optional[str] = ...,
credential: Optional[ManagedIdentityCredential/ClientSecretCredential] = ...,
disable_offline_storage: Optional[bool] = ...,
max_envelopes_per_second: Optional[int] = ...,
storage_directory: Optional[str] = ...,
**kwargs: Any
) -> None: ...

@classmethod
def from_connection_string(
cls,
conn_str: str,
*,
api_version: Optional[str] = ...,
**kwargs: Any
) -> AzureMonitorLogExporter: ...

def export(
self,
batch: Sequence[ReadableLogRecord],
**kwargs: Any
) -> LogRecordExportResult: ...

def shutdown(self) -> None: ...


class azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter(BaseExporter, MetricExporter):

def __init__(self, **kwargs: Any) -> None: ...

@classmethod
def from_connection_string(
cls,
conn_str: str,
*,
api_version: Optional[str] = ...,
**kwargs: Any
) -> AzureMonitorMetricExporter: ...

def export(
self,
metrics_data: OTMetricsData,
timeout_millis: float = 10000,
**kwargs: Any
) -> MetricExportResult: ...

def force_flush(self, timeout_millis: float = 10000) -> bool: ...

def shutdown(
self,
timeout_millis: float = 30000,
**kwargs: Any
) -> None: ...


class azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter(BaseExporter, SpanExporter):

def __init__(self, **kwargs: Any): ...

@classmethod
def from_connection_string(
cls,
conn_str: str,
*,
api_version: Optional[str] = ...,
**kwargs: Any
) -> AzureMonitorTraceExporter: ...

def export(
self,
spans: Sequence[ReadableSpan],
**_kwargs: Any
) -> SpanExportResult: ...

def shutdown(self) -> None: ...


class azure.monitor.opentelemetry.exporter.RateLimitedSampler(Sampler):

def __init__(self, target_spans_per_second_limit: float): ...

def get_description(self) -> str: ...

def should_sample(
self,
parent_context: Optional[Context],
trace_id: int,
name: str,
kind: Optional[SpanKind] = None,
attributes: Attributes = None,
links: Optional[Sequence[Link]] = None,
trace_state: Optional[TraceState] = None
) -> SamplingResult: ...


namespace azure.monitor.opentelemetry.exporter.statsbeat

def azure.monitor.opentelemetry.exporter.statsbeat.collect_statsbeat_metrics(exporter: BaseExporter) -> None: ...


def azure.monitor.opentelemetry.exporter.statsbeat.shutdown_statsbeat_metrics() -> bool: ...


class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatConfig:

def __eq__(self, other: object) -> bool: ...

def __hash__(self) -> int: ...

def __init__(
self,
endpoint: str,
region: str,
instrumentation_key: str,
disable_offline_storage: bool = False,
credential: Optional[Any] = None,
distro_version: Optional[str] = None,
connection_string: Optional[str] = None
) -> None: ...

@classmethod
def from_config(
cls,
base_config: StatsbeatConfig,
config_dict: Dict[str, str]
) -> Optional[StatsbeatConfig]: ...

@classmethod
def from_exporter(cls, exporter: Any) -> Optional[StatsbeatConfig]: ...


class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatManager(metaclass=Singleton):

def __init__(self) -> None: ...

def add_metric_callback(
self,
metric_name: str,
callback: Callable[[CallbackOptions], Iterable[Observation]]
) -> bool: ...

def get_current_config(self) -> Optional[StatsbeatConfig]: ...

def initialize(self, config: StatsbeatConfig) -> bool: ...

def is_initialized(self) -> bool: ...

def shutdown(self) -> bool: ...


namespace azure.monitor.opentelemetry.exporter.statsbeat.customer

def azure.monitor.opentelemetry.exporter.statsbeat.customer.collect_customer_sdkstats(exporter: BaseExporter) -> None: ...


def azure.monitor.opentelemetry.exporter.statsbeat.customer.get_customer_stats_manager() -> CustomerSdkStatsManager: ...


def azure.monitor.opentelemetry.exporter.statsbeat.customer.shutdown_customer_sdkstats_metrics() -> None: ...


```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
apiMdSha256: aae91abb52f0850d0b137b43fb90124f5da502f9500a8276b28731bf9041fda6
parserVersion: 0.3.28
pythonVersion: 3.13.14
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
# Licensed under the MIT License.
import logging
import threading
from typing import Optional, Any, Dict
from typing import Callable, Iterable, Optional, Any, Dict

from opentelemetry.metrics import CallbackOptions, Observation
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
Expand All @@ -19,6 +20,8 @@
_get_stats_long_export_interval,
_get_stats_short_export_interval,
_get_connection_string_for_region_from_config,
_ADDITIONAL_CALLBACKS,
_ADDITIONAL_CALLBACKS_LOCK,
)
from azure.monitor.opentelemetry.exporter._utils import Singleton

Expand Down Expand Up @@ -377,3 +380,25 @@ def is_initialized(self) -> bool:
"""
with self._lock:
return self._initialized

def add_metric_callback(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be invoked by the distro.

self,
metric_name: str,
callback: Callable[[CallbackOptions], Iterable[Observation]],
) -> bool:
"""Register an extra observation callback that an SDK/Distro with its own network sdkstats metric can use to
contribute rows to a built-in statsbeat metric.

:param metric_name: Name of the built-in statsbeat metric to extend.
:type metric_name: str
:param callback: OpenTelemetry observable-gauge callback ``(CallbackOptions) -> Iterable[Observation]``.
:type callback: Callable[[CallbackOptions], Iterable[Observation]]
:returns: ``True`` if newly registered, ``False`` if already registered.
:rtype: bool
"""
with _ADDITIONAL_CALLBACKS_LOCK:
callbacks = _ADDITIONAL_CALLBACKS.setdefault(metric_name, [])

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add additional callbacks globals to state and have gettors / settors pattern just like the other global variables.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can then also make the additional callbacks as fields on the manager instead of accessing it directly from globals.

@rads-1996 rads-1996 Jun 15, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can then also make the additional callbacks as fields on the manager instead of accessing it directly from globals.

@lzchen What do you mean by "make additional callbacks as fields on the manager"? Could you please elaborate on that.

if callback in callbacks:
return False
callbacks.append(callback)
return True
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
get_statsbeat_customer_sdkstats_feature_set,
get_statsbeat_browser_sdk_loader_feature_set,
)
from azure.monitor.opentelemetry.exporter.statsbeat._utils import (
_iter_additional_observations,
)
from azure.monitor.opentelemetry.exporter import _utils


Expand Down Expand Up @@ -379,6 +382,7 @@ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]:
if count != 0:
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0
observations.extend(_iter_additional_observations(_REQ_SUCCESS_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -393,6 +397,7 @@ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]:
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore
observations.extend(_iter_additional_observations(_REQ_FAILURE_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -409,6 +414,7 @@ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observatio
observations.append(Observation(result * 1000, dict(attributes)))
_REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0
_REQUESTS_MAP["count"] = 0
observations.extend(_iter_additional_observations(_REQ_DURATION_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -423,6 +429,7 @@ def _get_retry_count(self, options: CallbackOptions) -> Iterable[Observation]:
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore
observations.extend(_iter_additional_observations(_REQ_RETRY_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -437,6 +444,7 @@ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation]
attributes["statusCode"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore
observations.extend(_iter_additional_observations(_REQ_THROTTLE_NAME[0], options))
return observations

# pylint: disable=unused-argument
Expand All @@ -451,6 +459,7 @@ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation
attributes["exceptionType"] = code
observations.append(Observation(int(count), dict(attributes)))
_REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore
observations.extend(_iter_additional_observations(_REQ_EXCEPTION_NAME[0], options))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yield pattern is good in _iter_additional_observations but observations.extend materializes all yielded items anyways, effectively cancelling out the optimization. You can jus return instead of yield to be consistent.

return observations


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import os
import logging
import json
from collections.abc import Iterable # pylint: disable=import-error
from typing import Optional, Dict
import threading
from collections.abc import Iterable, Callable, Iterator # pylint: disable=import-error
from typing import Optional, Dict, List
from opentelemetry.metrics import CallbackOptions, Observation

from azure.monitor.opentelemetry.exporter._constants import (
Comment thread
rads-1996 marked this conversation as resolved.
_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME,
Expand All @@ -26,6 +28,9 @@
_REQUESTS_MAP_LOCK,
)

_ADDITIONAL_CALLBACKS: Dict[str, List[Callable[[CallbackOptions], Iterable[Observation]]]] = {}
_ADDITIONAL_CALLBACKS_LOCK = threading.Lock()


def _get_stats_connection_string(endpoint: str) -> str:
cs_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME)
Expand Down Expand Up @@ -165,3 +170,36 @@ def _get_connection_string_for_region_from_config(target_region: str, settings:
"Unexpected error getting stats connection string for region '%s': %s", target_region, str(ex)
)
return None


def _iter_additional_observations(metric_name: str, options: CallbackOptions) -> Iterator[Observation]:
"""Yield observations contributed via :func:`add_metric_callback`.

Invoked by the built-in ``_StatsbeatMetrics`` callbacks at collection time.
Snapshots the registered callbacks under the registry lock to avoid
mutation during iteration, then releases the lock before invoking user
callbacks (so they cannot deadlock against the registry). Exceptions raised
by individual callbacks are caught, logged, and skipped.

:param metric_name: Name of the built-in statsbeat metric being collected.
:type metric_name: str
:param options: OpenTelemetry callback options forwarded to each registered callback.
:type options: ~opentelemetry.metrics.CallbackOptions
:returns: Iterator over observations contributed by registered callbacks.
:rtype: Iterator[~opentelemetry.metrics.Observation]
"""

with _ADDITIONAL_CALLBACKS_LOCK:
callbacks = tuple(_ADDITIONAL_CALLBACKS.get(metric_name, ()))

iter_logger = logging.getLogger(__name__)
for cb in callbacks:
try:
yield from cb(options)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is passing in options for future proofing? I don't think our internal callbacks use them right?

except Exception: # pylint: disable=broad-except
iter_logger.debug(
"Extra statsbeat callback %r for %r raised; skipping.",
cb,
metric_name,
exc_info=True,
)
Loading
Loading