Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5369.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-exporter-otlp-proto-http`: add a `max_request_size` argument to the OTLP HTTP exporters (traces, logs, metrics); serialized requests larger than the limit are dropped before sending, measured before compression. Defaults to 64 MiB (enabled); set to 0 to disable. Mirrors opentelemetry-go#8157.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
from opentelemetry.util._importlib_metadata import entry_points


class RequestPayloadTooLargeError(Exception):
"""A serialized OTLP request exceeded the configured ``max_request_size``.

The class name is emitted as the ``error.type`` attribute on the exporter's
failed-export metric, so renaming it changes observable telemetry.
"""


def _is_retryable(resp: requests.Response) -> bool:
if resp.status_code == 408:
return True
Expand All @@ -20,6 +28,19 @@ def _is_retryable(resp: requests.Response) -> bool:
return False


def _is_request_too_large(
serialized_data: bytes, max_request_size: int
) -> bool:
"""Return True if the serialized request exceeds a positive size limit.

The size is measured on the uncompressed serialized request, matching the
OTLP specification's "before compression" request-size limit. A
``max_request_size`` of ``0`` (or any non-positive value) disables the
check.
"""
return max_request_size > 0 and len(serialized_data) > max_request_size


def _load_session_from_envvar(
cred_envvar: Literal[
"OTEL_PYTHON_EXPORTER_OTLP_HTTP_LOGS_CREDENTIAL_PROVIDER",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
RequestPayloadTooLargeError,
_is_request_too_large,
_is_retryable,
_load_session_from_envvar,
)
Expand Down Expand Up @@ -70,6 +72,7 @@
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds
_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes
_MAX_RETRYS = 6


Expand All @@ -85,8 +88,26 @@ def __init__(
compression: Compression | None = None,
session: requests.Session | None = None,
*,
max_request_size: int | None = None,
meter_provider: MeterProvider | None = None,
):
"""OTLP HTTP log exporter.

Args:
endpoint: Target URL to which the exporter is going to send logs.
certificate_file: Path to the CA certificate file for TLS.
client_key_file: Path to the client key file for mTLS.
client_certificate_file: Path to the client certificate file for mTLS.
headers: Headers to send with each export request.
timeout: Timeout in seconds for each export request.
compression: Compression to use; one of none, gzip, deflate.
session: Requests session to use at export.
max_request_size: Maximum size in bytes of a serialized request,
measured before compression. A request exceeding this size is
dropped before being sent. Defaults to 64 MiB; a value of 0 (or
any non-positive value) disables the limit.
meter_provider: MeterProvider used for the exporter's own metrics.
"""
self._shutdown_is_occuring = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
Expand Down Expand Up @@ -125,6 +146,11 @@ def __init__(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
)
)
self._max_request_size = (
_DEFAULT_MAX_REQUEST_SIZE
if max_request_size is None
else max_request_size
)
self._compression = compression or _compression_from_env()
self._session = (
session
Expand Down Expand Up @@ -200,6 +226,19 @@ def export(

with self._metrics.export_operation(len(batch)) as result:
serialized_data = encode_logs(batch).SerializeToString()
if _is_request_too_large(serialized_data, self._max_request_size):
_logger.warning(
"Dropping logs batch: serialized size %d bytes exceeds "
"max_request_size %d bytes.",
len(serialized_data),
self._max_request_size,
)
result.error = RequestPayloadTooLargeError(
f"Serialized logs request size {len(serialized_data)} "
f"bytes exceeds max_request_size "
f"{self._max_request_size} bytes."
)
return LogRecordExportResult.FAILURE
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
RequestPayloadTooLargeError,
_is_request_too_large,
_is_retryable,
_load_session_from_envvar,
)
Expand Down Expand Up @@ -104,6 +106,7 @@
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds
_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes
_MAX_RETRYS = 6


Expand All @@ -123,6 +126,7 @@ def __init__(
preferred_aggregation: dict[type, Aggregation] | None = None,
max_export_batch_size: int | None = None,
*,
max_request_size: int | None = None,
meter_provider: MeterProvider | None = None,
):
"""OTLP HTTP metrics exporter
Expand All @@ -145,6 +149,10 @@ def __init__(
max_export_batch_size: Maximum number of data points to export in a single request.
If not set there is no limit to the number of data points in a request.
If it is set and the number of data points exceeds the max, the request will be split.
max_request_size: Maximum size in bytes of a serialized request, measured before
compression. A request exceeding this size is dropped before being sent. Defaults
to 64 MiB; a value of 0 (or any non-positive value) disables the limit.
meter_provider: MeterProvider used for the exporter's own metrics.
"""
self._shutdown_in_progress = threading.Event()
self._endpoint = endpoint or environ.get(
Expand Down Expand Up @@ -204,6 +212,11 @@ def __init__(
preferred_temporality, preferred_aggregation
)
self._max_export_batch_size: int | None = max_export_batch_size
self._max_request_size = (
_DEFAULT_MAX_REQUEST_SIZE
if max_request_size is None
else max_request_size
)
self._shutdown = False

self._metrics = create_exporter_metrics(
Expand Down Expand Up @@ -271,6 +284,19 @@ def _export_with_retries(
"""
with self._metrics.export_operation(num_items) as result:
serialized_data = export_request.SerializeToString()
if _is_request_too_large(serialized_data, self._max_request_size):
_logger.warning(
"Dropping metrics batch: serialized size %d bytes exceeds "
"max_request_size %d bytes.",
len(serialized_data),
self._max_request_size,
)
result.error = RequestPayloadTooLargeError(
f"Serialized metrics request size {len(serialized_data)} "
f"bytes exceeds max_request_size "
f"{self._max_request_size} bytes."
)
return MetricExportResult.FAILURE
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
Compression,
)
from opentelemetry.exporter.otlp.proto.http._common import (
RequestPayloadTooLargeError,
_is_request_too_large,
_is_retryable,
_load_session_from_envvar,
)
Expand Down Expand Up @@ -66,6 +68,7 @@
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds
_DEFAULT_MAX_REQUEST_SIZE = 64 * 1024 * 1024 # 64 MiB, in bytes
_MAX_RETRYS = 6


Expand All @@ -81,8 +84,26 @@ def __init__(
compression: Compression | None = None,
session: requests.Session | None = None,
*,
max_request_size: int | None = None,
meter_provider: MeterProvider | None = None,
):
"""OTLP HTTP span exporter.

Args:
endpoint: Target URL to which the exporter is going to send spans.
certificate_file: Path to the CA certificate file for TLS.
client_key_file: Path to the client key file for mTLS.
client_certificate_file: Path to the client certificate file for mTLS.
headers: Headers to send with each export request.
timeout: Timeout in seconds for each export request.
compression: Compression to use; one of none, gzip, deflate.
session: Requests session to use at export.
max_request_size: Maximum size in bytes of a serialized request,
measured before compression. A request exceeding this size is
dropped before being sent. Defaults to 64 MiB; a value of 0 (or
any non-positive value) disables the limit.
meter_provider: MeterProvider used for the exporter's own metrics.
"""
self._shutdown_in_progress = threading.Event()
self._endpoint = endpoint or environ.get(
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
Expand Down Expand Up @@ -120,6 +141,11 @@ def __init__(
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
)
)
self._max_request_size = (
_DEFAULT_MAX_REQUEST_SIZE
if max_request_size is None
else max_request_size
)
self._compression = compression or _compression_from_env()
self._session = (
session
Expand Down Expand Up @@ -193,6 +219,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

with self._metrics.export_operation(len(spans)) as result:
serialized_data = encode_spans(spans).SerializePartialToString()
if _is_request_too_large(serialized_data, self._max_request_size):
_logger.warning(
"Dropping span batch: serialized size %d bytes exceeds "
"max_request_size %d bytes.",
len(serialized_data),
self._max_request_size,
)
result.error = RequestPayloadTooLargeError(
f"Serialized span request size {len(serialized_data)} "
f"bytes exceeds max_request_size "
f"{self._max_request_size} bytes."
)
return SpanExportResult.FAILURE
deadline_sec = time() + self._timeout
for retry_num in range(_MAX_RETRYS):
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,82 @@ def setUp(self):
),
}

def test_max_request_size_default(self):
self.assertEqual(
OTLPMetricExporter()._max_request_size, 64 * 1024 * 1024
)

@patch.object(Session, "post")
def test_oversized_payload_dropped_before_send(self, mock_post):
exporter = OTLPMetricExporter(max_request_size=1)
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.FAILURE,
)
mock_post.assert_not_called()

@patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True))
def test_max_request_size_zero_disables(self, _mock_export):
exporter = OTLPMetricExporter(max_request_size=0)
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.SUCCESS,
)

@patch.object(Session, "post")
def test_negative_max_request_size_disables_limit(self, mock_post):
resp = Response()
resp.status_code = 200
mock_post.return_value = resp
exporter = OTLPMetricExporter(max_request_size=-1)
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.SUCCESS,
)
mock_post.assert_called()

@patch.object(Session, "post")
def test_oversized_payload_dropped_with_batch_splitting_enabled(
self, mock_post
):
# With batch-splitting enabled, the byte check still applies to each
# post-split request, so a too-small limit drops every split before
# sending (an oversized split aborts the batch, like any other
# non-retryable per-split failure).
exporter = OTLPMetricExporter(
max_request_size=1, max_export_batch_size=1
)
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.FAILURE,
)
mock_post.assert_not_called()

@patch.dict(
"os.environ", {OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED: "true"}
)
@patch.object(Session, "post")
def test_oversized_payload_records_failure_metric(self, mock_post):
exporter = OTLPMetricExporter(
max_request_size=1, meter_provider=self.meter_provider
)
self.assertEqual(
exporter.export(self.metrics["sum_int"]),
MetricExportResult.FAILURE,
)
mock_post.assert_not_called()
metrics_data = self.metric_reader.get_metrics_data()
scope_metrics = metrics_data.resource_metrics[0].scope_metrics[0]
exported = next(
metric
for metric in scope_metrics.metrics
if metric.name == "otel.sdk.exporter.metric_data_point.exported"
)
self.assertEqual(
exported.data.data_points[0].attributes["error.type"],
"RequestPayloadTooLargeError",
)

def test_constructor_default(self):
exporter = OTLPMetricExporter()

Expand Down
Loading
Loading