diff --git a/CHANGELOG.md b/CHANGELOG.md index 941551d8a9..939efe40ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-exporter-otlp-proto-http`: refactor shared HTTP exporter logic into common module, extract `_setup_session`, `_export`, + `_export_with_retries`, and `_compression_from_env` from trace/log/metric exporters into `_common` + ([#5160](https://github.com/open-telemetry/opentelemetry-python/pull/5160)) - `opentelemetry-sdk`: add `additional_properties` support to generated config models via custom `datamodel-codegen` template, enabling plugin/custom component names to flow through typed dataclasses ([#5131](https://github.com/open-telemetry/opentelemetry-python/pull/5131)) - Fix incorrect code example in `create_tracer()` docstring diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 1bdb7d228c..b72726ce30 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -12,16 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import gzip +import logging +import random +import threading +import zlib +from io import BytesIO from os import environ -from typing import Literal, Optional +from time import time +from typing import Any, Dict, Literal, Optional, Tuple, Union import requests +from requests.exceptions import ConnectionError +from opentelemetry.exporter.otlp.proto.http import ( + _OTLP_HTTP_HEADERS, + Compression, +) from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER, + OTEL_EXPORTER_OTLP_COMPRESSION, +) +from opentelemetry.semconv.attributes.http_attributes import ( + HTTP_RESPONSE_STATUS_CODE, ) from opentelemetry.util._importlib_metadata import entry_points +_logger = logging.getLogger(__name__) + +_MAX_RETRIES = 6 + def _is_retryable(resp: requests.Response) -> bool: if resp.status_code == 408: @@ -64,3 +84,168 @@ def _load_session_from_envvar( f" must be of type `requests.Session`." ) return None + + +def _setup_session( + session: Optional[requests.Session], + cred_envvar: Literal[ + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER", + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER", + "OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER", + ], + headers: Dict[str, str], + compression: Compression, +) -> requests.Session: + configured_session = ( + session or _load_session_from_envvar(cred_envvar) or requests.Session() + ) + configured_session.headers.update(headers) + configured_session.headers.update(_OTLP_HTTP_HEADERS) + # let users override our defaults + configured_session.headers.update(headers) + if compression is not Compression.NoCompression: + configured_session.headers.update( + {"Content-Encoding": compression.value} + ) + return configured_session + + +def _export( + session: requests.Session, + endpoint: str, + serialized_data: bytes, + compression: Compression, + certificate_file: Union[str, bool], + client_cert: Optional[Union[str, Tuple[str, str]]], + timeout_sec: float, +) -> 
requests.Response: + data = serialized_data + if compression == Compression.Gzip: + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + data = gzip_data.getvalue() + elif compression == Compression.Deflate: + data = zlib.compress(serialized_data) + + # By default, keep-alive is enabled in Session's request + # headers. Backends may choose to close the connection + # while a post happens which causes an unhandled + # exception. This try/except will retry the post on such exceptions + try: + resp = session.post( + url=endpoint, + data=data, + verify=certificate_file, + timeout=timeout_sec, + cert=client_cert, + ) + except ConnectionError: + resp = session.post( + url=endpoint, + data=data, + verify=certificate_file, + timeout=timeout_sec, + cert=client_cert, + ) + return resp + + +def _export_with_retries( + session: requests.Session, + endpoint: str, + serialized_data: bytes, + compression: Compression, + certificate_file: Union[str, bool], + client_cert: Optional[Union[str, Tuple[str, str]]], + timeout: float, + shutdown_event: threading.Event, + result: Any, + batch_name: str, +) -> bool: + deadline_sec = time() + timeout + for retry_num in range(_MAX_RETRIES): + # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. 
+ backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + export_error: Optional[Exception] = None + try: + resp = _export( + session, + endpoint, + serialized_data, + compression, + certificate_file, + client_cert, + deadline_sec - time(), + ) + if resp.ok: + return True + except requests.exceptions.RequestException as error: + reason = error + export_error = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + error_attrs = ( + {HTTP_RESPONSE_STATUS_CODE: status_code} + if status_code is not None + else None + ) + + if not retryable: + _logger.error( + "Failed to export %s batch code: %s, reason: %s", + batch_name, + status_code, + reason, + ) + result.error = export_error + result.error_attrs = error_attrs + return False + + if ( + retry_num + 1 == _MAX_RETRIES + or backoff_seconds > (deadline_sec - time()) + or shutdown_event.is_set() + ): + _logger.error( + "Failed to export %s batch due to timeout, " + "max retries or shutdown.", + batch_name, + ) + result.error = export_error + result.error_attrs = error_attrs + return False + + _logger.warning( + "Transient error %s encountered while exporting %s batch, retrying in %.2fs.", + reason, + batch_name, + backoff_seconds, + ) + if shutdown_event.wait(backoff_seconds): + _logger.warning("Shutdown in progress, aborting retry.") + break + return False + + +def _compression_from_env( + signal_compression_envvar: Literal[ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION", + "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION", + ], +) -> Compression: + compression = ( + environ.get( + signal_compression_envvar, + environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), + ) + .lower() + .strip() + ) + return Compression(compression) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py 
b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 6032433dd1..b7bab8b438 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -12,31 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time from typing import Dict, Optional, Sequence from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _export_with_retries, + _setup_session, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk._logs import ReadableLogRecord @@ -50,7 +44,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE, @@ -65,9 +58,6 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -77,9 +67,8 @@ DEFAULT_COMPRESSION = Compression.NoCompression DEFAULT_ENDPOINT = 
"http://localhost:4318/" -DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_LOGS_EXPORT_PATH = "v1/logs" class OTLPLogExporter(LogRecordExporter): @@ -96,7 +85,7 @@ def __init__( *, meter_provider: Optional[MeterProvider] = None, ): - self._shutdown_is_occuring = threading.Event() + self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, _append_logs_path( @@ -134,22 +123,15 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER - ) - or requests.Session() + self._compression = compression or _compression_from_env( + OTEL_EXPORTER_OTLP_LOGS_COMPRESSION + ) + self._session = _setup_session( + session, + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER, + self._headers, + self._compression, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._shutdown = False self._metrics = ExporterMetrics( @@ -159,43 +141,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. 
Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export( self, batch: Sequence[ReadableLogRecord] ) -> LogRecordExportResult: @@ -203,69 +148,25 @@ def export( _logger.warning("Exporter already shutdown, ignoring batch") return LogRecordExportResult.FAILURE + serialized_data = encode_logs(batch).SerializeToString() with self._metrics.export_operation(len(batch)) as result: - serialized_data = encode_logs(batch).SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. 
- backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return LogRecordExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export logs batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return LogRecordExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_is_occuring.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return LogRecordExportResult.FAILURE + success = _export_with_retries( + self._session, + self._endpoint, + serialized_data, + self._compression, + self._certificate_file, + self._client_cert, + self._timeout, + self._shutdown_in_progress, + result, + "logs", + ) + return ( + LogRecordExportResult.SUCCESS + if success + else LogRecordExportResult.FAILURE + ) def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" @@ -276,22 +177,10 @@ def shutdown(self): _logger.warning("Exporter already shutdown, ignoring call") return self._shutdown = True - self._shutdown_is_occuring.set() + self._shutdown_in_progress.set() self._session.close() -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_LOGS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - def _append_logs_path(endpoint: str) -> str: if endpoint.endswith("/"): return endpoint + DEFAULT_LOGS_EXPORT_PATH diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index efd63b4543..7c5838d2ae 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ 
b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -12,14 +12,9 @@ # limitations under the License. from __future__ import annotations -import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time from typing import ( # noqa: F401 Any, Callable, @@ -31,7 +26,6 @@ from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError from typing_extensions import deprecated from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( @@ -47,12 +41,12 @@ encode_metrics, ) from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _export_with_retries, + _setup_session, ) from opentelemetry.metrics import MeterProvider from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401 @@ -75,7 +69,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE, @@ -103,9 +96,6 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -113,9 +103,8 @@ DEFAULT_COMPRESSION = Compression.NoCompression DEFAULT_ENDPOINT = "http://localhost:4318/" -DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): @@ -194,22 +183,15 @@ def __init__( 
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER - ) - or requests.Session() + self._compression = compression or _compression_from_env( + OTEL_EXPORTER_OTLP_METRICS_COMPRESSION + ) + self._session = _setup_session( + session, + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_METRICS_CREDENTIAL_PROVIDER, + self._headers, + self._compression, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._common_configuration( preferred_temporality, preferred_aggregation @@ -224,122 +206,6 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. 
This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - - def _export_with_retries( - self, - export_request: ExportMetricsServiceRequest, - deadline_sec: float, - num_items: int, - ) -> MetricExportResult: - """Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out. - - Args: - export_request: ExportMetricsServiceRequest object containing metrics data to export - deadline_sec: timestamp deadline for the export - - Returns: - MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise - """ - with self._metrics.export_operation(num_items) as result: - serialized_data = export_request.SerializeToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. 
- backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return MetricExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export metrics batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return MetricExportResult.FAILURE - - _logger.warning( - "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return MetricExportResult.FAILURE - def export( self, metrics_data: MetricsData, @@ -357,26 +223,36 @@ def export( num_items += len(metric.data.data_points) export_request = encode_metrics(metrics_data) - deadline_sec = time() + self._timeout + + def _do_export(request: ExportMetricsServiceRequest) -> bool: + serialized_data = request.SerializeToString() + with self._metrics.export_operation(num_items) as result: + return _export_with_retries( + self._session, + self._endpoint, + serialized_data, + self._compression, + self._certificate_file, + self._client_cert, + self._timeout, + self._shutdown_in_progress, + result, + "metrics", + ) # If no batch size configured, export as single batch with retries as configured if self._max_export_batch_size is None: - return self._export_with_retries( - export_request, deadline_sec, num_items + return ( + MetricExportResult.SUCCESS + if _do_export(export_request) + else MetricExportResult.FAILURE ) # Else, export in batches of configured size - batched_export_requests = _split_metrics_data( + for split_request in _split_metrics_data( export_request, self._max_export_batch_size - ) - - for split_metrics_data in batched_export_requests: - export_result = self._export_with_retries( - split_metrics_data, - deadline_sec, - num_items, - ) - if export_result != MetricExportResult.SUCCESS: + ): + if not _do_export(split_request): return MetricExportResult.FAILURE # Only returns SUCCESS if all batches succeeded @@ -727,18 +603,6 @@ def 
get_resource_data( return _get_resource_data(sdk_resource_scope_data, resource_class, name) -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_METRICS_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - def _append_metrics_path(endpoint: str) -> str: if endpoint.endswith("/"): return endpoint + DEFAULT_METRICS_EXPORT_PATH diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 018d89df1e..97aada9e27 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -12,19 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import gzip import logging -import random import threading -import zlib -from io import BytesIO from os import environ -from time import time from typing import Dict, Optional, Sequence from urllib.parse import urlparse import requests -from requests.exceptions import ConnectionError from opentelemetry.exporter.otlp.proto.common._exporter_metrics import ( ExporterMetrics, @@ -33,12 +27,12 @@ encode_spans, ) from opentelemetry.exporter.otlp.proto.http import ( - _OTLP_HTTP_HEADERS, Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( - _is_retryable, - _load_session_from_envvar, + _compression_from_env, + _export_with_retries, + _setup_session, ) from opentelemetry.metrics import MeterProvider from opentelemetry.sdk.environment_variables import ( @@ -46,7 +40,6 @@ OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, - OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT, @@ -63,9 +56,6 @@ from opentelemetry.semconv._incubating.attributes.otel_attributes import ( OtelComponentTypeValues, ) -from opentelemetry.semconv.attributes.http_attributes import ( - HTTP_RESPONSE_STATUS_CODE, -) from opentelemetry.util.re import parse_env_headers _logger = logging.getLogger(__name__) @@ -73,9 +63,8 @@ DEFAULT_COMPRESSION = Compression.NoCompression DEFAULT_ENDPOINT = "http://localhost:4318/" -DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds -_MAX_RETRYS = 6 +DEFAULT_TRACES_EXPORT_PATH = "v1/traces" class OTLPSpanExporter(SpanExporter): @@ -129,22 +118,15 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) - self._compression = compression or _compression_from_env() - self._session = ( - session - or _load_session_from_envvar( - _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER - ) - or requests.Session() + self._compression = compression or _compression_from_env( + 
OTEL_EXPORTER_OTLP_TRACES_COMPRESSION + ) + self._session = _setup_session( + session, + _OTEL_PYTHON_EXPORTER_OTLP_HTTP_TRACES_CREDENTIAL_PROVIDER, + self._headers, + self._compression, ) - self._session.headers.update(self._headers) - self._session.headers.update(_OTLP_HTTP_HEADERS) - # let users override our defaults - self._session.headers.update(self._headers) - if self._compression is not Compression.NoCompression: - self._session.headers.update( - {"Content-Encoding": self._compression.value} - ) self._shutdown = False self._metrics = ExporterMetrics( @@ -154,111 +136,28 @@ def __init__( meter_provider, ) - def _export( - self, serialized_data: bytes, timeout_sec: Optional[float] = None - ): - data = serialized_data - if self._compression == Compression.Gzip: - gzip_data = BytesIO() - with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: - gzip_stream.write(serialized_data) - data = gzip_data.getvalue() - elif self._compression == Compression.Deflate: - data = zlib.compress(serialized_data) - - if timeout_sec is None: - timeout_sec = self._timeout - - # By default, keep-alive is enabled in Session's request - # headers. Backends may choose to close the connection - # while a post happens which causes an unhandled - # exception. 
This try/except will retry the post on such exceptions - try: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - except ConnectionError: - resp = self._session.post( - url=self._endpoint, - data=data, - verify=self._certificate_file, - timeout=timeout_sec, - cert=self._client_cert, - ) - return resp - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE + serialized_data = encode_spans(spans).SerializePartialToString() with self._metrics.export_operation(len(spans)) as result: - serialized_data = encode_spans(spans).SerializePartialToString() - deadline_sec = time() + self._timeout - for retry_num in range(_MAX_RETRYS): - # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. - backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) - export_error: Optional[Exception] = None - try: - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return SpanExportResult.SUCCESS - except requests.exceptions.RequestException as error: - reason = error - export_error = error - retryable = isinstance(error, ConnectionError) - status_code = None - else: - reason = resp.reason - retryable = _is_retryable(resp) - status_code = resp.status_code - - if not retryable: - _logger.error( - "Failed to export span batch code: %s, reason: %s", - status_code, - reason, - ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - - if ( - retry_num + 1 == _MAX_RETRYS - or backoff_seconds > (deadline_sec - time()) - or self._shutdown - ): - _logger.error( - "Failed to export span batch due to timeout, " - "max retries or shutdown." 
- ) - error_attrs = ( - {HTTP_RESPONSE_STATUS_CODE: status_code} - if status_code is not None - else None - ) - result.error = export_error - result.error_attrs = error_attrs - return SpanExportResult.FAILURE - _logger.warning( - "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - reason, - backoff_seconds, - ) - shutdown = self._shutdown_in_progress.wait(backoff_seconds) - if shutdown: - _logger.warning("Shutdown in progress, aborting retry.") - break - return SpanExportResult.FAILURE + success = _export_with_retries( + self._session, + self._endpoint, + serialized_data, + self._compression, + self._certificate_file, + self._client_cert, + self._timeout, + self._shutdown_in_progress, + result, + "span", + ) + return ( + SpanExportResult.SUCCESS if success else SpanExportResult.FAILURE + ) def shutdown(self): if self._shutdown: @@ -273,18 +172,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return True -def _compression_from_env() -> Compression: - compression = ( - environ.get( - OTEL_EXPORTER_OTLP_TRACES_COMPRESSION, - environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"), - ) - .lower() - .strip() - ) - return Compression(compression) - - def _append_trace_path(endpoint: str) -> str: if endpoint.endswith("/"): return endpoint + DEFAULT_TRACES_EXPORT_PATH diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 5f7ae2afa9..c95a733f0b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -1265,7 +1265,10 @@ def test_exponential_explicit_bucket_histogram(self): ExplicitBucketHistogramAggregation, ) - @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True)) + @patch( + 
"opentelemetry.exporter.otlp.proto.http._common._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 7981b0bc82..a13e9f8da9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -456,7 +456,10 @@ def _get_sdk_log_data() -> List[ReadWriteLogRecord]: return [log1, log2, log3, log4] - @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) + @patch( + "opentelemetry.exporter.otlp.proto.http._common._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 0df471aa69..546d163bf8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -277,7 +277,10 @@ def test_headers_parse_from_env(self): ), ) - @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) + @patch( + "opentelemetry.exporter.otlp.proto.http._common._export", + return_value=Mock(ok=True), + ) def test_2xx_status_code(self, mock_otlp_metric_exporter): """ Test that any HTTP 2XX code returns a successful result