diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 4db7d2ec0cb..4cad60bad5a 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -66,7 +66,7 @@ Collection> batchMetrics(Collection metrics) return Collections.emptyList(); } Collection> preparedBatchesForExport = new ArrayList<>(); - Collection currentBatch = new ArrayList<>(maxExportBatchSize); + BatchState currentBatch = new BatchState(new ArrayList<>(maxExportBatchSize), 0); // Fill active batch and split overlapping metric points if needed for (MetricData metricData : metrics) { @@ -76,8 +76,8 @@ Collection> batchMetrics(Collection metrics) } // Push trailing capacity block - if (!currentBatch.isEmpty()) { - preparedBatchesForExport.add(currentBatch); + if (!currentBatch.metrics.isEmpty()) { + preparedBatchesForExport.add(currentBatch.metrics); } return Collections.unmodifiableCollection(preparedBatchesForExport); } @@ -92,16 +92,13 @@ Collection> batchMetrics(Collection metrics) * @return A result containing the prepared batches and the last in-progress batch. */ private MetricDataSplitOperationResult prepareExportBatches( - MetricData metricData, Collection currentBatch) { - int currentBatchPoints = 0; - for (MetricData m : currentBatch) { - currentBatchPoints += m.getData().getPoints().size(); - } - int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatchPoints; + MetricData metricData, BatchState currentBatch) { + int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.points; int totalPointsInMetricData = metricData.getData().getPoints().size(); if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { - currentBatch.add(metricData); + currentBatch.metrics.add(metricData); + currentBatch.points += totalPointsInMetricData; return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch); } else { // Remaining capacity can't hold all points, partition existing metric data object @@ -114,15 +111,16 @@ private MetricDataSplitOperationResult prepareExportBatches( Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); if (pointsToTake > 0) { - currentBatch.add( + currentBatch.metrics.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentBatch.points += pointsToTake; currentIndex += pointsToTake; remainingCapacityInCurrentBatch -= pointsToTake; } if (remainingCapacityInCurrentBatch == 0) { - preparedBatches.add(currentBatch); - currentBatch = new ArrayList<>(maxExportBatchSize); + preparedBatches.add(currentBatch.metrics); + currentBatch = new BatchState(new ArrayList<>(maxExportBatchSize), 0); remainingCapacityInCurrentBatch = maxExportBatchSize; } } @@ -233,7 +231,7 @@ private static MetricData createMetricDataWithPoints( */ private static class MetricDataSplitOperationResult { private final Collection> preparedBatches; - private final Collection lastInProgressBatch; + private final BatchState lastInProgressBatch; /** * Creates a new MetricDataSplitOperationResult. @@ -245,8 +243,7 @@ private static class MetricDataSplitOperationResult { * than {@link #maxExportBatchSize} points. */ MetricDataSplitOperationResult( - Collection> preparedBatches, - Collection lastInProgressBatch) { + Collection> preparedBatches, BatchState lastInProgressBatch) { this.preparedBatches = preparedBatches; this.lastInProgressBatch = lastInProgressBatch; } @@ -255,8 +252,29 @@ Collection> getPreparedBatches() { return preparedBatches; } - Collection getLastInProgressBatch() { + BatchState getLastInProgressBatch() { return lastInProgressBatch; } } + + /** + * Tracks the active batch while batching stays linear: {@code metrics} is the current export + * payload being assembled and {@code points} is its running point count, so callers do not need + * to rescan the batch on every append. + */ + private static final class BatchState { + private final Collection metrics; + private int points; + + /** + * Creates the mutable state for the current in-progress batch. + * + * @param metrics metric entries collected into the current export batch + * @param points running total of data points across {@code metrics} + */ + private BatchState(Collection metrics, int points) { + this.metrics = metrics; + this.points = points; + } + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index d7bced887bd..8313cec51c1 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -547,7 +547,8 @@ void periodicExport_SequentialBatches_PartialFailure() throws Exception { batch3Result.succeed(); - // Flush result should still be success + // Failed export results are logged, but forceFlush preserves the prior partial-success + // behavior. assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); boolean logFound =