-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Modify sdkstats manager #47363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Modify sdkstats manager #47363
Changes from all commits
96c0e81
90ff0f7
fe9254a
38244df
fd08930
e8f7eb7
d9f5f81
e74f0fb
2bd92fa
0552d9d
c964227
a6024d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,189 @@ | ||
| ```py | ||
| namespace azure.monitor.opentelemetry.exporter | ||
|
|
||
| class azure.monitor.opentelemetry.exporter.ApplicationInsightsSampler(Sampler): | ||
|
|
||
| def __init__(self, sampling_ratio: float = 1.0): ... | ||
|
|
||
| def get_description(self) -> str: ... | ||
|
|
||
| def should_sample( | ||
| self, | ||
| parent_context: Optional[Context], | ||
| trace_id: int, | ||
| name: str, | ||
| kind: Optional[SpanKind] = None, | ||
| attributes: Attributes = None, | ||
| links: Optional[Sequence[Link]] = None, | ||
| trace_state: Optional[TraceState] = None | ||
| ) -> SamplingResult: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.AzureMonitorLogExporter(BaseExporter, LogRecordExporter): | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| api_version: Optional[str] = ..., | ||
| connection_string: Optional[str] = ..., | ||
| credential: Optional[ManagedIdentityCredential/ClientSecretCredential] = ..., | ||
| disable_offline_storage: Optional[bool] = ..., | ||
| max_envelopes_per_second: Optional[int] = ..., | ||
| storage_directory: Optional[str] = ..., | ||
| **kwargs: Any | ||
| ) -> None: ... | ||
|
|
||
| @classmethod | ||
| def from_connection_string( | ||
| cls, | ||
| conn_str: str, | ||
| *, | ||
| api_version: Optional[str] = ..., | ||
| **kwargs: Any | ||
| ) -> AzureMonitorLogExporter: ... | ||
|
|
||
| def export( | ||
| self, | ||
| batch: Sequence[ReadableLogRecord], | ||
| **kwargs: Any | ||
| ) -> LogRecordExportResult: ... | ||
|
|
||
| def shutdown(self) -> None: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.AzureMonitorMetricExporter(BaseExporter, MetricExporter): | ||
|
|
||
| def __init__(self, **kwargs: Any) -> None: ... | ||
|
|
||
| @classmethod | ||
| def from_connection_string( | ||
| cls, | ||
| conn_str: str, | ||
| *, | ||
| api_version: Optional[str] = ..., | ||
| **kwargs: Any | ||
| ) -> AzureMonitorMetricExporter: ... | ||
|
|
||
| def export( | ||
| self, | ||
| metrics_data: OTMetricsData, | ||
| timeout_millis: float = 10000, | ||
| **kwargs: Any | ||
| ) -> MetricExportResult: ... | ||
|
|
||
| def force_flush(self, timeout_millis: float = 10000) -> bool: ... | ||
|
|
||
| def shutdown( | ||
| self, | ||
| timeout_millis: float = 30000, | ||
| **kwargs: Any | ||
| ) -> None: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.AzureMonitorTraceExporter(BaseExporter, SpanExporter): | ||
|
|
||
| def __init__(self, **kwargs: Any): ... | ||
|
|
||
| @classmethod | ||
| def from_connection_string( | ||
| cls, | ||
| conn_str: str, | ||
| *, | ||
| api_version: Optional[str] = ..., | ||
| **kwargs: Any | ||
| ) -> AzureMonitorTraceExporter: ... | ||
|
|
||
| def export( | ||
| self, | ||
| spans: Sequence[ReadableSpan], | ||
| **_kwargs: Any | ||
| ) -> SpanExportResult: ... | ||
|
|
||
| def shutdown(self) -> None: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.RateLimitedSampler(Sampler): | ||
|
|
||
| def __init__(self, target_spans_per_second_limit: float): ... | ||
|
|
||
| def get_description(self) -> str: ... | ||
|
|
||
| def should_sample( | ||
| self, | ||
| parent_context: Optional[Context], | ||
| trace_id: int, | ||
| name: str, | ||
| kind: Optional[SpanKind] = None, | ||
| attributes: Attributes = None, | ||
| links: Optional[Sequence[Link]] = None, | ||
| trace_state: Optional[TraceState] = None | ||
| ) -> SamplingResult: ... | ||
|
|
||
|
|
||
| namespace azure.monitor.opentelemetry.exporter.statsbeat | ||
|
|
||
| def azure.monitor.opentelemetry.exporter.statsbeat.collect_statsbeat_metrics(exporter: BaseExporter) -> None: ... | ||
|
|
||
|
|
||
| def azure.monitor.opentelemetry.exporter.statsbeat.shutdown_statsbeat_metrics() -> bool: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatConfig: | ||
|
|
||
| def __eq__(self, other: object) -> bool: ... | ||
|
|
||
| def __hash__(self) -> int: ... | ||
|
|
||
| def __init__( | ||
| self, | ||
| endpoint: str, | ||
| region: str, | ||
| instrumentation_key: str, | ||
| disable_offline_storage: bool = False, | ||
| credential: Optional[Any] = None, | ||
| distro_version: Optional[str] = None, | ||
| connection_string: Optional[str] = None | ||
| ) -> None: ... | ||
|
|
||
| @classmethod | ||
| def from_config( | ||
| cls, | ||
| base_config: StatsbeatConfig, | ||
| config_dict: Dict[str, str] | ||
| ) -> Optional[StatsbeatConfig]: ... | ||
|
|
||
| @classmethod | ||
| def from_exporter(cls, exporter: Any) -> Optional[StatsbeatConfig]: ... | ||
|
|
||
|
|
||
| class azure.monitor.opentelemetry.exporter.statsbeat.StatsbeatManager(metaclass=Singleton): | ||
|
|
||
| def __init__(self) -> None: ... | ||
|
|
||
| def add_metric_callback( | ||
| self, | ||
| metric_name: str, | ||
| callback: Callable[[CallbackOptions], Iterable[Observation]] | ||
| ) -> bool: ... | ||
|
|
||
| def get_current_config(self) -> Optional[StatsbeatConfig]: ... | ||
|
|
||
| def initialize(self, config: StatsbeatConfig) -> bool: ... | ||
|
|
||
| def is_initialized(self) -> bool: ... | ||
|
|
||
| def shutdown(self) -> bool: ... | ||
|
|
||
|
|
||
| namespace azure.monitor.opentelemetry.exporter.statsbeat.customer | ||
|
|
||
| def azure.monitor.opentelemetry.exporter.statsbeat.customer.collect_customer_sdkstats(exporter: BaseExporter) -> None: ... | ||
|
|
||
|
|
||
| def azure.monitor.opentelemetry.exporter.statsbeat.customer.get_customer_stats_manager() -> CustomerSdkStatsManager: ... | ||
|
|
||
|
|
||
| def azure.monitor.opentelemetry.exporter.statsbeat.customer.shutdown_customer_sdkstats_metrics() -> None: ... | ||
|
|
||
|
|
||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| apiMdSha256: aae91abb52f0850d0b137b43fb90124f5da502f9500a8276b28731bf9041fda6 | ||
| parserVersion: 0.3.28 | ||
| pythonVersion: 3.13.14 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,9 @@ | |
| # Licensed under the MIT License. | ||
| import logging | ||
| import threading | ||
| from typing import Optional, Any, Dict | ||
| from typing import Callable, Iterable, Optional, Any, Dict | ||
|
|
||
| from opentelemetry.metrics import CallbackOptions, Observation | ||
| from opentelemetry.sdk.metrics import MeterProvider | ||
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader | ||
| from opentelemetry.sdk.resources import Resource | ||
|
|
@@ -19,6 +20,8 @@ | |
| _get_stats_long_export_interval, | ||
| _get_stats_short_export_interval, | ||
| _get_connection_string_for_region_from_config, | ||
| _ADDITIONAL_CALLBACKS, | ||
| _ADDITIONAL_CALLBACKS_LOCK, | ||
| ) | ||
| from azure.monitor.opentelemetry.exporter._utils import Singleton | ||
|
|
||
|
|
@@ -377,3 +380,25 @@ def is_initialized(self) -> bool: | |
| """ | ||
| with self._lock: | ||
| return self._initialized | ||
|
|
||
| def add_metric_callback( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be invoked by the distro. |
||
| self, | ||
| metric_name: str, | ||
| callback: Callable[[CallbackOptions], Iterable[Observation]], | ||
| ) -> bool: | ||
| """Register an extra observation callback that an SDK/Distro with its own network sdkstats metric can use to | ||
| contribute rows to a built-in statsbeat metric. | ||
|
|
||
| :param metric_name: Name of the built-in statsbeat metric to extend. | ||
| :type metric_name: str | ||
| :param callback: OpenTelemetry observable-gauge callback ``(CallbackOptions) -> Iterable[Observation]``. | ||
| :type callback: Callable[[CallbackOptions], Iterable[Observation]] | ||
| :returns: ``True`` if newly registered, ``False`` if already registered. | ||
| :rtype: bool | ||
| """ | ||
| with _ADDITIONAL_CALLBACKS_LOCK: | ||
| callbacks = _ADDITIONAL_CALLBACKS.setdefault(metric_name, []) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add additional callbacks globals to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can then also make the additional callbacks as fields on the manager instead of accessing it directly from globals.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@lzchen What do you mean by "make additional callbacks as fields on the manager"? Could you please elaborate on that. |
||
| if callback in callbacks: | ||
| return False | ||
| callbacks.append(callback) | ||
| return True | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,9 @@ | |
| get_statsbeat_customer_sdkstats_feature_set, | ||
| get_statsbeat_browser_sdk_loader_feature_set, | ||
| ) | ||
| from azure.monitor.opentelemetry.exporter.statsbeat._utils import ( | ||
| _iter_additional_observations, | ||
| ) | ||
| from azure.monitor.opentelemetry.exporter import _utils | ||
|
|
||
|
|
||
|
|
@@ -379,6 +382,7 @@ def _get_success_count(self, options: CallbackOptions) -> Iterable[Observation]: | |
| if count != 0: | ||
| observations.append(Observation(int(count), dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_SUCCESS_NAME[1]] = 0 | ||
| observations.extend(_iter_additional_observations(_REQ_SUCCESS_NAME[0], options)) | ||
| return observations | ||
|
|
||
| # pylint: disable=unused-argument | ||
|
|
@@ -393,6 +397,7 @@ def _get_failure_count(self, options: CallbackOptions) -> Iterable[Observation]: | |
| attributes["statusCode"] = code | ||
| observations.append(Observation(int(count), dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_FAILURE_NAME[1]][code] = 0 # type: ignore | ||
| observations.extend(_iter_additional_observations(_REQ_FAILURE_NAME[0], options)) | ||
| return observations | ||
|
|
||
| # pylint: disable=unused-argument | ||
|
|
@@ -409,6 +414,7 @@ def _get_average_duration(self, options: CallbackOptions) -> Iterable[Observatio | |
| observations.append(Observation(result * 1000, dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_DURATION_NAME[1]] = 0 | ||
| _REQUESTS_MAP["count"] = 0 | ||
| observations.extend(_iter_additional_observations(_REQ_DURATION_NAME[0], options)) | ||
| return observations | ||
|
|
||
| # pylint: disable=unused-argument | ||
|
|
@@ -423,6 +429,7 @@ def _get_retry_count(self, options: CallbackOptions) -> Iterable[Observation]: | |
| attributes["statusCode"] = code | ||
| observations.append(Observation(int(count), dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_RETRY_NAME[1]][code] = 0 # type: ignore | ||
| observations.extend(_iter_additional_observations(_REQ_RETRY_NAME[0], options)) | ||
| return observations | ||
|
|
||
| # pylint: disable=unused-argument | ||
|
|
@@ -437,6 +444,7 @@ def _get_throttle_count(self, options: CallbackOptions) -> Iterable[Observation] | |
| attributes["statusCode"] = code | ||
| observations.append(Observation(int(count), dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_THROTTLE_NAME[1]][code] = 0 # type: ignore | ||
| observations.extend(_iter_additional_observations(_REQ_THROTTLE_NAME[0], options)) | ||
| return observations | ||
|
|
||
| # pylint: disable=unused-argument | ||
|
|
@@ -451,6 +459,7 @@ def _get_exception_count(self, options: CallbackOptions) -> Iterable[Observation | |
| attributes["exceptionType"] = code | ||
| observations.append(Observation(int(count), dict(attributes))) | ||
| _REQUESTS_MAP[_REQ_EXCEPTION_NAME[1]][code] = 0 # type: ignore | ||
| observations.extend(_iter_additional_observations(_REQ_EXCEPTION_NAME[0], options)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return observations | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,8 +3,10 @@ | |
| import os | ||
| import logging | ||
| import json | ||
| from collections.abc import Iterable # pylint: disable=import-error | ||
| from typing import Optional, Dict | ||
| import threading | ||
| from collections.abc import Iterable, Callable, Iterator # pylint: disable=import-error | ||
| from typing import Optional, Dict, List | ||
| from opentelemetry.metrics import CallbackOptions, Observation | ||
|
|
||
| from azure.monitor.opentelemetry.exporter._constants import ( | ||
|
rads-1996 marked this conversation as resolved.
|
||
| _APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME, | ||
|
|
@@ -26,6 +28,9 @@ | |
| _REQUESTS_MAP_LOCK, | ||
| ) | ||
|
|
||
| _ADDITIONAL_CALLBACKS: Dict[str, List[Callable[[CallbackOptions], Iterable[Observation]]]] = {} | ||
| _ADDITIONAL_CALLBACKS_LOCK = threading.Lock() | ||
|
|
||
|
|
||
| def _get_stats_connection_string(endpoint: str) -> str: | ||
| cs_env = os.environ.get(_APPLICATIONINSIGHTS_STATS_CONNECTION_STRING_ENV_NAME) | ||
|
|
@@ -165,3 +170,36 @@ def _get_connection_string_for_region_from_config(target_region: str, settings: | |
| "Unexpected error getting stats connection string for region '%s': %s", target_region, str(ex) | ||
| ) | ||
| return None | ||
|
|
||
|
|
||
| def _iter_additional_observations(metric_name: str, options: CallbackOptions) -> Iterator[Observation]: | ||
| """Yield observations contributed via :func:`add_metric_callback`. | ||
|
|
||
| Invoked by the built-in ``_StatsbeatMetrics`` callbacks at collection time. | ||
| Snapshots the registered callbacks under the registry lock to avoid | ||
| mutation during iteration, then releases the lock before invoking user | ||
| callbacks (so they cannot deadlock against the registry). Exceptions raised | ||
| by individual callbacks are caught, logged, and skipped. | ||
|
|
||
| :param metric_name: Name of the built-in statsbeat metric being collected. | ||
| :type metric_name: str | ||
| :param options: OpenTelemetry callback options forwarded to each registered callback. | ||
| :type options: ~opentelemetry.metrics.CallbackOptions | ||
| :returns: Iterator over observations contributed by registered callbacks. | ||
| :rtype: Iterator[~opentelemetry.metrics.Observation] | ||
| """ | ||
|
|
||
| with _ADDITIONAL_CALLBACKS_LOCK: | ||
| callbacks = tuple(_ADDITIONAL_CALLBACKS.get(metric_name, ())) | ||
|
|
||
| iter_logger = logging.getLogger(__name__) | ||
| for cb in callbacks: | ||
| try: | ||
| yield from cb(options) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is passing in |
||
| except Exception: # pylint: disable=broad-except | ||
| iter_logger.debug( | ||
| "Extra statsbeat callback %r for %r raised; skipping.", | ||
| cb, | ||
| metric_name, | ||
| exc_info=True, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: blank line