diff --git a/.changelog/5369.changed b/.changelog/5369.changed new file mode 100644 index 00000000000..b2061cefede --- /dev/null +++ b/.changelog/5369.changed @@ -0,0 +1 @@ +`opentelemetry-exporter-otlp-proto-http`: add a `max_request_size` argument to the OTLP HTTP exporters (traces, logs, metrics); serialized requests larger than the limit are dropped before sending, measured before compression. Defaults to 64 MiB (enabled); set to 0 to disable. Mirrors opentelemetry-go#8157. diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py index 57bd7ca065a..72ff4f291fe 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_common/__init__.py @@ -12,6 +12,14 @@ from opentelemetry.util._importlib_metadata import entry_points +class RequestPayloadTooLargeError(Exception): + """A serialized OTLP request exceeded the configured ``max_request_size``. + + The class name is emitted as the ``error.type`` attribute on the exporter's + failed-export metric, so renaming it changes observable telemetry. + """ + + def _is_retryable(resp: requests.Response) -> bool: if resp.status_code == 408: return True @@ -20,6 +28,19 @@ def _is_retryable(resp: requests.Response) -> bool: return False +def _is_request_too_large( + serialized_data: bytes, max_request_size: int +) -> bool: + """Return True if the serialized request exceeds a positive size limit. + + The size is measured on the uncompressed serialized request, matching the + OTLP specification's "before compression" request-size limit. A + ``max_request_size`` of ``0`` (or any non-positive value) disables the + check. + """ + return max_request_size > 0 and len(serialized_data) > max_request_size + + def _load_session_from_envvar( cred_envvar: Literal[ "OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER", diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 56906b96501..3f71ad7f32b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -25,6 +25,8 @@ Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( + RequestPayloadTooLargeError, + _is_request_too_large, _is_retryable, _load_session_from_envvar, ) @@ -70,6 +72,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_LOGS_EXPORT_PATH = "v1/logs" DEFAULT_TIMEOUT = 10 # in seconds +_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes _MAX_RETRYS = 6 @@ -85,8 +88,26 @@ def __init__( compression: Compression | None = None, session: requests.Session | None = None, *, + max_request_size: int | None = None, meter_provider: MeterProvider | None = None, ): + """OTLP HTTP log exporter. + + Args: + endpoint: Target URL to which the exporter is going to send logs. + certificate_file: Path to the CA certificate file for TLS. + client_key_file: Path to the client key file for mTLS. + client_certificate_file: Path to the client certificate file for mTLS. + headers: Headers to send with each export request. + timeout: Timeout in seconds for each export request. + compression: Compression to use; one of none, gzip, deflate. + session: Requests session to use at export. + max_request_size: Maximum size in bytes of a serialized request, + measured before compression. A request exceeding this size is + dropped before being sent. Defaults to 64 MiB; a value of 0 (or + any non-positive value) disables the limit. + meter_provider: MeterProvider used for the exporter's own metrics. + """ self._shutdown_is_occuring = threading.Event() self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, @@ -125,6 +146,11 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) + self._max_request_size = ( + _DEFAULT_MAX_REQUEST_SIZE + if max_request_size is None + else max_request_size + ) self._compression = compression or _compression_from_env() self._session = ( session @@ -200,6 +226,19 @@ def export( with self._metrics.export_operation(len(batch)) as result: serialized_data = encode_logs(batch).SerializeToString() + if _is_request_too_large(serialized_data, self._max_request_size): + _logger.warning( + "Dropping logs batch: serialized size %d bytes exceeds " + "max_request_size %d bytes.", + len(serialized_data), + self._max_request_size, + ) + result.error = RequestPayloadTooLargeError( + f"Serialized logs request size {len(serialized_data)} " + f"bytes exceeds max_request_size " + f"{self._max_request_size} bytes." + ) + return LogRecordExportResult.FAILURE deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index eb1e69cfe4f..4f73e1ef145 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -39,6 +39,8 @@ Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( + RequestPayloadTooLargeError, + _is_request_too_large, _is_retryable, _load_session_from_envvar, ) @@ -104,6 +106,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_METRICS_EXPORT_PATH = "v1/metrics" DEFAULT_TIMEOUT = 10 # in seconds +_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes _MAX_RETRYS = 6 @@ -123,6 +126,7 @@ def __init__( preferred_aggregation: dict[type, Aggregation] | None = None, max_export_batch_size: int | None = None, *, + max_request_size: int | None = None, meter_provider: MeterProvider | None = None, ): """OTLP HTTP metrics exporter @@ -145,6 +149,10 @@ def __init__( max_export_batch_size: Maximum number of data points to export in a single request. If not set there is no limit to the number of data points in a request. If it is set and the number of data points exceeds the max, the request will be split. + max_request_size: Maximum size in bytes of a serialized request, measured before + compression. A request exceeding this size is dropped before being sent. Defaults + to 64 MiB; a value of 0 (or any non-positive value) disables the limit. + meter_provider: MeterProvider used for the exporter's own metrics. """ self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( @@ -204,6 +212,11 @@ def __init__( preferred_temporality, preferred_aggregation ) self._max_export_batch_size: int | None = max_export_batch_size + self._max_request_size = ( + _DEFAULT_MAX_REQUEST_SIZE + if max_request_size is None + else max_request_size + ) self._shutdown = False self._metrics = create_exporter_metrics( @@ -271,6 +284,19 @@ def _export_with_retries( """ with self._metrics.export_operation(num_items) as result: serialized_data = export_request.SerializeToString() + if _is_request_too_large(serialized_data, self._max_request_size): + _logger.warning( + "Dropping metrics batch: serialized size %d bytes exceeds " + "max_request_size %d bytes.", + len(serialized_data), + self._max_request_size, + ) + result.error = RequestPayloadTooLargeError( + f"Serialized metrics request size {len(serialized_data)} " + f"bytes exceeds max_request_size " + f"{self._max_request_size} bytes." + ) + return MetricExportResult.FAILURE deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 2be240103c0..b5a48a547b3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -27,6 +27,8 @@ Compression, ) from opentelemetry.exporter.otlp.proto.http._common import ( + RequestPayloadTooLargeError, + _is_request_too_large, _is_retryable, _load_session_from_envvar, ) @@ -66,6 +68,7 @@ DEFAULT_ENDPOINT = "http://localhost:4318/" DEFAULT_TRACES_EXPORT_PATH = "v1/traces" DEFAULT_TIMEOUT = 10 # in seconds +_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes _MAX_RETRYS = 6 @@ -81,8 +84,26 @@ def __init__( compression: Compression | None = None, session: requests.Session | None = None, *, + max_request_size: int | None = None, meter_provider: MeterProvider | None = None, ): + """OTLP HTTP span exporter. + + Args: + endpoint: Target URL to which the exporter is going to send spans. + certificate_file: Path to the CA certificate file for TLS. + client_key_file: Path to the client key file for mTLS. + client_certificate_file: Path to the client certificate file for mTLS. + headers: Headers to send with each export request. + timeout: Timeout in seconds for each export request. + compression: Compression to use; one of none, gzip, deflate. + session: Requests session to use at export. + max_request_size: Maximum size in bytes of a serialized request, + measured before compression. A request exceeding this size is + dropped before being sent. Defaults to 64 MiB; a value of 0 (or + any non-positive value) disables the limit. + meter_provider: MeterProvider used for the exporter's own metrics. + """ self._shutdown_in_progress = threading.Event() self._endpoint = endpoint or environ.get( OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, @@ -120,6 +141,11 @@ def __init__( environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT), ) ) + self._max_request_size = ( + _DEFAULT_MAX_REQUEST_SIZE + if max_request_size is None + else max_request_size + ) self._compression = compression or _compression_from_env() self._session = ( session @@ -193,6 +219,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: with self._metrics.export_operation(len(spans)) as result: serialized_data = encode_spans(spans).SerializePartialToString() + if _is_request_too_large(serialized_data, self._max_request_size): + _logger.warning( + "Dropping span batch: serialized size %d bytes exceeds " + "max_request_size %d bytes.", + len(serialized_data), + self._max_request_size, + ) + result.error = RequestPayloadTooLargeError( + f"Serialized span request size {len(serialized_data)} " + f"bytes exceeds max_request_size " + f"{self._max_request_size} bytes." + ) + return SpanExportResult.FAILURE deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 84a11e8ae90..3ba2fc448d5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -128,6 +128,82 @@ def setUp(self): ), } + def test_max_request_size_default(self): + self.assertEqual( + OTLPMetricExporter()._max_request_size, 64 * 1024 * 1024 + ) + + @patch.object(Session, "post") + def test_oversized_payload_dropped_before_send(self, mock_post): + exporter = OTLPMetricExporter(max_request_size=1) + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + mock_post.assert_not_called() + + @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True)) + def test_max_request_size_zero_disables(self, _mock_export): + exporter = OTLPMetricExporter(max_request_size=0) + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.SUCCESS, + ) + + @patch.object(Session, "post") + def test_negative_max_request_size_disables_limit(self, mock_post): + resp = Response() + resp.status_code = 200 + mock_post.return_value = resp + exporter = OTLPMetricExporter(max_request_size=-1) + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.SUCCESS, + ) + mock_post.assert_called() + + @patch.object(Session, "post") + def test_oversized_payload_dropped_with_batch_splitting_enabled( + self, mock_post + ): + # With batch-splitting enabled, the byte check still applies to each + # post-split request, so a too-small limit drops every split before + # sending (an oversized split aborts the batch, like any other + # non-retryable per-split failure). + exporter = OTLPMetricExporter( + max_request_size=1, max_export_batch_size=1 + ) + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + mock_post.assert_not_called() + + @patch.dict( + "os.environ", {OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED: "true"} + ) + @patch.object(Session, "post") + def test_oversized_payload_records_failure_metric(self, mock_post): + exporter = OTLPMetricExporter( + max_request_size=1, meter_provider=self.meter_provider + ) + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + mock_post.assert_not_called() + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + exported = next( + metric + for metric in scope_metrics.metrics + if metric.name == "otel.sdk.exporter.metric_data_point.exported" + ) + self.assertEqual( + exported.data.data_points[0].attributes["error.type"], + "RequestPayloadTooLargeError", + ) + def test_constructor_default(self): exporter = OTLPMetricExporter() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index d7f3592e288..3663b0eb9bc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -68,6 +68,7 @@ ENV_TIMEOUT = "30" +# pylint: disable=too-many-public-methods class TestOTLPHTTPLogExporter(unittest.TestCase): def setUp(self): self.metric_reader = InMemoryMetricReader() @@ -656,6 +657,63 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + def test_max_request_size_default(self): + self.assertEqual(OTLPLogExporter()._max_request_size, 64 * 1024 * 1024) + + @patch.object(Session, "post") + def test_oversized_payload_dropped_before_send(self, mock_post): + exporter = OTLPLogExporter(max_request_size=1) + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.FAILURE, + ) + mock_post.assert_not_called() + + @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) + def test_max_request_size_zero_disables(self, _mock_export): + exporter = OTLPLogExporter(max_request_size=0) + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.SUCCESS, + ) + + @patch.object(Session, "post") + def test_negative_max_request_size_disables_limit(self, mock_post): + resp = Response() + resp.status_code = 200 + mock_post.return_value = resp + exporter = OTLPLogExporter(max_request_size=-1) + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.SUCCESS, + ) + mock_post.assert_called() + + @patch.dict( + "os.environ", {OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED: "true"} + ) + @patch.object(Session, "post") + def test_oversized_payload_records_failure_metric(self, mock_post): + exporter = OTLPLogExporter( + max_request_size=1, meter_provider=self.meter_provider + ) + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.FAILURE, + ) + mock_post.assert_not_called() + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + exported = next( + metric + for metric in scope_metrics.metrics + if metric.name == "otel.sdk.exporter.log.exported" + ) + self.assertEqual( + exported.data.data_points[0].attributes["error.type"], + "RequestPayloadTooLargeError", + ) + def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_log_exporter" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 1580e5a1802..d51fc593398 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -1,9 +1,11 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import gzip import threading import time import unittest +from http.server import BaseHTTPRequestHandler, HTTPServer from logging import WARNING from unittest.mock import MagicMock, Mock, patch @@ -12,6 +14,9 @@ from requests.exceptions import ConnectionError from requests.models import Response +from opentelemetry.exporter.otlp.proto.common.trace_encoder import ( + encode_spans, +) from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( DEFAULT_COMPRESSION, @@ -63,7 +68,27 @@ ) -# pylint: disable=protected-access +class _RecordingOTLPHandler(BaseHTTPRequestHandler): + # do_POST is the required override name from BaseHTTPRequestHandler. + def do_POST(self): # pylint: disable=invalid-name + content_length = int(self.headers.get("Content-Length", 0)) + self.server.received_bodies.append(self.rfile.read(content_length)) + self.send_response(200) + self.end_headers() + + def log_message(self, *args): # silence the test server's stderr logging + pass + + +def _start_recording_server(): + server = HTTPServer(("127.0.0.1", 0), _RecordingOTLPHandler) + server.received_bodies = [] + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server, thread + + +# pylint: disable=protected-access,too-many-public-methods class TestOTLPSpanExporter(unittest.TestCase): def setUp(self): self.metric_reader = InMemoryMetricReader() @@ -486,6 +511,109 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + def test_max_request_size_default(self): + exporter = OTLPSpanExporter() + self.assertEqual(exporter._max_request_size, 64 * 1024 * 1024) + + @patch.object(Session, "post") + def test_oversized_payload_dropped_before_send(self, mock_post): + exporter = OTLPSpanExporter(max_request_size=1) + self.assertEqual( + exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE + ) + mock_post.assert_not_called() + + @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True)) + def test_max_request_size_zero_disables(self, _mock_export): + exporter = OTLPSpanExporter(max_request_size=0) + self.assertEqual( + exporter.export([BASIC_SPAN]), SpanExportResult.SUCCESS + ) + + @patch.object(Session, "post") + def test_negative_max_request_size_disables_limit(self, mock_post): + resp = Response() + resp.status_code = 200 + mock_post.return_value = resp + exporter = OTLPSpanExporter(max_request_size=-1) + self.assertEqual( + exporter.export([BASIC_SPAN]), SpanExportResult.SUCCESS + ) + mock_post.assert_called() + + @patch.object(Session, "post") + def test_oversized_payload_measured_before_compression(self, mock_post): + # The limit applies to the uncompressed serialized request. Build a + # highly compressible batch whose gzip size is below a limit that the + # uncompressed size still exceeds, then assert it is still dropped -- + # which can only hold if size is measured before compression. + spans = [BASIC_SPAN] * 200 + uncompressed = encode_spans(spans).SerializePartialToString() + compressed = gzip.compress(uncompressed) + limit = (len(compressed) + len(uncompressed)) // 2 + # Guard the discriminating condition: compressed < limit < uncompressed. + self.assertLess(len(compressed), limit) + self.assertLess(limit, len(uncompressed)) + exporter = OTLPSpanExporter( + max_request_size=limit, compression=Compression.Gzip + ) + self.assertEqual(exporter.export(spans), SpanExportResult.FAILURE) + mock_post.assert_not_called() + + @patch.dict( + "os.environ", {OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED: "true"} + ) + @patch.object(Session, "post") + def test_oversized_payload_records_failure_metric(self, mock_post): + exporter = OTLPSpanExporter( + max_request_size=1, meter_provider=self.meter_provider + ) + self.assertEqual( + exporter.export([BASIC_SPAN]), SpanExportResult.FAILURE + ) + mock_post.assert_not_called() + metrics_data = self.metric_reader.get_metrics_data() + scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0] + exported = next( + metric + for metric in scope_metrics.metrics + if metric.name == "otel.sdk.exporter.span.exported" + ) + self.assertEqual( + exported.data.data_points[0].attributes["error.type"], + "RequestPayloadTooLargeError", + ) + + def test_end_to_end_export_sends_request_over_http(self): + server, thread = _start_recording_server() + port = server.server_address[1] + try: + exporter = OTLPSpanExporter( + endpoint=f"http://127.0.0.1:{port}/v1/traces" + ) + result = exporter.export([BASIC_SPAN]) + finally: + server.shutdown() + thread.join() + self.assertEqual(result, SpanExportResult.SUCCESS) + self.assertEqual(len(server.received_bodies), 1) + self.assertGreater(len(server.received_bodies[0]), 0) + + def test_end_to_end_oversized_request_never_reaches_server(self): + server, thread = _start_recording_server() + port = server.server_address[1] + try: + exporter = OTLPSpanExporter( + endpoint=f"http://127.0.0.1:{port}/v1/traces", + max_request_size=1, + ) + result = exporter.export([BASIC_SPAN]) + finally: + server.shutdown() + thread.join() + self.assertEqual(result, SpanExportResult.FAILURE) + self.assertEqual(server.received_bodies, []) + def assert_standard_metric_attrs(self, attributes): self.assertEqual( attributes["otel.component.type"], "otlp_http_span_exporter"