Add support for batching in PeriodicMetricReader#8296
Add support for batching in PeriodicMetricReader#8296psx95 wants to merge 18 commits intoopen-telemetry:mainfrom
Conversation
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #8296 +/- ##
============================================
+ Coverage 90.27% 90.31% +0.04%
- Complexity 7684 7708 +24
============================================
Files 850 851 +1
Lines 23186 23318 +132
Branches 2352 2363 +11
============================================
+ Hits 20931 21060 +129
- Misses 1530 1532 +2
- Partials 725 726 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| Collection<MetricData> currentBatch = new ArrayList<>(maxExportBatchSize); | ||
|
|
||
| // Fill active batch and split overlapping metric points if needed | ||
| for (MetricData metricData : metrics) { |
There was a problem hiding this comment.
nit: i expected prepareExportBatches to take the Collection so that all of the batching logic is contained within the batcher (and we don't have to pass + update currentBatch).
There was a problem hiding this comment.
But we are not technically updating the currentBatch in this loop - the contents within the currentBatch is updated by the prepareExportBatches.
My intention behind the prepareExportBatches was a function that splits a given MetricData into appropriate export batches. Since the batches need to be completely filled (except the last one), this function requires the currentBatch as well.
Maybe if the function was renamed to splitMetricData it would make more sense?
| batches = metricsBatcher.batchMetrics(metricData); | ||
| List<CompletableResultCode> results = new ArrayList<>(batches.size()); | ||
| for (Collection<MetricData> batch : batches) { | ||
| results.add(exporter.export(batch)); |
There was a problem hiding this comment.
I don't see exportTimeoutMillis implemented in the periodic reader anywhere. https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#periodic-exporting-metricreader. I mostly want to make sure that the timeout, if it exists, is applied to each export, and not to all exports together.
There was a problem hiding this comment.
Yes, it's set at the exporter level and I think it's on a per export call basis.
I was not 100% sure about this as well, which is why I called it out in the PR description. Maybe other reviewers can confirm/guide here.
There was a problem hiding this comment.
Pull request overview
Adds configurable export batching to PeriodicMetricReader so large metric collections can be split into multiple export calls based on a new setMaxExportBatchSize(int) builder option (per spec / #8245).
Changes:
- Add
PeriodicMetricReaderBuilder#setMaxExportBatchSize(int)and plumb aMetricExportBatcherintoPeriodicMetricReader. - Implement
MetricExportBatcherto splitMetricDatapoint collections into bounded batches. - Add/expand unit tests for batching behavior and builder validation.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java | Adds setMaxExportBatchSize configuration and passes batcher into reader. |
| sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java | Uses MetricExportBatcher to split exports into multiple calls when configured. |
| sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java | New batching/splitting implementation for metric point data. |
| sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java | Adds validation and integration-style tests for periodic batching behavior. |
| sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java | New focused unit tests for batching and splitting across metric types. |
| docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt | Records new public builder method in API diffs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| while (currentIndex < totalPointsInMetricData) { | ||
| int pointsToTake = | ||
| Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); | ||
| currentBatch.add( | ||
| copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); |
There was a problem hiding this comment.
prepareExportBatches can enter an infinite loop when remainingCapacityInCurrentBatch is 0 (i.e., the current batch is already full) and the next MetricData has >0 points. In that case pointsToTake becomes 0, currentIndex never advances, and the while loop never terminates. Handle the zero-capacity case by flushing/rolling over to a new batch before the loop (or ensuring pointsToTake is always >0). Please also add a test that covers the scenario where a metric exactly fills a batch and the following metric triggers a split.
There was a problem hiding this comment.
I don't think this will happen. When remainingCapacityInCurrentBatch is 0:
First Iteration:
pointsToTake=Math.min(totalPointsInMetricData - 0, 0)evaluates to 0.copyMetricData(..., 0, 0)executes and creates aMetricDatawith 0 points, which is added to currentBatch.currentIndexandremainingCapacityInCurrentBatchremain at 0.- The condition
if (remainingCapacityInCurrentBatch == 0)evaluates totrue, causing thecurrentBatch(now containing the extra 0-point MetricData) to be flushed topreparedBatches. - Crucially,
remainingCapacityInCurrentBatchis then reset tomaxExportBatchSize(which is strictly positive).
Second Iteration:
- Since currentIndex is still 0 < totalPointsInMetricData.
- The next iteration starts with
remainingCapacityInCurrentBatch == maxExportBatchSize. pointsToTakeis now > 0,currentIndexadvances normally, and the loop successfully terminates!
| int currentBatchPoints = 0; | ||
| for (MetricData m : currentBatch) { | ||
| currentBatchPoints += m.getData().getPoints().size(); | ||
| } | ||
| int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatchPoints; |
There was a problem hiding this comment.
prepareExportBatches recomputes currentBatchPoints by iterating the entire currentBatch for every incoming MetricData, which makes batching O(n²) in the number of metrics per batch. Consider tracking the current batch point count incrementally (e.g., return it in MetricDataSplitOperationResult or maintain it alongside currentBatch) to keep batching linear.
There was a problem hiding this comment.
This is technically correct, but I'm not sure if the performance gain is worth complicating the logic for a loop that will be a reasonably small number of iterations (a reasonable batch size would be < 500 points?).
I'd be curious to hear other reviewers' thoughts on this.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (metricsBatcher != null) { | ||
| batches = metricsBatcher.batchMetrics(metricData); | ||
| List<CompletableResultCode> results = new ArrayList<>(batches.size()); | ||
| for (Collection<MetricData> batch : batches) { | ||
| results.add(exporter.export(batch)); | ||
| } | ||
| result = CompletableResultCode.ofAll(results); | ||
| } else { |
There was a problem hiding this comment.
MetricExporter#export documents that the caller (PeriodicMetricReader) will not call export again until the previous call completes. With batching enabled, this loop invokes exporter.export(batch) multiple times back-to-back without waiting for each returned CompletableResultCode to complete, which can lead to concurrent in-flight exports for exporters that are not re-entrant. Please change the batching export logic to export batches sequentially (start the next export only in the previous export's completion callback) and aggregate the final result from that chain.
This PR adds support for configuring
setMaxExportBatchSizeon thePeriodicMetricReaderthat sets the maximum number of metric data points to be sent in a single export call.If the number of metric data points (across
Collection<MetricData>) scheduled to be export exceeds themaxExportBatchSize, the data points organized in batches and sent over multiple export calls.These changes do not modify the current timeout behavior, which is still per export call.
Fix #8245