From af2be1e230d8e59e6c32b733a89a82eba730234d Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 13 Apr 2026 20:35:54 +0000 Subject: [PATCH 01/20] WIP: Add metrics batcher in the SDK --- .../metrics/export/MetricExportBatcher.java | 219 ++++++++++++++++++ .../metrics/export/PeriodicMetricReader.java | 9 +- .../export/PeriodicMetricReaderBuilder.java | 10 +- 3 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java new file mode 100644 index 00000000000..9c8731ef03a --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -0,0 +1,219 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.export; + +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +class MetricExportBatcher { + private final int maxExportBatchSize; + + MetricExportBatcher(int maxExportBatchSize) { + this.maxExportBatchSize = maxExportBatchSize; + } + + /** + * Batches the given metric data into multiple batches based on the maximum export batch size. + * + * @param metrics The collection of metric data objects to batch based on the number of data + * points they contain. + * @return A collection of batches of metric data. + */ + Collection> batchMetrics(Collection metrics) { + if (metrics.isEmpty()) { + return Collections.emptyList(); + } + + Collection> batches = new ArrayList<>(); + int currentBatchRemainingCapacity = maxExportBatchSize; + + for (MetricData metricData : metrics) { + MetricDataSplitOperationResult splitResult = + splitMetricData(metricData, currentBatchRemainingCapacity); + batches.add(splitResult.getBatchedMetricData()); + currentBatchRemainingCapacity = splitResult.getLastBatchRemainingCapacity(); + } + + return Collections.unmodifiableCollection(batches); + } + + /** + * Splits a MetricData object into multiple MetricData objects if the number of points exceeds the + * remaining capacity in the current batch. This function tries to fill the current batch with as + * many points as possible from the given metric data. + * + *

If the number of points in the metric data is less than or equal to the remaining capacity + * in the current batch, it will return a single MetricData object with all the points. + * + *

If the number of points in the metric data is greater than the remaining capacity in the + * current batch, it will return multiple MetricData objects, each with a subset of the points + * from the original metric data. + * + * @param metricData The MetricData object to split. + * @param remainingCapacityInCurrentBatch The remaining capacity in the current batch being used. + * @return A MetricDataSplitOperationResult containing the batched metric data and the remaining + * capacity in the last batch. + */ + private MetricDataSplitOperationResult splitMetricData( + MetricData metricData, int remainingCapacityInCurrentBatch) { + int totalPointsInMetricData = metricData.getData().getPoints().size(); + if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { + // We have enough capacity in the current batch to fit all points in this + // MetricData + return new MetricDataSplitOperationResult( + Collections.singleton(metricData), + remainingCapacityInCurrentBatch - totalPointsInMetricData); + } else { + // We don't have enough capacity in the current batch. Split this MetricData + // into multiple MetricData objects. + Collection splittedMetrics = new ArrayList<>(); + // List of all points in the metric data - to avoid creating a new one in each + // call to copyMetricData + List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); + + // Split the points into chunks of size maxExportBatchSize + // From the first chunk, take as many points as possible to fill current batch + int pointsToTake = remainingCapacityInCurrentBatch; + int currentIndex = 0; + + if (pointsToTake > 0) { + splittedMetrics.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex = pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; // should be 0 + } + + int remainingPoints = totalPointsInMetricData - currentIndex; + // Add remaining points in chunks of size maxExportBatchSize + while (currentIndex < totalPointsInMetricData) { + pointsToTake = Math.min(remainingPoints, maxExportBatchSize); + splittedMetrics.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex += pointsToTake; + remainingPoints -= pointsToTake; + } + + int lastBatchRemainingCapacity = maxExportBatchSize - pointsToTake; + return new MetricDataSplitOperationResult(splittedMetrics, lastBatchRemainingCapacity); + } + } + + private static MetricData copyMetricData( + MetricData original, + List originalPointsList, + int dataPointsOffset, + int dataPointsToTake) { + List points = + originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake); + return createMetricDataWithPoints(original, points); + } + + /** + * Creates a new MetricData with the given points. + * + * @param original The original MetricData. + * @param points The points to use for the new MetricData. + * @return A new MetricData with the given points. + */ + @SuppressWarnings("unchecked") + private static MetricData createMetricDataWithPoints( + MetricData original, Collection points) { + switch (original.getType()) { + case DOUBLE_GAUGE: + return ImmutableMetricData.createDoubleGauge( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableGaugeData.create((Collection) (Collection) points)); + case LONG_GAUGE: + return ImmutableMetricData.createLongGauge( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableGaugeData.create((Collection) (Collection) points)); + case DOUBLE_SUM: + SumData doubleSumData = original.getDoubleSumData(); + return ImmutableMetricData.createDoubleSum( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSumData.create( + doubleSumData.isMonotonic(), + doubleSumData.getAggregationTemporality(), + (Collection) (Collection) points)); + case LONG_SUM: + SumData longSumData = original.getLongSumData(); + return ImmutableMetricData.createLongSum( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSumData.create( + longSumData.isMonotonic(), + longSumData.getAggregationTemporality(), + (Collection) (Collection) points)); + case HISTOGRAM: + HistogramData histogramData = original.getHistogramData(); + return ImmutableMetricData.createDoubleHistogram( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableHistogramData.create( + histogramData.getAggregationTemporality(), + (Collection) (Collection) points)); + default: + throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); + } + } + + /** A result of a metric data split operation. */ + private static class MetricDataSplitOperationResult { + private final Collection batchedMetricData; + private final int lastBatchRemainingCapacity; + + /** + * Creates a new MetricDataSplitOperationResult. + * + * @param batchedMetricData The collection of batched metric data. + * @param lastBatchRemainingCapacity The remaining capacity in the last batch. + */ + MetricDataSplitOperationResult( + Collection batchedMetricData, int lastBatchRemainingCapacity) { + this.batchedMetricData = batchedMetricData; + this.lastBatchRemainingCapacity = lastBatchRemainingCapacity; + } + + Collection getBatchedMetricData() { + return batchedMetricData; + } + + int getLastBatchRemainingCapacity() { + return lastBatchRemainingCapacity; + } + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index c8e33fde1e0..d8850b25e4e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -51,6 +51,7 @@ public final class PeriodicMetricReader implements MetricReader { private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop(); @Nullable private volatile ScheduledFuture scheduledFuture; + @Nullable private final MetricExportBatcher metricsBatcher; /** * Returns a new {@link PeriodicMetricReader} which exports to the {@code exporter} once every @@ -66,10 +67,14 @@ public static PeriodicMetricReaderBuilder builder(MetricExporter exporter) { } PeriodicMetricReader( - MetricExporter exporter, long intervalNanos, ScheduledExecutorService scheduler) { + MetricExporter exporter, + long intervalNanos, + ScheduledExecutorService scheduler, + @Nullable MetricExportBatcher metricsBatcher) { this.exporter = exporter; this.intervalNanos = intervalNanos; this.scheduler = scheduler; + this.metricsBatcher = metricsBatcher; this.scheduled = new Scheduled(); } @@ -163,6 +168,8 @@ public String toString() { + exporter + ", intervalNanos=" + intervalNanos + + ", metricsBatcher=" + + metricsBatcher + '}'; } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java index 04cdd27506d..df4d387caef 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java @@ -30,6 +30,8 @@ public final class PeriodicMetricReaderBuilder { @Nullable private ScheduledExecutorService executor; + @Nullable private MetricExportBatcher metricsBatcher; + PeriodicMetricReaderBuilder(MetricExporter metricExporter) { this.metricExporter = metricExporter; } @@ -59,6 +61,12 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor return this; } + public PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) { + checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive"); + this.metricsBatcher = new MetricExportBatcher(maxExportBatchSize); + return this; + } + /** Build a {@link PeriodicMetricReader} with the configuration of this builder. */ public PeriodicMetricReader build() { ScheduledExecutorService executor = this.executor; @@ -66,6 +74,6 @@ public PeriodicMetricReader build() { executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("PeriodicMetricReader")); } - return new PeriodicMetricReader(metricExporter, intervalNanos, executor); + return new PeriodicMetricReader(metricExporter, intervalNanos, executor, metricsBatcher); } } From 21e56ea0635637f48b8370975175f8a78367503a Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 15:15:38 +0000 Subject: [PATCH 02/20] Allow exporting metricData batches --- .../sdk/metrics/export/PeriodicMetricReader.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index d8850b25e4e..14d9a236502 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -16,7 +16,9 @@ import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -224,7 +226,18 @@ CompletableResultCode doRun() { exportAvailable.set(true); flushResult.succeed(); } else { - CompletableResultCode result = exporter.export(metricData); + Collection> batches = null; + CompletableResultCode result; + if (metricsBatcher != null) { + batches = metricsBatcher.batchMetrics(metricData); + List results = new ArrayList<>(batches.size()); + for (Collection batch : batches) { + results.add(exporter.export(batch)); + } + result = CompletableResultCode.ofAll(results); + } else { + result = exporter.export(metricData); + } result.whenComplete( () -> { if (!result.isSuccess()) { From 2be7726672e8bb816feebeaa2f9389c060b7b80e Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 15:32:32 +0000 Subject: [PATCH 03/20] Make existing tests compatible with changes --- .../sdk/metrics/export/MetricExportBatcher.java | 5 +++++ .../sdk/metrics/export/PeriodicMetricReaderTest.java | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 9c8731ef03a..288d0b3c3a3 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -28,6 +28,11 @@ class MetricExportBatcher { this.maxExportBatchSize = maxExportBatchSize; } + @Override + public String toString() { + return "MetricExportBatcher{maxExportBatchSize=" + maxExportBatchSize + "}"; + } + /** * Batches the given metric data into multiple batches based on the maximum export batch size. * diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index 1e74ffcaa9e..414ecc28f67 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -341,12 +341,14 @@ void stringRepresentation() { assertThat( PeriodicMetricReader.builder(metricExporter) .setInterval(Duration.ofSeconds(1)) + .setMaxExportBatchSize(200) .build() .toString()) .isEqualTo( "PeriodicMetricReader{" + "exporter=MockMetricExporter{}, " - + "intervalNanos=1000000000" + + "intervalNanos=1000000000, " + + "metricsBatcher=MetricExportBatcher{maxExportBatchSize=200}" + "}"); } From f4ff90d359db1c14ea95a2b7cf9069fcfbfa07f0 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Tue, 14 Apr 2026 19:40:25 +0000 Subject: [PATCH 04/20] Add docs and update existing tests --- .../sdk/metrics/export/MetricExportBatcher.java | 16 ++++++++++++++++ .../metrics/export/PeriodicMetricReaderTest.java | 13 +++++++++++++ 2 files changed, 29 insertions(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 288d0b3c3a3..00b0ba6da24 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.metrics.export; +import io.opentelemetry.sdk.metrics.data.Data; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.HistogramData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; @@ -21,10 +22,25 @@ import java.util.Collections; import java.util.List; +/** + * Batches metric data into multiple batches based on the maximum export batch size. This is used by + * the {@link PeriodicMetricReader} to batch metric data before exporting it. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ class MetricExportBatcher { private final int maxExportBatchSize; + /** + * Creates a new {@link MetricExportBatcher} with the given maximum export batch size. + * + * @param maxExportBatchSize The maximum number of {@link Data#getPoints()} in each export. + */ MetricExportBatcher(int maxExportBatchSize) { + if (maxExportBatchSize <= 0) { + throw new IllegalArgumentException("maxExportBatchSize must be positive"); + } this.maxExportBatchSize = maxExportBatchSize; } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index 414ecc28f67..25227859d36 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -97,6 +97,19 @@ void startOnlyOnce() { verify(scheduler, times(1)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any()); } + @Test + void build_withIllegalMaxExportSize() { + assertThatThrownBy( + () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + + assertThatThrownBy( + () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(-1).build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + } + @Test void periodicExport() throws Exception { WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); From b243d64e892aa0e1775cc33fb782d362c5e927b1 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 00:31:33 +0000 Subject: [PATCH 05/20] Fix MetricExportBatcher logic and add tests --- .../metrics/export/MetricExportBatcher.java | 131 ++++--- .../export/MetricExportBatcherTest.java | 367 ++++++++++++++++++ 2 files changed, 445 insertions(+), 53 deletions(-) create mode 100644 sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 00b0ba6da24..c71565bd055 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -61,77 +61,91 @@ Collection> batchMetrics(Collection metrics) return Collections.emptyList(); } - Collection> batches = new ArrayList<>(); - int currentBatchRemainingCapacity = maxExportBatchSize; + Collection> preparedBatchesForExport = new ArrayList<>(); + Collection currentBatch = new ArrayList<>(maxExportBatchSize); + // Iterate through each MetricData and fill up the current batch, splitting if + // necessary for (MetricData metricData : metrics) { - MetricDataSplitOperationResult splitResult = - splitMetricData(metricData, currentBatchRemainingCapacity); - batches.add(splitResult.getBatchedMetricData()); - currentBatchRemainingCapacity = splitResult.getLastBatchRemainingCapacity(); + MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch); + preparedBatchesForExport.addAll(splitResult.getPreparedBatches()); + currentBatch = splitResult.getLastInProgressBatch(); } - return Collections.unmodifiableCollection(batches); + // Add the last in-progress batch if it is not empty + if (!currentBatch.isEmpty()) { + preparedBatchesForExport.add(currentBatch); + } + + return Collections.unmodifiableCollection(preparedBatchesForExport); } /** - * Splits a MetricData object into multiple MetricData objects if the number of points exceeds the - * remaining capacity in the current batch. This function tries to fill the current batch with as - * many points as possible from the given metric data. - * - *

If the number of points in the metric data is less than or equal to the remaining capacity - * in the current batch, it will return a single MetricData object with all the points. + * Prepares export batches from a single metric data object. This function only + * operates on a + * single metric data object, fills up the current batch with as many points as + * possible from the + * metric data object, and then creates new metric data objects for the + * remaining points. * - *

If the number of points in the metric data is greater than the remaining capacity in the - * current batch, it will return multiple MetricData objects, each with a subset of the points - * from the original metric data. - * - * @param metricData The MetricData object to split. - * @param remainingCapacityInCurrentBatch The remaining capacity in the current batch being used. - * @return A MetricDataSplitOperationResult containing the batched metric data and the remaining - * capacity in the last batch. + * @param metricData The metric data object to split. + * @param currentBatch The current batch of metric data objects. + * @return A result containing the prepared batches and the last in-progress + * batch. */ - private MetricDataSplitOperationResult splitMetricData( - MetricData metricData, int remainingCapacityInCurrentBatch) { + private MetricDataSplitOperationResult prepareExportBatches( + MetricData metricData, Collection currentBatch) { + int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.size(); int totalPointsInMetricData = metricData.getData().getPoints().size(); + if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { - // We have enough capacity in the current batch to fit all points in this - // MetricData - return new MetricDataSplitOperationResult( - Collections.singleton(metricData), - remainingCapacityInCurrentBatch - totalPointsInMetricData); + currentBatch.add(metricData); + return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch); } else { - // We don't have enough capacity in the current batch. Split this MetricData - // into multiple MetricData objects. - Collection splittedMetrics = new ArrayList<>(); - // List of all points in the metric data - to avoid creating a new one in each - // call to copyMetricData + // remaining capacity in current batch cannot hold all points from metric data + // split the metric data into multiple metric data objects List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); + Collection> preparedBatches = new ArrayList<>(); // Split the points into chunks of size maxExportBatchSize // From the first chunk, take as many points as possible to fill current batch int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; + // fill the current batch and add it to prepared batches if (pointsToTake > 0) { - splittedMetrics.add( + currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex = pointsToTake; - remainingCapacityInCurrentBatch -= pointsToTake; // should be 0 + preparedBatches.add(currentBatch); } + // If the current metric contains more data points than could fit into the + // filled batch above, + // we initialize a fresh batch to receive the spillover points on subsequent + // iterations. int remainingPoints = totalPointsInMetricData - currentIndex; + currentBatch = new ArrayList<>(maxExportBatchSize); + remainingCapacityInCurrentBatch = maxExportBatchSize; + // Add remaining points in chunks of size maxExportBatchSize - while (currentIndex < totalPointsInMetricData) { - pointsToTake = Math.min(remainingPoints, maxExportBatchSize); - splittedMetrics.add( + while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { + // There are still more points in the current metricData + // Take as many points as possible to fill current batch up till remaining + // capacity + pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); + currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex += pointsToTake; remainingPoints -= pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; + if (remainingCapacityInCurrentBatch == 0) { + preparedBatches.add(currentBatch); + currentBatch = new ArrayList<>(maxExportBatchSize); + remainingCapacityInCurrentBatch = maxExportBatchSize; + } } - - int lastBatchRemainingCapacity = maxExportBatchSize - pointsToTake; - return new MetricDataSplitOperationResult(splittedMetrics, lastBatchRemainingCapacity); + return new MetricDataSplitOperationResult(preparedBatches, currentBatch); } } @@ -212,29 +226,40 @@ private static MetricData createMetricDataWithPoints( } } - /** A result of a metric data split operation. */ + /** + * A data class to store the result of a split operation performed on a single + * {@link MetricData} + * object. + */ private static class MetricDataSplitOperationResult { - private final Collection batchedMetricData; - private final int lastBatchRemainingCapacity; + private final Collection> preparedBatches; + private final Collection lastInProgressBatch; /** * Creates a new MetricDataSplitOperationResult. * - * @param batchedMetricData The collection of batched metric data. - * @param lastBatchRemainingCapacity The remaining capacity in the last batch. + * @param preparedBatches The collection of prepared batches of metric data + * for export. Each + * batch of {@link MetricData} objects is guaranteed + * to have at most {@link + * #maxExportBatchSize} points. + * @param lastInProgressBatch The last batch that is still in progress. This + * batch may have less + * than {@link #maxExportBatchSize} points. */ MetricDataSplitOperationResult( - Collection batchedMetricData, int lastBatchRemainingCapacity) { - this.batchedMetricData = batchedMetricData; - this.lastBatchRemainingCapacity = lastBatchRemainingCapacity; + Collection> preparedBatches, + Collection lastInProgressBatch) { + this.preparedBatches = preparedBatches; + this.lastInProgressBatch = lastInProgressBatch; } - Collection getBatchedMetricData() { - return batchedMetricData; + Collection> getPreparedBatches() { + return preparedBatches; } - int getLastBatchRemainingCapacity() { - return lastBatchRemainingCapacity; + Collection getLastInProgressBatch() { + return lastInProgressBatch; } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java new file mode 100644 index 00000000000..b4f6a2abf0b --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -0,0 +1,367 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.export; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.resources.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.junit.jupiter.api.Test; + +class MetricExportBatcherTest { + + @Test + void constructor_InvalidMaxExportBatchSize() { + assertThatThrownBy(() -> new MetricExportBatcher(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + assertThatThrownBy(() -> new MetricExportBatcher(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive"); + } + + @Test + void toString_Valid() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + assertThat(batcher.toString()).isEqualTo("MetricExportBatcher{maxExportBatchSize=10}"); + } + + @Test + void batchMetrics_EmptyMetrics() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + assertThat(batcher.batchMetrics(Collections.emptyList())).isEmpty(); + } + + @Test + void batchMetrics_MetricFitsIntact() { + MetricExportBatcher batcher = new MetricExportBatcher(10); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.singletonList(p1))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } + + @Test + @SuppressWarnings("all") + void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0); + DoublePointData p4 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 4.0); + DoublePointData p5 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 5.0); + + MetricData metric = + ImmutableMetricData.createDoubleGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList.size()).isEqualTo(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + assertThat(firstBatch.size()).isEqualTo(1); + assertThat(secondBatch.size()).isEqualTo(1); + assertThat(thirdBatch.size()).isEqualTo(1); + + MetricData firsBatch_m1 = firstBatch.iterator().next(); + assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatch_m1 = secondBatch.iterator().next(); + assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); + + // Last batch is partially filled. + MetricData thirdBatch_m1 = thirdBatch.iterator().next(); + assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); + } + + @Test + void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(1); + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); // There is only 1 MetricData + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); + } + + @Test + void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + + MetricData metric = + ImmutableMetricData.createDoubleSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSumData.create( + /* isMonotonic= */ true, AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); // There is only 1 MetricData + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2); + assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m1.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + + MetricData metric_1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc_1", + "1", + ImmutableSumData.create( + /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); + + MetricData metric_2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc_2", + "1", + ImmutableSumData.create( + /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Arrays.asList(metric_1, metric_2)); + + assertThat(batches).hasSize(4); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + Collection thirdBatch = batches.stream().skip(2).findFirst().get(); + Collection fourthBatch = batches.stream().skip(3).findFirst().get(); + + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + assertThat(fourthBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m1.getName()).isEqualTo("name_1"); + assertThat(m1.getDescription()).isEqualTo("desc_1"); + assertThat(m1.getUnit()).isEqualTo("1"); + assertThat(m1.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m1.getLongSumData().isMonotonic()).isFalse(); + assertThat(m1.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m2.getName()).isEqualTo("name_1"); + assertThat(m2.getDescription()).isEqualTo("desc_1"); + assertThat(m2.getUnit()).isEqualTo("1"); + assertThat(m2.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m2.getLongSumData().isMonotonic()).isFalse(); + assertThat(m2.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m3 = thirdBatch.iterator().next(); + assertThat(m3.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m3.getName()).isEqualTo("name_2"); + assertThat(m3.getDescription()).isEqualTo("desc_2"); + assertThat(m3.getUnit()).isEqualTo("1"); + assertThat(m3.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m3.getLongSumData().isMonotonic()).isFalse(); + assertThat(m3.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + + MetricData m4 = fourthBatch.iterator().next(); + assertThat(m4.getType()).isEqualTo(MetricDataType.LONG_SUM); + assertThat(m4.getName()).isEqualTo("name_2"); + assertThat(m4.getDescription()).isEqualTo("desc_2"); + assertThat(m4.getUnit()).isEqualTo("1"); + assertThat(m4.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m4.getLongSumData().isMonotonic()).isFalse(); + assertThat(m4.getLongSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.DELTA); + } + + @Test + void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + ImmutableHistogramPointData p1 = + ImmutableHistogramPointData.create( + 1, + 2, + Attributes.empty(), + 1.0, + /* hasMin= */ false, + 0.0, + /* hasMax= */ false, + 0.0, + Collections.emptyList(), + Collections.singletonList(1L)); + ImmutableHistogramPointData p2 = + ImmutableHistogramPointData.create( + 1, + 2, + Attributes.empty(), + 2.0, + /* hasMin= */ false, + 0.0, + /* hasMax= */ false, + 0.0, + Collections.emptyList(), + Collections.singletonList(2L)); + + MetricData metric = + ImmutableMetricData.createDoubleHistogram( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableHistogramData.create( + AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m1.getHistogramData().getPoints()).containsExactly(p1); + assertThat(m1.getHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m2.getHistogramData().getPoints()).containsExactly(p2); + assertThat(m2.getHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_EmptyPointsInMetricData() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.emptyList())); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } + + @Test + void batchMetrics_MultipleMetricsExactCapacityMatch() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p3, p4))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(m1, m2); + } +} From 7e9fe9ea2989435236eac03aab3686323ffcbe72 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 01:15:21 +0000 Subject: [PATCH 06/20] Add support for EXPONENTIAL_HISTOGRAM and SUMMARY data types --- .../metrics/export/MetricExportBatcher.java | 57 +++--- .../export/MetricExportBatcherTest.java | 163 ++++++++++++++++-- 2 files changed, 181 insertions(+), 39 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index c71565bd055..28896181136 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -7,16 +7,21 @@ import io.opentelemetry.sdk.metrics.data.Data; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.data.HistogramData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -81,17 +86,13 @@ Collection> batchMetrics(Collection metrics) } /** - * Prepares export batches from a single metric data object. This function only - * operates on a - * single metric data object, fills up the current batch with as many points as - * possible from the - * metric data object, and then creates new metric data objects for the - * remaining points. + * Prepares export batches from a single metric data object. This function only operates on a + * single metric data object, fills up the current batch with as many points as possible from the + * metric data object, and then creates new metric data objects for the remaining points. * - * @param metricData The metric data object to split. + * @param metricData The metric data object to split. * @param currentBatch The current batch of metric data objects. - * @return A result containing the prepared batches and the last in-progress - * batch. + * @return A result containing the prepared batches and the last in-progress batch. */ private MetricDataSplitOperationResult prepareExportBatches( MetricData metricData, Collection currentBatch) { @@ -221,14 +222,31 @@ private static MetricData createMetricDataWithPoints( ImmutableHistogramData.create( histogramData.getAggregationTemporality(), (Collection) (Collection) points)); - default: - throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); + case EXPONENTIAL_HISTOGRAM: + ExponentialHistogramData expHistogramData = original.getExponentialHistogramData(); + return ImmutableMetricData.createExponentialHistogram( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableExponentialHistogramData.create( + expHistogramData.getAggregationTemporality(), + (Collection) (Collection) points)); + case SUMMARY: + return ImmutableMetricData.createDoubleSummary( + original.getResource(), + original.getInstrumentationScopeInfo(), + original.getName(), + original.getDescription(), + original.getUnit(), + ImmutableSummaryData.create((Collection) (Collection) points)); } + throw new UnsupportedOperationException("Unsupported metric type: " + original.getType()); } /** - * A data class to store the result of a split operation performed on a single - * {@link MetricData} + * A data class to store the result of a split operation performed on a single {@link MetricData} * object. */ private static class MetricDataSplitOperationResult { @@ -238,14 +256,11 @@ private static class MetricDataSplitOperationResult { /** * Creates a new MetricDataSplitOperationResult. * - * @param preparedBatches The collection of prepared batches of metric data - * for export. Each - * batch of {@link MetricData} objects is guaranteed - * to have at most {@link - * #maxExportBatchSize} points. - * @param lastInProgressBatch The last batch that is still in progress. This - * batch may have less - * than {@link #maxExportBatchSize} points. + * @param preparedBatches The collection of prepared batches of metric data for export. Each + * batch of {@link MetricData} objects is guaranteed to have at most {@link + * #maxExportBatchSize} points. + * @param lastInProgressBatch The last batch that is still in progress. This batch may have less + * than {@link #maxExportBatchSize} points. */ MetricDataSplitOperationResult( Collection> preparedBatches, diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index b4f6a2abf0b..bf80bf56809 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -12,16 +12,25 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.Arrays; @@ -317,24 +326,6 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() .isEqualTo(AggregationTemporality.CUMULATIVE); } - @Test - void batchMetrics_EmptyPointsInMetricData() { - MetricExportBatcher batcher = new MetricExportBatcher(2); - MetricData metric = - ImmutableMetricData.createLongGauge( - Resource.empty(), - InstrumentationScopeInfo.empty(), - "name", - "desc", - "1", - ImmutableGaugeData.create(Collections.emptyList())); - - Collection> batches = - batcher.batchMetrics(Collections.singletonList(metric)); - assertThat(batches).hasSize(1); - assertThat(batches.iterator().next()).containsExactly(metric); - } - @Test void batchMetrics_MultipleMetricsExactCapacityMatch() { MetricExportBatcher batcher = new MetricExportBatcher(4); @@ -364,4 +355,140 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { assertThat(batches).hasSize(1); assertThat(batches.iterator().next()).containsExactly(m1, m2); } + + @Test + void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + ExponentialHistogramBuckets buckets = + ImmutableExponentialHistogramBuckets.create( + /* scale= */ 20, /* offset= */ 0, /* bucketCounts= */ Collections.singletonList(1L)); + ExponentialHistogramPointData p1 = + ImmutableExponentialHistogramPointData.create( + /* scale= */ 20, + /* sum= */ 1.0, + /* zeroCount= */ 0, + /* hasMin= */ false, + /* min= */ 0.0, + /* hasMax= */ false, + /* max= */ 0.0, + /* positiveBuckets= */ buckets, + /* negativeBuckets= */ buckets, + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* exemplars= */ Collections.emptyList()); + ExponentialHistogramPointData p2 = + ImmutableExponentialHistogramPointData.create( + /* scale= */ 20, + /* sum= */ 2.0, + /* zeroCount= */ 0, + /* hasMin= */ false, + /* min= */ 0.0, + /* hasMax= */ false, + /* max= */ 0.0, + /* positiveBuckets= */ buckets, + /* negativeBuckets= */ buckets, + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* exemplars= */ Collections.emptyList()); + + MetricData metric = + ImmutableMetricData.createExponentialHistogram( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableExponentialHistogramData.create( + AggregationTemporality.CUMULATIVE, Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1); + assertThat(m1.getExponentialHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2); + assertThat(m2.getExponentialHistogramData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + + @Test + void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + SummaryPointData p1 = + ImmutableSummaryPointData.create( + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* count= */ 1, + /* sum= */ 1.0, + /* percentileValues= */ Collections.singletonList( + ImmutableValueAtQuantile.create(0.5, 1.0))); + SummaryPointData p2 = + ImmutableSummaryPointData.create( + /* startEpochNanos= */ 1, + /* epochNanos= */ 2, + /* attributes= */ Attributes.empty(), + /* count= */ 1, + /* sum= */ 2.0, + /* percentileValues= */ Collections.singletonList( + ImmutableValueAtQuantile.create(0.5, 2.0))); + + MetricData metric = + ImmutableMetricData.createDoubleSummary( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSummaryData.create(Arrays.asList(p1, p2))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + + assertThat(batches).hasSize(2); + Collection firstBatch = batches.iterator().next(); + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m1.getSummaryData().getPoints()).containsExactly(p1); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m2.getSummaryData().getPoints()).containsExactly(p2); + } + + @Test + void batchMetrics_EmptyPointsInMetricData() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Collections.emptyList())); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + assertThat(batches).hasSize(1); + assertThat(batches.iterator().next()).containsExactly(metric); + } } From 3106e918732aa512585c5d4da7ec1d8aac65a10b Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 01:28:13 +0000 Subject: [PATCH 07/20] Fix metrics checkstyle issue --- .../sdk/metrics/export/MetricExportBatcherTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index bf80bf56809..d957af29f96 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -192,7 +192,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); - MetricData metric_1 = + MetricData metric1 = ImmutableMetricData.createLongSum( Resource.empty(), InstrumentationScopeInfo.empty(), @@ -202,7 +202,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( ImmutableSumData.create( /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); - MetricData metric_2 = + MetricData metric2 = ImmutableMetricData.createLongSum( Resource.empty(), InstrumentationScopeInfo.empty(), @@ -213,7 +213,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( /* isMonotonic= */ false, AggregationTemporality.DELTA, Arrays.asList(p1, p2))); Collection> batches = - batcher.batchMetrics(Arrays.asList(metric_1, metric_2)); + batcher.batchMetrics(Arrays.asList(metric1, metric2)); assertThat(batches).hasSize(4); Collection firstBatch = batches.iterator().next(); From 8491ce585222e7e28c0e0eb0b3c946a609ae5727 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 13:57:44 +0000 Subject: [PATCH 08/20] Add missing generated diff files --- docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt index a874b822c9f..b2ecdd07ca8 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt @@ -1,2 +1,4 @@ Comparing source compatibility of opentelemetry-sdk-metrics-1.62.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.61.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.metrics.export.PeriodicMetricReaderBuilder setMaxExportBatchSize(int) From 6027e52229cb5d2805a809959adc7752e027fc0a Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 15:43:02 +0000 Subject: [PATCH 09/20] Update unit tests for enhanced coverage --- .../export/MetricExportBatcherTest.java | 182 +++++++++++++++++- 1 file changed, 175 insertions(+), 7 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index d957af29f96..949d9743b32 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -116,15 +116,24 @@ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { MetricData firsBatch_m1 = firstBatch.iterator().next(); assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(firsBatch_m1.getName()).isEqualTo("name"); + assertThat(firsBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(firsBatch_m1.getUnit()).isEqualTo("1"); assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); MetricData secondBatch_m1 = secondBatch.iterator().next(); assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(secondBatch_m1.getName()).isEqualTo("name"); + assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); // Last batch is partially filled. MetricData thirdBatch_m1 = thirdBatch.iterator().next(); assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(thirdBatch_m1.getName()).isEqualTo("name"); + assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); } @@ -153,6 +162,9 @@ void batchMetrics_SplitsLongGauge_SingleBatchPartiallyFilled() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); } @@ -180,6 +192,9 @@ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1, p2); assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); assertThat(m1.getDoubleSumData().getAggregationTemporality()) @@ -189,8 +204,10 @@ void batchMetrics_SplitsDoubleSum_SingleBatchCompletelyFilled() { @Test void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics() { MetricExportBatcher batcher = new MetricExportBatcher(1); - LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); - LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + Attributes attrs1 = Attributes.builder().put("key1", "val1").build(); + Attributes attrs2 = Attributes.builder().put("key2", "val2").build(); + LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L); MetricData metric1 = ImmutableMetricData.createLongSum( @@ -232,6 +249,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m1.getDescription()).isEqualTo("desc_1"); assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m1.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1); assertThat(m1.getLongSumData().isMonotonic()).isFalse(); assertThat(m1.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -242,6 +260,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m2.getDescription()).isEqualTo("desc_1"); assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m2.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2); assertThat(m2.getLongSumData().isMonotonic()).isFalse(); assertThat(m2.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -252,6 +271,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m3.getDescription()).isEqualTo("desc_2"); assertThat(m3.getUnit()).isEqualTo("1"); assertThat(m3.getLongSumData().getPoints()).containsExactly(p1); + assertThat(m3.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs1); assertThat(m3.getLongSumData().isMonotonic()).isFalse(); assertThat(m3.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -262,6 +282,7 @@ void batchMetrics_SplitsLongSum_MultipleBatchesCompletelyFilled_MultipleMetrics( assertThat(m4.getDescription()).isEqualTo("desc_2"); assertThat(m4.getUnit()).isEqualTo("1"); assertThat(m4.getLongSumData().getPoints()).containsExactly(p2); + assertThat(m4.getLongSumData().getPoints().iterator().next().getAttributes()).isEqualTo(attrs2); assertThat(m4.getLongSumData().isMonotonic()).isFalse(); assertThat(m4.getLongSumData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.DELTA); @@ -316,11 +337,17 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getHistogramData().getPoints()).containsExactly(p1); assertThat(m1.getHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.HISTOGRAM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getHistogramData().getPoints()).containsExactly(p2); assertThat(m2.getHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); @@ -329,10 +356,14 @@ void batchMetrics_SplitsHistogram_MultipleBatchesCompletelyFilled_SingleMetric() @Test void batchMetrics_MultipleMetricsExactCapacityMatch() { MetricExportBatcher batcher = new MetricExportBatcher(4); - LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); - LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); - LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); - LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + Attributes attrs1 = Attributes.builder().put("k", "v1").build(); + Attributes attrs2 = Attributes.builder().put("k", "v2").build(); + Attributes attrs3 = Attributes.builder().put("k", "v3").build(); + Attributes attrs4 = Attributes.builder().put("k", "v4").build(); + LongPointData p1 = ImmutableLongPointData.create(1, 2, attrs1, 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, attrs2, 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, attrs3, 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, attrs4, 4L); MetricData m1 = ImmutableMetricData.createLongGauge( @@ -353,7 +384,16 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); assertThat(batches).hasSize(1); - assertThat(batches.iterator().next()).containsExactly(m1, m2); + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).containsExactly(m1, m2); + + MetricData res1 = firstBatch.iterator().next(); + MetricData res2 = firstBatch.stream().skip(1).findFirst().get(); + + assertThat(res1.getName()).isEqualTo("name_1"); + assertThat(res1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + assertThat(res2.getName()).isEqualTo("name_2"); + assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4); } @Test @@ -414,12 +454,18 @@ void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_Sin MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getExponentialHistogramData().getPoints()).containsExactly(p1); assertThat(m1.getExponentialHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.EXPONENTIAL_HISTOGRAM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getExponentialHistogramData().getPoints()).containsExactly(p2); assertThat(m2.getExponentialHistogramData().getAggregationTemporality()) .isEqualTo(AggregationTemporality.CUMULATIVE); @@ -467,13 +513,135 @@ void batchMetrics_SplitsSummary_MultipleBatchesCompletelyFilled_SingleMetric() { MetricData m1 = firstBatch.iterator().next(); assertThat(m1.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); assertThat(m1.getSummaryData().getPoints()).containsExactly(p1); MetricData m2 = secondBatch.iterator().next(); assertThat(m2.getType()).isEqualTo(MetricDataType.SUMMARY); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); assertThat(m2.getSummaryData().getPoints()).containsExactly(p2); } + @Test + void batchMetrics_SplitsLongGauge_MultipleBatches() { + MetricExportBatcher batcher = new MetricExportBatcher(2); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + + MetricData metric = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3, p4, p5))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList).hasSize(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + + MetricData firstBatch_m1 = firstBatch.iterator().next(); + assertThat(firstBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(firstBatch_m1.getName()).isEqualTo("name"); + assertThat(firstBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(firstBatch_m1.getUnit()).isEqualTo("1"); + assertThat(firstBatch_m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatch_m1 = secondBatch.iterator().next(); + assertThat(secondBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(secondBatch_m1.getName()).isEqualTo("name"); + assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); + assertThat(secondBatch_m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + MetricData thirdBatch_m1 = thirdBatch.iterator().next(); + assertThat(thirdBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(thirdBatch_m1.getName()).isEqualTo("name"); + assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); + assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); + assertThat(thirdBatch_m1.getLongGaugeData().getPoints()).containsExactly(p5); + } + + @Test + void batchMetrics_SplitsDoubleSum_MultipleBatches() { + MetricExportBatcher batcher = new MetricExportBatcher(1); + DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); + DoublePointData p2 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 2.0); + DoublePointData p3 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 3.0); + + MetricData metric = + ImmutableMetricData.createDoubleSum( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name", + "desc", + "1", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + Arrays.asList(p1, p2, p3))); + + Collection> batches = + batcher.batchMetrics(Collections.singletonList(metric)); + List> batchesList = new ArrayList<>(batches); + + assertThat(batchesList).hasSize(3); + Collection firstBatch = batchesList.get(0); + Collection secondBatch = batchesList.get(1); + Collection thirdBatch = batchesList.get(2); + + assertThat(firstBatch).hasSize(1); + assertThat(secondBatch).hasSize(1); + assertThat(thirdBatch).hasSize(1); + + MetricData m1 = firstBatch.iterator().next(); + assertThat(m1.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m1.getName()).isEqualTo("name"); + assertThat(m1.getDescription()).isEqualTo("desc"); + assertThat(m1.getUnit()).isEqualTo("1"); + assertThat(m1.getDoubleSumData().getPoints()).containsExactly(p1); + assertThat(m1.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m1.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m2 = secondBatch.iterator().next(); + assertThat(m2.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m2.getName()).isEqualTo("name"); + assertThat(m2.getDescription()).isEqualTo("desc"); + assertThat(m2.getUnit()).isEqualTo("1"); + assertThat(m2.getDoubleSumData().getPoints()).containsExactly(p2); + assertThat(m2.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m2.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + + MetricData m3 = thirdBatch.iterator().next(); + assertThat(m3.getType()).isEqualTo(MetricDataType.DOUBLE_SUM); + assertThat(m3.getName()).isEqualTo("name"); + assertThat(m3.getDescription()).isEqualTo("desc"); + assertThat(m3.getUnit()).isEqualTo("1"); + assertThat(m3.getDoubleSumData().getPoints()).containsExactly(p3); + assertThat(m3.getDoubleSumData().isMonotonic()).isTrue(); + assertThat(m3.getDoubleSumData().getAggregationTemporality()) + .isEqualTo(AggregationTemporality.CUMULATIVE); + } + @Test void batchMetrics_EmptyPointsInMetricData() { MetricExportBatcher batcher = new MetricExportBatcher(2); From dc322fd494d4e6618fc98e191bfb0ec867997f71 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:38:34 +0000 Subject: [PATCH 10/20] Add unit tests for PeriodicMetricReader --- .../export/PeriodicMetricReaderTest.java | 109 +++++++++++++++++- 1 file changed, 106 insertions(+), 3 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index 25227859d36..6fe562dfd05 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -55,8 +56,13 @@ @MockitoSettings(strictness = Strictness.LENIENT) class PeriodicMetricReaderTest { private static final List LONG_POINT_LIST = - Collections.singletonList( - ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1234567)); + Arrays.asList( + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 1L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 2L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 3L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 4L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 5L), + ImmutableLongPointData.create(1000, 3000, Attributes.empty(), 6L)); private static final MetricData METRIC_DATA = ImmutableMetricData.createLongSum( @@ -98,7 +104,7 @@ void startOnlyOnce() { } @Test - void build_withIllegalMaxExportSize() { + void build_WithIllegalMaxExportSize() { assertThatThrownBy( () -> PeriodicMetricReader.builder(metricExporter).setMaxExportBatchSize(0).build()) .isInstanceOf(IllegalArgumentException.class) @@ -131,6 +137,103 @@ void periodicExport() throws Exception { } } + @Test + void periodicExport_WithMaxExportBatchSize_PartiallyFilledBatch() throws Exception { + WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); + PeriodicMetricReader reader = + PeriodicMetricReader.builder(waitingMetricExporter) + .setInterval(Duration.ofMillis(100)) + .setMaxExportBatchSize(4) + .build(); + + reader.register(collectionRegistration); + MetricData expectedMetricDataBatch1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(0, 4))); + MetricData expectedMetricDataBatch2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(4, 6))); + try { + assertThat(waitingMetricExporter.waitForNumberOfExports(2)) + .containsExactly( + Collections.singletonList(expectedMetricDataBatch1), + Collections.singletonList(expectedMetricDataBatch2)); + } finally { + reader.shutdown(); + } + } + + @Test + void periodicExport_WithMaxExportBatchSize_CompletelyFilledBatch() throws Exception { + WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); + PeriodicMetricReader reader = + PeriodicMetricReader.builder(waitingMetricExporter) + .setInterval(Duration.ofMillis(100)) + .setMaxExportBatchSize(2) + .build(); + + reader.register(collectionRegistration); + MetricData expectedMetricDataBatch1 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(0, 2))); + MetricData expectedMetricDataBatch2 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(2, 4))); + + MetricData expectedMetricDataBatch3 = + ImmutableMetricData.createLongSum( + Resource.empty(), + InstrumentationScopeInfo.create("PeriodicMetricReaderTest"), + "my metric", + "my metric description", + "us", + ImmutableSumData.create( + /* isMonotonic= */ true, + AggregationTemporality.CUMULATIVE, + LONG_POINT_LIST.subList(4, 6))); + try { + assertThat(waitingMetricExporter.waitForNumberOfExports(3)) + .containsExactly( + Collections.singletonList(expectedMetricDataBatch1), + Collections.singletonList(expectedMetricDataBatch2), + Collections.singletonList(expectedMetricDataBatch3)); + } finally { + reader.shutdown(); + } + } + @Test void periodicExport_NoMetricsSkipsExport() { WaitingMetricExporter waitingMetricExporter = new WaitingMetricExporter(); From 8027bdd74e5236b1cda942b6840dbea9c4363d63 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:42:23 +0000 Subject: [PATCH 11/20] Fix checkstyle issues --- .../export/MetricExportBatcherTest.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 949d9743b32..303d3bdb970 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -557,26 +557,26 @@ void batchMetrics_SplitsLongGauge_MultipleBatches() { assertThat(secondBatch).hasSize(1); assertThat(thirdBatch).hasSize(1); - MetricData firstBatch_m1 = firstBatch.iterator().next(); - assertThat(firstBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(firstBatch_m1.getName()).isEqualTo("name"); - assertThat(firstBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(firstBatch_m1.getUnit()).isEqualTo("1"); - assertThat(firstBatch_m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); - - MetricData secondBatch_m1 = secondBatch.iterator().next(); - assertThat(secondBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(secondBatch_m1.getName()).isEqualTo("name"); - assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); - assertThat(secondBatch_m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); - - MetricData thirdBatch_m1 = thirdBatch.iterator().next(); - assertThat(thirdBatch_m1.getType()).isEqualTo(MetricDataType.LONG_GAUGE); - assertThat(thirdBatch_m1.getName()).isEqualTo("name"); - assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); - assertThat(thirdBatch_m1.getLongGaugeData().getPoints()).containsExactly(p5); + MetricData firstBatchMetricData = firstBatch.iterator().next(); + assertThat(firstBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(firstBatchMetricData.getName()).isEqualTo("name"); + assertThat(firstBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(firstBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(firstBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData secondBatchMetricData = secondBatch.iterator().next(); + assertThat(secondBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(secondBatchMetricData.getName()).isEqualTo("name"); + assertThat(secondBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(secondBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(secondBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + MetricData thirdBatchMetricData = thirdBatch.iterator().next(); + assertThat(thirdBatchMetricData.getType()).isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(thirdBatchMetricData.getName()).isEqualTo("name"); + assertThat(thirdBatchMetricData.getDescription()).isEqualTo("desc"); + assertThat(thirdBatchMetricData.getUnit()).isEqualTo("1"); + assertThat(thirdBatchMetricData.getLongGaugeData().getPoints()).containsExactly(p5); } @Test From 549c08f7ea1b167e65dee400caf0846b93aeea27 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 16:51:32 +0000 Subject: [PATCH 12/20] Clean up inline code comments --- .../metrics/export/MetricExportBatcher.java | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 28896181136..0c6c7a62986 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -65,23 +65,20 @@ Collection> batchMetrics(Collection metrics) if (metrics.isEmpty()) { return Collections.emptyList(); } - Collection> preparedBatchesForExport = new ArrayList<>(); Collection currentBatch = new ArrayList<>(maxExportBatchSize); - // Iterate through each MetricData and fill up the current batch, splitting if - // necessary + // Fill active batch and split overlapping metric points if needed for (MetricData metricData : metrics) { MetricDataSplitOperationResult splitResult = prepareExportBatches(metricData, currentBatch); preparedBatchesForExport.addAll(splitResult.getPreparedBatches()); currentBatch = splitResult.getLastInProgressBatch(); } - // Add the last in-progress batch if it is not empty + // Push trailing capacity block if (!currentBatch.isEmpty()) { preparedBatchesForExport.add(currentBatch); } - return Collections.unmodifiableCollection(preparedBatchesForExport); } @@ -103,17 +100,14 @@ private MetricDataSplitOperationResult prepareExportBatches( currentBatch.add(metricData); return new MetricDataSplitOperationResult(Collections.emptyList(), currentBatch); } else { - // remaining capacity in current batch cannot hold all points from metric data - // split the metric data into multiple metric data objects + // Remaining capacity can't hold all points, partition existing metric data object List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); Collection> preparedBatches = new ArrayList<>(); - // Split the points into chunks of size maxExportBatchSize - // From the first chunk, take as many points as possible to fill current batch + // Fill current batch buffer completely int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; - // fill the current batch and add it to prepared batches if (pointsToTake > 0) { currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); @@ -121,19 +115,13 @@ private MetricDataSplitOperationResult prepareExportBatches( preparedBatches.add(currentBatch); } - // If the current metric contains more data points than could fit into the - // filled batch above, - // we initialize a fresh batch to receive the spillover points on subsequent - // iterations. + // Buffer spillover onto fresh partitions int remainingPoints = totalPointsInMetricData - currentIndex; currentBatch = new ArrayList<>(maxExportBatchSize); remainingCapacityInCurrentBatch = maxExportBatchSize; - // Add remaining points in chunks of size maxExportBatchSize + // Iterate extra chunks sized to exact transport constraints while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { - // There are still more points in the current metricData - // Take as many points as possible to fill current batch up till remaining - // capacity pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); From 1da7d248393aa2c4cab49e429bc3ede6985dc841 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 17:55:38 +0000 Subject: [PATCH 13/20] Fix bug for miscalculating remaining capacity of a batch --- .../metrics/export/MetricExportBatcher.java | 6 +- .../export/MetricExportBatcherTest.java | 92 +++++++++++++++---- 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 0c6c7a62986..623e17bea39 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -93,7 +93,11 @@ Collection> batchMetrics(Collection metrics) */ private MetricDataSplitOperationResult prepareExportBatches( MetricData metricData, Collection currentBatch) { - int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatch.size(); + int currentBatchPoints = 0; + for (MetricData m : currentBatch) { + currentBatchPoints += m.getData().getPoints().size(); + } + int remainingCapacityInCurrentBatch = maxExportBatchSize - currentBatchPoints; int totalPointsInMetricData = metricData.getData().getPoints().size(); if (remainingCapacityInCurrentBatch >= totalPointsInMetricData) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 303d3bdb970..98f71ac5cea 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -114,27 +114,27 @@ void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { assertThat(secondBatch.size()).isEqualTo(1); assertThat(thirdBatch.size()).isEqualTo(1); - MetricData firsBatch_m1 = firstBatch.iterator().next(); - assertThat(firsBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(firsBatch_m1.getName()).isEqualTo("name"); - assertThat(firsBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(firsBatch_m1.getUnit()).isEqualTo("1"); - assertThat(firsBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); - - MetricData secondBatch_m1 = secondBatch.iterator().next(); - assertThat(secondBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(secondBatch_m1.getName()).isEqualTo("name"); - assertThat(secondBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(secondBatch_m1.getUnit()).isEqualTo("1"); - assertThat(secondBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); + MetricData b1m1 = firstBatch.iterator().next(); + assertThat(b1m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b1m1.getName()).isEqualTo("name"); + assertThat(b1m1.getDescription()).isEqualTo("desc"); + assertThat(b1m1.getUnit()).isEqualTo("1"); + assertThat(b1m1.getDoubleGaugeData().getPoints()).containsExactly(p1, p2); + + MetricData b2m1 = secondBatch.iterator().next(); + assertThat(b2m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b2m1.getName()).isEqualTo("name"); + assertThat(b2m1.getDescription()).isEqualTo("desc"); + assertThat(b2m1.getUnit()).isEqualTo("1"); + assertThat(b2m1.getDoubleGaugeData().getPoints()).containsExactly(p3, p4); // Last batch is partially filled. - MetricData thirdBatch_m1 = thirdBatch.iterator().next(); - assertThat(thirdBatch_m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); - assertThat(thirdBatch_m1.getName()).isEqualTo("name"); - assertThat(thirdBatch_m1.getDescription()).isEqualTo("desc"); - assertThat(thirdBatch_m1.getUnit()).isEqualTo("1"); - assertThat(thirdBatch_m1.getDoubleGaugeData().getPoints()).containsExactly(p5); + MetricData b3m1 = thirdBatch.iterator().next(); + assertThat(b3m1.getType()).isEqualByComparingTo(MetricDataType.DOUBLE_GAUGE); + assertThat(b3m1.getName()).isEqualTo("name"); + assertThat(b3m1.getDescription()).isEqualTo("desc"); + assertThat(b3m1.getUnit()).isEqualTo("1"); + assertThat(b3m1.getDoubleGaugeData().getPoints()).containsExactly(p5); } @Test @@ -396,6 +396,60 @@ void batchMetrics_MultipleMetricsExactCapacityMatch() { assertThat(res2.getLongGaugeData().getPoints()).containsExactly(p3, p4); } + @Test + void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() { + MetricExportBatcher batcher = new MetricExportBatcher(4); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + LongPointData p6 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 6L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2, p3))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p4, p5, p6))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + + assertThat(batches).hasSize(2); + + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(2); + MetricData b1m1 = firstBatch.iterator().next(); + MetricData b1m2 = firstBatch.stream().skip(1).findFirst().get(); + assertThat(b1m1.getName()).isEqualTo("name_1"); + assertThat(b1m1.getDescription()).isEqualTo("desc"); + assertThat(b1m1.getUnit()).isEqualTo("1"); + assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2, p3); + + assertThat(b1m2.getName()).isEqualTo("name_2"); + assertThat(b1m2.getDescription()).isEqualTo("desc"); + assertThat(b1m2.getUnit()).isEqualTo("1"); + assertThat(b1m2.getLongGaugeData().getPoints()).containsExactly(p4); + + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(secondBatch).hasSize(1); + MetricData b2m1 = secondBatch.iterator().next(); + assertThat(b2m1.getName()).isEqualTo("name_2"); + assertThat(b2m1.getDescription()).isEqualTo("desc"); + assertThat(b2m1.getUnit()).isEqualTo("1"); + assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6); + } + @Test void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { MetricExportBatcher batcher = new MetricExportBatcher(1); From 628a35fbe839718bbfd1b4e9684510eeb4779edf Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 18:10:10 +0000 Subject: [PATCH 14/20] Add missing Javadoc for public facing API --- .../sdk/metrics/export/PeriodicMetricReaderBuilder.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java index df4d387caef..23c39fdc4ee 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderBuilder.java @@ -61,6 +61,14 @@ public PeriodicMetricReaderBuilder setExecutor(ScheduledExecutorService executor return this; } + /** + * Sets the maximum number of data points to include in a single export batch. If unset, no + * batching will be performed. The maximum number of data points is considered across MetricData + * objects scheduled for export. + * + * @param maxExportBatchSize The maximum number of data points to include in a single export + * batch. + */ public PeriodicMetricReaderBuilder setMaxExportBatchSize(int maxExportBatchSize) { checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive"); this.metricsBatcher = new MetricExportBatcher(maxExportBatchSize); From d080d1d2c89485fad855a5a4a1c98a30baf41649 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 15 Apr 2026 20:43:08 +0000 Subject: [PATCH 15/20] Refactor logic in prepareExportBatches to remove redundancy --- .../metrics/export/MetricExportBatcher.java | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 623e17bea39..c9a9ad3f500 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -107,31 +107,16 @@ private MetricDataSplitOperationResult prepareExportBatches( // Remaining capacity can't hold all points, partition existing metric data object List originalPointsList = new ArrayList<>(metricData.getData().getPoints()); Collection> preparedBatches = new ArrayList<>(); - - // Fill current batch buffer completely - int pointsToTake = remainingCapacityInCurrentBatch; int currentIndex = 0; - if (pointsToTake > 0) { - currentBatch.add( - copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); - currentIndex = pointsToTake; - preparedBatches.add(currentBatch); - } - - // Buffer spillover onto fresh partitions - int remainingPoints = totalPointsInMetricData - currentIndex; - currentBatch = new ArrayList<>(maxExportBatchSize); - remainingCapacityInCurrentBatch = maxExportBatchSize; - - // Iterate extra chunks sized to exact transport constraints - while (currentIndex < totalPointsInMetricData && remainingPoints > 0) { - pointsToTake = Math.min(remainingPoints, remainingCapacityInCurrentBatch); + while (currentIndex < totalPointsInMetricData) { + int pointsToTake = + Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); currentBatch.add( copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); currentIndex += pointsToTake; - remainingPoints -= pointsToTake; remainingCapacityInCurrentBatch -= pointsToTake; + if (remainingCapacityInCurrentBatch == 0) { preparedBatches.add(currentBatch); currentBatch = new ArrayList<>(maxExportBatchSize); From 51c52974fbe2cbda9067cb6a54356be1f9bc8381 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 15:58:02 +0000 Subject: [PATCH 16/20] Address comment about defensive copy for original point sublist --- .../opentelemetry/sdk/metrics/export/MetricExportBatcher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index c9a9ad3f500..30adbd9c58f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -133,7 +133,9 @@ private static MetricData copyMetricData( int dataPointsOffset, int dataPointsToTake) { List points = - originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake); + Collections.unmodifiableList( + new ArrayList<>( + originalPointsList.subList(dataPointsOffset, dataPointsOffset + dataPointsToTake))); return createMetricDataWithPoints(original, points); } From fc08f5b196abe8131730bcf4abb39c3615934d41 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 16:16:40 +0000 Subject: [PATCH 17/20] Prevent copying MetricData for 0 points --- .../sdk/metrics/export/MetricExportBatcher.java | 11 +++++++---- .../sdk/metrics/export/MetricExportBatcherTest.java | 1 - 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java index 30adbd9c58f..4db7d2ec0cb 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcher.java @@ -112,10 +112,13 @@ private MetricDataSplitOperationResult prepareExportBatches( while (currentIndex < totalPointsInMetricData) { int pointsToTake = Math.min(totalPointsInMetricData - currentIndex, remainingCapacityInCurrentBatch); - currentBatch.add( - copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); - currentIndex += pointsToTake; - remainingCapacityInCurrentBatch -= pointsToTake; + + if (pointsToTake > 0) { + currentBatch.add( + copyMetricData(metricData, originalPointsList, currentIndex, pointsToTake)); + currentIndex += pointsToTake; + remainingCapacityInCurrentBatch -= pointsToTake; + } if (remainingCapacityInCurrentBatch == 0) { preparedBatches.add(currentBatch); diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 98f71ac5cea..48f2132c946 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -83,7 +83,6 @@ void batchMetrics_MetricFitsIntact() { } @Test - @SuppressWarnings("all") void batchMetrics_SplitsDoubleGauge_LastBatchPartiallyFilled() { MetricExportBatcher batcher = new MetricExportBatcher(2); DoublePointData p1 = ImmutableDoublePointData.create(1, 2, Attributes.empty(), 1.0); From d393c9613ecf67a7578d58d10bef71c770a6de91 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 16 Apr 2026 18:08:41 +0000 Subject: [PATCH 18/20] Add test case to verify there are no batches with empty metric points --- .../export/MetricExportBatcherTest.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java index 48f2132c946..ef8c30dbd6e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/MetricExportBatcherTest.java @@ -449,6 +449,62 @@ void batchMetrics_SplitsLongGauge_MultipleMetrics_ExceedsCapacity() { assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p5, p6); } + @Test + void batchMetrics_SplitsLongGauge_MultipleMetrics_PerfectFillThenSplit() { + // m1 fills the batch completely (remaining capacity becomes 0). + // m2 has 3 points, which forces it to split from the start of a fully-exhausted + // previous pass. + // This test case fails if there is an empty batch + MetricExportBatcher batcher = new MetricExportBatcher(2); + LongPointData p1 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 1L); + LongPointData p2 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 2L); + LongPointData p3 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 3L); + LongPointData p4 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 4L); + LongPointData p5 = ImmutableLongPointData.create(1, 2, Attributes.empty(), 5L); + + MetricData m1 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_1", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p1, p2))); + MetricData m2 = + ImmutableMetricData.createLongGauge( + Resource.empty(), + InstrumentationScopeInfo.empty(), + "name_2", + "desc", + "1", + ImmutableGaugeData.create(Arrays.asList(p3, p4, p5))); + + Collection> batches = batcher.batchMetrics(Arrays.asList(m1, m2)); + + assertThat(batches).hasSize(3); + + // Batch 1 should contain exactly m1 (p1, p2) + Collection firstBatch = batches.iterator().next(); + assertThat(firstBatch).hasSize(1); + MetricData b1m1 = firstBatch.iterator().next(); + assertThat(b1m1.getName()).isEqualTo("name_1"); + assertThat(b1m1.getLongGaugeData().getPoints()).containsExactly(p1, p2); + + // Batch 2 should contain the first part of m2 (p3, p4) + Collection secondBatch = batches.stream().skip(1).findFirst().get(); + assertThat(secondBatch).hasSize(1); + MetricData b2m1 = secondBatch.iterator().next(); + assertThat(b2m1.getName()).isEqualTo("name_2"); + assertThat(b2m1.getLongGaugeData().getPoints()).containsExactly(p3, p4); + + // Batch 3 should contain the rest of m2 (p5) + Collection thirdBatch = batches.stream().skip(2).findFirst().get(); + assertThat(thirdBatch).hasSize(1); + MetricData b3m1 = thirdBatch.iterator().next(); + assertThat(b3m1.getName()).isEqualTo("name_2"); + assertThat(b3m1.getLongGaugeData().getPoints()).containsExactly(p5); + } + @Test void batchMetrics_SplitsExponentialHistogram_MultipleBatchesCompletelyFilled_SingleMetric() { MetricExportBatcher batcher = new MetricExportBatcher(1); From 56c9385a59d2a1c26056526837cb9351b3e3bc2a Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Sun, 19 Apr 2026 04:20:36 +0000 Subject: [PATCH 19/20] Switch to sequential export --- .../metrics/export/PeriodicMetricReader.java | 46 +++++++++++++++---- .../export/PeriodicMetricReaderTest.java | 42 +++++++++++++++++ 2 files changed, 79 insertions(+), 9 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java index 14d9a236502..cff00ea392f 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java @@ -16,9 +16,8 @@ import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -226,15 +225,44 @@ CompletableResultCode doRun() { exportAvailable.set(true); flushResult.succeed(); } else { - Collection> batches = null; CompletableResultCode result; if (metricsBatcher != null) { - batches = metricsBatcher.batchMetrics(metricData); - List results = new ArrayList<>(batches.size()); - for (Collection batch : batches) { - results.add(exporter.export(batch)); - } - result = CompletableResultCode.ofAll(results); + Collection> batches = metricsBatcher.batchMetrics(metricData); + CompletableResultCode sequentialResult = new CompletableResultCode(); + AtomicBoolean anyFailed = new AtomicBoolean(false); + Iterator> batchIterator = batches.iterator(); + + Runnable exportNext = + new Runnable() { + @Override + public void run() { + while (batchIterator.hasNext()) { + Collection currentBatch = batchIterator.next(); + CompletableResultCode currentResult = exporter.export(currentBatch); + if (currentResult.isDone()) { + if (!currentResult.isSuccess()) { + anyFailed.set(true); + } + } else { + currentResult.whenComplete( + () -> { + if (!currentResult.isSuccess()) { + anyFailed.set(true); + } + this.run(); + }); + return; + } + } + if (anyFailed.get()) { + sequentialResult.fail(); + } else { + sequentialResult.succeed(); + } + } + }; + exportNext.run(); + result = sequentialResult; } else { result = exporter.export(metricData); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index 6fe562dfd05..2969a3cdc8e 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -451,6 +451,48 @@ void invalidConfig() { .hasMessage("executor"); } + @Test + void periodicExport_SequentialBatches() throws Exception { + MetricExporter mockExporter = mock(MetricExporter.class); + when(mockExporter.getAggregationTemporality(any())) + .thenReturn(AggregationTemporality.CUMULATIVE); + when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + CompletableResultCode batch1Result = new CompletableResultCode(); + CompletableResultCode batch2Result = CompletableResultCode.ofSuccess(); + + // Configure mock to return pending for 1st call, success for 2nd + when(mockExporter.export(any())).thenReturn(batch1Result).thenReturn(batch2Result); + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(mockExporter) + .setInterval( + Duration.ofSeconds(Integer.MAX_VALUE)) // Long interval to prevent auto-trigger + .setMaxExportBatchSize(3) + .build(); + // Setup metrics that will result in 2 batches (we have 6 points in + // LONG_POINT_LIST) + when(collectionRegistration.collectAllMetrics()) + .thenReturn(Collections.singletonList(METRIC_DATA)); + reader.register(collectionRegistration); + + // Trigger manual flush + CompletableResultCode flushResult = reader.forceFlush(); + // Verify that the first batch WAS exported + verify(mockExporter, times(1)).export(any()); + // At this point, batch 1 is stuck waiting. Batch 2 should NOT be exported yet. + // We verify that export was only called once in total so far. + verify(mockExporter, times(1)).export(any()); + // Now we complete the first batch + batch1Result.succeed(); + // Verify that the second batch IS NOW exported + verify(mockExporter, times(2)).export(any()); + // Ensure the flush operation completes successfully + assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + reader.shutdown(); + } + @Test void stringRepresentation() { when(metricExporter.toString()).thenReturn("MockMetricExporter{}"); From d87f7850433777cffe8fe153b6fd01be83ac51c5 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Sun, 19 Apr 2026 17:18:00 +0000 Subject: [PATCH 20/20] Add tests to verify sequential export for PeriodicMetricReader --- .../export/PeriodicMetricReaderTest.java | 195 +++++++++++++++++- 1 file changed, 193 insertions(+), 2 deletions(-) diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java index 2969a3cdc8e..d7bced887bd 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java @@ -42,6 +42,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -413,7 +417,8 @@ public CompletableResultCode shutdown() { shutdownThread.start(); // Give shutdown() time to reach the flushInProgress.join() wait. - // Even if this executes before shutdown enters the wait, the assertions below still + // Even if this executes before shutdown enters the wait, the assertions below + // still // validate correctness — they just won't exercise the concurrent case. Thread.sleep(200); @@ -428,7 +433,8 @@ public CompletableResultCode shutdown() { assertThat(flushResult.isSuccess()).isTrue(); // Final shutdown export also ran (in-flight + final = 2) assertThat(exportCount.get()).isEqualTo(2); - // Exporter.shutdown() was not called while the in-flight export was still pending + // Exporter.shutdown() was not called while the in-flight export was still + // pending assertThat(shutdownCalledWhileExportPending.get()).isFalse(); } @@ -493,6 +499,170 @@ void periodicExport_SequentialBatches() throws Exception { reader.shutdown(); } + @Test + void periodicExport_SequentialBatches_PartialFailure() throws Exception { + MetricExporter mockExporter = mock(MetricExporter.class); + when(mockExporter.getAggregationTemporality(any())) + .thenReturn(AggregationTemporality.CUMULATIVE); + when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + CompletableResultCode batch1Result = new CompletableResultCode(); + CompletableResultCode batch2Result = new CompletableResultCode(); + CompletableResultCode batch3Result = new CompletableResultCode(); + + when(mockExporter.export(any())) + .thenReturn(batch1Result) + .thenReturn(batch2Result) + .thenReturn(batch3Result); + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(mockExporter) + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches + .build(); + + when(collectionRegistration.collectAllMetrics()) + .thenReturn(Collections.singletonList(METRIC_DATA)); + reader.register(collectionRegistration); + + Logger targetLogger = Logger.getLogger(PeriodicMetricReader.class.getName()); + Level originalLevel = targetLogger.getLevel(); + targetLogger.setLevel(Level.FINE); + + TestHandler testHandler = new TestHandler(); + testHandler.setLevel(Level.FINE); + targetLogger.addHandler(testHandler); + + try { + CompletableResultCode flushResult = reader.forceFlush(); + + verify(mockExporter, times(1)).export(any()); + + batch1Result.succeed(); + verify(mockExporter, times(2)).export(any()); + + batch2Result.fail(); + verify(mockExporter, times(3)).export(any()); + + batch3Result.succeed(); + + // Flush result should still be success + assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + + boolean logFound = + testHandler.getLogRecords().stream() + .anyMatch( + record -> + record.getLevel().equals(Level.FINE) + && record.getMessage().equals("Exporter failed")); + assertThat(logFound).isTrue(); + + reader.shutdown(); + } finally { + targetLogger.removeHandler(testHandler); + targetLogger.setLevel(originalLevel); + } + } + + @Test + void periodicExport_SequentialBatches_PurelySynchronous() throws Exception { + MetricExporter mockExporter = mock(MetricExporter.class); + when(mockExporter.getAggregationTemporality(any())) + .thenReturn(AggregationTemporality.CUMULATIVE); + when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + when(mockExporter.export(any())) + .thenReturn(CompletableResultCode.ofSuccess()) + .thenReturn(CompletableResultCode.ofSuccess()) + .thenReturn(CompletableResultCode.ofSuccess()); + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(mockExporter) + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches + .build(); + + when(collectionRegistration.collectAllMetrics()) + .thenReturn(Collections.singletonList(METRIC_DATA)); + reader.register(collectionRegistration); + + CompletableResultCode flushResult = reader.forceFlush(); + + // Verify that all 3 batches WERE exported immediately + verify(mockExporter, times(3)).export(any()); + + assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + + reader.shutdown(); + } + + @Test + void periodicExport_SequentialBatches_PurelyAsynchronous() throws Exception { + MetricExporter mockExporter = mock(MetricExporter.class); + when(mockExporter.getAggregationTemporality(any())) + .thenReturn(AggregationTemporality.CUMULATIVE); + when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + + CompletableResultCode batch1Result = new CompletableResultCode(); + CompletableResultCode batch2Result = new CompletableResultCode(); + CompletableResultCode batch3Result = new CompletableResultCode(); + + when(mockExporter.export(any())) + .thenReturn(batch1Result) + .thenReturn(batch2Result) + .thenReturn(batch3Result); + + PeriodicMetricReader reader = + PeriodicMetricReader.builder(mockExporter) + .setInterval(Duration.ofSeconds(Integer.MAX_VALUE)) + .setMaxExportBatchSize(2) // 6 points / 2 = 3 batches + .build(); + + when(collectionRegistration.collectAllMetrics()) + .thenReturn(Collections.singletonList(METRIC_DATA)); + reader.register(collectionRegistration); + + Logger targetLogger = Logger.getLogger(PeriodicMetricReader.class.getName()); + Level originalLevel = targetLogger.getLevel(); + targetLogger.setLevel(Level.FINE); + + TestHandler testHandler = new TestHandler(); + testHandler.setLevel(Level.FINE); + targetLogger.addHandler(testHandler); + + try { + CompletableResultCode flushResult = reader.forceFlush(); + + verify(mockExporter, times(1)).export(any()); + + batch1Result.succeed(); + verify(mockExporter, times(2)).export(any()); + + batch2Result.succeed(); + verify(mockExporter, times(3)).export(any()); + + batch3Result.succeed(); + + assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue(); + + boolean logFound = + testHandler.getLogRecords().stream() + .anyMatch( + record -> + record.getLevel().equals(Level.FINE) + && record.getMessage().equals("Exporter failed")); + assertThat(logFound).isFalse(); + + reader.shutdown(); + } finally { + targetLogger.removeHandler(testHandler); + targetLogger.setLevel(originalLevel); + } + } + @Test void stringRepresentation() { when(metricExporter.toString()).thenReturn("MockMetricExporter{}"); @@ -567,4 +737,25 @@ List> waitForNumberOfExports(int numberOfExports) throws Except return result; } } + + private static class TestHandler extends Handler { + private final List logRecords = new ArrayList<>(); + + private TestHandler() {} + + @Override + public void publish(LogRecord record) { + logRecords.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() {} + + List getLogRecords() { + return logRecords; + } + } }