From 55a26723a017c51c4503b2f1de3f79ce2096bf05 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Mon, 20 Apr 2026 16:25:05 -0500 Subject: [PATCH 1/5] Break out top level classes for cumulative, delta sync storage --- .../CumulativeSynchronousMetricStorage.java | 99 ++++++ .../DefaultSynchronousMetricStorage.java | 304 +----------------- .../state/DeltaSynchronousMetricStorage.java | 243 ++++++++++++++ .../SynchronousInstrumentStressTest.java | 36 ++- 4 files changed, 367 insertions(+), 315 deletions(-) create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java new file mode 100644 index 00000000000..66ed05a2b9d --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java @@ -0,0 +1,99 @@ +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; +import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; +import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; +import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; +import io.opentelemetry.sdk.resources.Resource; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; + +class CumulativeSynchronousMetricStorage + extends DefaultSynchronousMetricStorage { + private final MemoryMode memoryMode; + private final ConcurrentHashMap> aggregatorHandles = + new ConcurrentHashMap<>(); + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + + CumulativeSynchronousMetricStorage( + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + Clock clock, + int maxCardinality, + boolean enabled, + MemoryMode memoryMode) { + super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); + this.memoryMode = memoryMode; + } + + @Override + void doRecordLong(long value, Attributes attributes, Context context) { + getAggregatorHandle(aggregatorHandles, attributes, context) + .recordLong(value, attributes, context); + } + + @Override + void doRecordDouble(double value, Attributes attributes, Context context) { + getAggregatorHandle(aggregatorHandles, attributes, context) + .recordDouble(value, attributes, context); + } + + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + // No aggregator handle pooling for cumulative temporality + return null; + } + + @Override + public MetricData collect( + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { + List points; + if (memoryMode == REUSABLE_DATA) { + reusableResultList.clear(); + points = reusableResultList; + } else { + points = new ArrayList<>(aggregatorHandles.size()); + } + + // Grab aggregated points. + aggregatorHandles.forEach( + (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } + // Start time for cumulative synchronous instruments is the time the first series + // measurement was recorded. I.e. the time the AggregatorHandle was created. + T point = + handle.aggregateThenMaybeReset( + handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ false); + + if (point != null) { + points.add(point); + } + }); + + if (points.isEmpty() || !enabled) { + return EmptyMetricData.getInstance(); + } + + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, points, CUMULATIVE); + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index a44508d6659..eef3c245508 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -5,33 +5,22 @@ package io.opentelemetry.sdk.metrics.internal.state; -import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; -import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; -import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.common.InstrumentationScopeInfo; -import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.internal.ThrottlingLogger; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; -import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; -import io.opentelemetry.sdk.resources.Resource; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -62,7 +51,7 @@ public abstract class DefaultSynchronousMetricStorage protected volatile boolean enabled; - private DefaultSynchronousMetricStorage( + DefaultSynchronousMetricStorage( MetricDescriptor metricDescriptor, Aggregator aggregator, AttributesProcessor attributesProcessor, @@ -186,295 +175,4 @@ public MetricDescriptor getMetricDescriptor() { return metricDescriptor; } - private static class DeltaSynchronousMetricStorage - extends DefaultSynchronousMetricStorage { - private final long instrumentCreationEpochNanos; - private final RegisteredReader registeredReader; - private final MemoryMode memoryMode; - - private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); - // Only populated if memoryMode == REUSABLE_DATA - private volatile ConcurrentHashMap> - previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); - // Only populated if memoryMode == REUSABLE_DATA - private final ArrayList reusableResultList = new ArrayList<>(); - private final ConcurrentLinkedQueue> aggregatorHandlePool = - new ConcurrentLinkedQueue<>(); - - DeltaSynchronousMetricStorage( - RegisteredReader registeredReader, - MetricDescriptor metricDescriptor, - Aggregator aggregator, - AttributesProcessor attributesProcessor, - Clock clock, - int maxCardinality, - boolean enabled) { - super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); - this.instrumentCreationEpochNanos = clock.now(); - this.registeredReader = registeredReader; - this.memoryMode = registeredReader.getReader().getMemoryMode(); - } - - @Override - void doRecordLong(long value, Attributes attributes, Context context) { - AggregatorHolder holderForRecord = getHolderForRecord(); - try { - getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) - .recordLong(value, attributes, context); - } finally { - releaseHolderForRecord(holderForRecord); - } - } - - @Override - void doRecordDouble(double value, Attributes attributes, Context context) { - AggregatorHolder holderForRecord = getHolderForRecord(); - try { - getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) - .recordDouble(value, attributes, context); - } finally { - releaseHolderForRecord(holderForRecord); - } - } - - @Nullable - @Override - AggregatorHandle maybeGetPooledAggregatorHandle() { - return aggregatorHandlePool.poll(); - } - - /** - * Obtain the AggregatorHolder for recording measurements, re-reading the volatile - * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets - * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced. - * Record operations increment recordInProgress by 2. Callers MUST call {@link - * #releaseHolderForRecord(AggregatorHolder)} when record operation completes to signal to that - * its safe to proceed with Collect operations. - */ - private AggregatorHolder getHolderForRecord() { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 == 0) { - return aggregatorHolder; - } else { - // Collect is in progress, decrement recordsInProgress to allow collect to proceed and - // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - } while (true); - } - - /** - * Called on the {@link AggregatorHolder} obtained from {@link #getHolderForRecord()} to - * indicate that recording is complete, and it is safe to collect. - */ - private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); - } - - @Override - public MetricData collect( - Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { - ConcurrentHashMap> aggregatorHandles; - AggregatorHolder holder = this.aggregatorHolder; - this.aggregatorHolder = - (memoryMode == REUSABLE_DATA) - ? new AggregatorHolder<>(previousCollectionAggregatorHandles) - : new AggregatorHolder<>(); - - // Increment recordsInProgress by 1, which produces an odd number acting as a signal that - // record operations should re-read the volatile this.aggregatorHolder. - // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record - // operations are complete. - int recordsInProgress = holder.activeRecordingThreads.addAndGet(1); - while (recordsInProgress > 1) { - recordsInProgress = holder.activeRecordingThreads.get(); - } - aggregatorHandles = holder.aggregatorHandles; - - List points; - if (memoryMode == REUSABLE_DATA) { - reusableResultList.clear(); - points = reusableResultList; - } else { - points = new ArrayList<>(aggregatorHandles.size()); - } - - // In DELTA aggregation temporality each Attributes is reset to 0 - // every time we perform a collection (by definition of DELTA). - // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles - // (into which the values are recorded) effectively starting from 0 - // for each recorded Attributes. - // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing - // a key-value from a map and putting it again on next recording will cost an allocation, - // we are keeping the aggregator handles in their map, and only reset their value once - // we finish collecting the aggregated value from each one. - // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory, - // hence during collect(), when the map is at full capacity, we try to clear away unused - // aggregator handles, so on next recording cycle using this map, there will be room for newly - // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided - // if the user chooses to increase the maxCardinality. - if (memoryMode == REUSABLE_DATA) { - if (aggregatorHandles.size() >= maxCardinality) { - aggregatorHandles.forEach( - (attribute, handle) -> { - if (!handle.hasRecordedValues()) { - aggregatorHandles.remove(attribute); - } - }); - } - } - - // Start time for synchronous delta instruments is the time of the last collection, or if no - // collection has yet taken place, the time the instrument was created. - long startEpochNanos = - registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos); - - // Grab aggregated points. - aggregatorHandles.forEach( - (attributes, handle) -> { - if (!handle.hasRecordedValues()) { - return; - } - T point = - handle.aggregateThenMaybeReset( - startEpochNanos, epochNanos, attributes, /* reset= */ true); - - if (memoryMode == IMMUTABLE_DATA) { - // Return the aggregator to the pool. - // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is - // always used as it is the place accumulating the values and never resets) - // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid - // using the pool since it allocates memory internally on each put() or remove() - aggregatorHandlePool.offer(handle); - } - - if (point != null) { - points.add(point); - } - }); - - // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are - // created during collection. - int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1); - for (int i = 0; i < toDelete; i++) { - aggregatorHandlePool.poll(); - } - - if (memoryMode == REUSABLE_DATA) { - previousCollectionAggregatorHandles = aggregatorHandles; - } - - if (points.isEmpty() || !enabled) { - return EmptyMetricData.getInstance(); - } - - return aggregator.toMetricData( - resource, instrumentationScopeInfo, metricDescriptor, points, DELTA); - } - } - - private static class AggregatorHolder { - private final ConcurrentHashMap> aggregatorHandles; - // Recording threads grab the current interval (AggregatorHolder) and atomically increment - // this by 2 before recording against it (and then decrement by two when done). - // - // The collection thread grabs the current interval (AggregatorHolder) and atomically - // increments this by 1 to "lock" this interval (and then waits for any active recording - // threads to complete before collecting it). - // - // Recording threads check the return value of their atomic increment, and if it's odd - // that means the collector thread has "locked" this interval for collection. - // - // But before the collector "locks" the interval it sets up a new current interval - // (AggregatorHolder), and so if a recording thread encounters an odd value, - // all it needs to do is release the "read lock" it just obtained (decrementing by 2), - // and then grab and record against the new current interval (AggregatorHolder). - private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); - - private AggregatorHolder() { - aggregatorHandles = new ConcurrentHashMap<>(); - } - - private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { - this.aggregatorHandles = aggregatorHandles; - } - } - - private static class CumulativeSynchronousMetricStorage - extends DefaultSynchronousMetricStorage { - private final MemoryMode memoryMode; - private final ConcurrentHashMap> aggregatorHandles = - new ConcurrentHashMap<>(); - // Only populated if memoryMode == REUSABLE_DATA - private final ArrayList reusableResultList = new ArrayList<>(); - - CumulativeSynchronousMetricStorage( - MetricDescriptor metricDescriptor, - Aggregator aggregator, - AttributesProcessor attributesProcessor, - Clock clock, - int maxCardinality, - boolean enabled, - MemoryMode memoryMode) { - super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); - this.memoryMode = memoryMode; - } - - @Override - void doRecordLong(long value, Attributes attributes, Context context) { - getAggregatorHandle(aggregatorHandles, attributes, context) - .recordLong(value, attributes, context); - } - - @Override - void doRecordDouble(double value, Attributes attributes, Context context) { - getAggregatorHandle(aggregatorHandles, attributes, context) - .recordDouble(value, attributes, context); - } - - @Nullable - @Override - AggregatorHandle maybeGetPooledAggregatorHandle() { - // No aggregator handle pooling for cumulative temporality - return null; - } - - @Override - public MetricData collect( - Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { - List points; - if (memoryMode == REUSABLE_DATA) { - reusableResultList.clear(); - points = reusableResultList; - } else { - points = new ArrayList<>(aggregatorHandles.size()); - } - - // Grab aggregated points. - aggregatorHandles.forEach( - (attributes, handle) -> { - if (!handle.hasRecordedValues()) { - return; - } - // Start time for cumulative synchronous instruments is the time the first series - // measurement was recorded. I.e. the time the AggregatorHandle was created. - T point = - handle.aggregateThenMaybeReset( - handle.getCreationEpochNanos(), epochNanos, attributes, /* reset= */ false); - - if (point != null) { - points.add(point); - } - }); - - if (points.isEmpty() || !enabled) { - return EmptyMetricData.getInstance(); - } - - return aggregator.toMetricData( - resource, instrumentationScopeInfo, metricDescriptor, points, CUMULATIVE); - } - } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java new file mode 100644 index 00000000000..a6bdca3d524 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java @@ -0,0 +1,243 @@ +package io.opentelemetry.sdk.metrics.internal.state; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.common.export.MemoryMode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; +import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; +import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; +import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; +import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; +import io.opentelemetry.sdk.resources.Resource; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; + +class DeltaSynchronousMetricStorage + extends DefaultSynchronousMetricStorage { + private final long instrumentCreationEpochNanos; + private final RegisteredReader registeredReader; + private final MemoryMode memoryMode; + + private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); + // Only populated if memoryMode == REUSABLE_DATA + private volatile ConcurrentHashMap> + previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); + // Only populated if memoryMode == REUSABLE_DATA + private final ArrayList reusableResultList = new ArrayList<>(); + private final ConcurrentLinkedQueue> aggregatorHandlePool = + new ConcurrentLinkedQueue<>(); + + DeltaSynchronousMetricStorage( + RegisteredReader registeredReader, + MetricDescriptor metricDescriptor, + Aggregator aggregator, + AttributesProcessor attributesProcessor, + Clock clock, + int maxCardinality, + boolean enabled) { + super(metricDescriptor, aggregator, attributesProcessor, clock, maxCardinality, enabled); + this.instrumentCreationEpochNanos = clock.now(); + this.registeredReader = registeredReader; + this.memoryMode = registeredReader.getReader().getMemoryMode(); + } + + @Override + void doRecordLong(long value, Attributes attributes, Context context) { + AggregatorHolder holderForRecord = getHolderForRecord(); + try { + getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) + .recordLong(value, attributes, context); + } finally { + releaseHolderForRecord(holderForRecord); + } + } + + @Override + void doRecordDouble(double value, Attributes attributes, Context context) { + AggregatorHolder holderForRecord = getHolderForRecord(); + try { + getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) + .recordDouble(value, attributes, context); + } finally { + releaseHolderForRecord(holderForRecord); + } + } + + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + return aggregatorHandlePool.poll(); + } + + /** + * Obtain the AggregatorHolder for recording measurements, re-reading the volatile + * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets + * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced. + * Record operations increment recordInProgress by 2. Callers MUST call {@link + * #releaseHolderForRecord(DeltaSynchronousMetricStorage.AggregatorHolder)} when record operation completes to signal to that + * its safe to proceed with Collect operations. + */ + private AggregatorHolder getHolderForRecord() { + do { + AggregatorHolder aggregatorHolder = this.aggregatorHolder; + int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); + if (recordsInProgress % 2 == 0) { + return aggregatorHolder; + } else { + // Collect is in progress, decrement recordsInProgress to allow collect to proceed and + // re-read aggregatorHolder + aggregatorHolder.activeRecordingThreads.addAndGet(-2); + } + } while (true); + } + + /** + * Called on the {@link DeltaSynchronousMetricStorage.AggregatorHolder} obtained from {@link #getHolderForRecord()} to + * indicate that recording is complete, and it is safe to collect. + */ + private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { + aggregatorHolder.activeRecordingThreads.addAndGet(-2); + } + + @Override + public MetricData collect( + Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { + ConcurrentHashMap> aggregatorHandles; + AggregatorHolder holder = this.aggregatorHolder; + this.aggregatorHolder = + (memoryMode == REUSABLE_DATA) + ? new AggregatorHolder<>(previousCollectionAggregatorHandles) + : new AggregatorHolder<>(); + + // Increment recordsInProgress by 1, which produces an odd number acting as a signal that + // record operations should re-read the volatile this.aggregatorHolder. + // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record + // operations are complete. + int recordsInProgress = holder.activeRecordingThreads.addAndGet(1); + while (recordsInProgress > 1) { + recordsInProgress = holder.activeRecordingThreads.get(); + } + aggregatorHandles = holder.aggregatorHandles; + + List points; + if (memoryMode == REUSABLE_DATA) { + reusableResultList.clear(); + points = reusableResultList; + } else { + points = new ArrayList<>(aggregatorHandles.size()); + } + + // In DELTA aggregation temporality each Attributes is reset to 0 + // every time we perform a collection (by definition of DELTA). + // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles + // (into which the values are recorded) effectively starting from 0 + // for each recorded Attributes. + // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing + // a key-value from a map and putting it again on next recording will cost an allocation, + // we are keeping the aggregator handles in their map, and only reset their value once + // we finish collecting the aggregated value from each one. + // The SDK must adhere to keeping no more than maxCardinality unique Attributes in memory, + // hence during collect(), when the map is at full capacity, we try to clear away unused + // aggregator handles, so on next recording cycle using this map, there will be room for newly + // recorded Attributes. This comes at the expanse of memory allocations. This can be avoided + // if the user chooses to increase the maxCardinality. + if (memoryMode == REUSABLE_DATA) { + if (aggregatorHandles.size() >= maxCardinality) { + aggregatorHandles.forEach( + (attribute, handle) -> { + if (!handle.hasRecordedValues()) { + aggregatorHandles.remove(attribute); + } + }); + } + } + + // Start time for synchronous delta instruments is the time of the last collection, or if no + // collection has yet taken place, the time the instrument was created. + long startEpochNanos = + registeredReader.getLastCollectEpochNanosOrDefault(instrumentCreationEpochNanos); + + // Grab aggregated points. + aggregatorHandles.forEach( + (attributes, handle) -> { + if (!handle.hasRecordedValues()) { + return; + } + T point = + handle.aggregateThenMaybeReset( + startEpochNanos, epochNanos, attributes, /* reset= */ true); + + if (memoryMode == IMMUTABLE_DATA) { + // Return the aggregator to the pool. + // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is + // always used as it is the place accumulating the values and never resets) + // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid + // using the pool since it allocates memory internally on each put() or remove() + aggregatorHandlePool.offer(handle); + } + + if (point != null) { + points.add(point); + } + }); + + // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are + // created during collection. + int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1); + for (int i = 0; i < toDelete; i++) { + aggregatorHandlePool.poll(); + } + + if (memoryMode == REUSABLE_DATA) { + previousCollectionAggregatorHandles = aggregatorHandles; + } + + if (points.isEmpty() || !enabled) { + return EmptyMetricData.getInstance(); + } + + return aggregator.toMetricData( + resource, instrumentationScopeInfo, metricDescriptor, points, DELTA); + } + + private static class AggregatorHolder { + private final ConcurrentHashMap> aggregatorHandles; + // Recording threads grab the current interval (AggregatorHolder) and atomically increment + // this by 2 before recording against it (and then decrement by two when done). + // + // The collection thread grabs the current interval (AggregatorHolder) and atomically + // increments this by 1 to "lock" this interval (and then waits for any active recording + // threads to complete before collecting it). + // + // Recording threads check the return value of their atomic increment, and if it's odd + // that means the collector thread has "locked" this interval for collection. + // + // But before the collector "locks" the interval it sets up a new current interval + // (AggregatorHolder), and so if a recording thread encounters an odd value, + // all it needs to do is release the "read lock" it just obtained (decrementing by 2), + // and then grab and record against the new current interval (AggregatorHolder). + private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + + private AggregatorHolder() { + aggregatorHandles = new ConcurrentHashMap<>(); + } + + private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { + this.aggregatorHandles = aggregatorHandles; + } + } +} diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index af12cc6af12..2622f678198 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -87,6 +87,18 @@ void stressTest( Aggregation aggregation, MemoryMode memoryMode, InstrumentValueType instrumentValueType) { + for (int repetition = 0; repetition < 50; repetition++) { + stressTestOnce( + aggregationTemporality, instrumentType, aggregation, memoryMode, instrumentValueType); + } + } + + private void stressTestOnce( + AggregationTemporality aggregationTemporality, + InstrumentType instrumentType, + Aggregation aggregation, + MemoryMode memoryMode, + InstrumentValueType instrumentValueType) { // Initialize metric SDK DefaultAggregationSelector aggregationSelector = DefaultAggregationSelector.getDefault().with(instrumentType, aggregation); @@ -100,6 +112,8 @@ void stressTest( SdkMeterProvider.builder().registerMetricReader(reader).build(); cleanup.addCloseable(meterProvider); Meter meter = meterProvider.get("test"); + List attributes = Arrays.asList(ATTR_1, ATTR_2, ATTR_3, ATTR_4); + Collections.shuffle(attributes); Instrument instrument = getInstrument(meter, instrumentType, instrumentValueType); // Define list of measurements to record @@ -120,8 +134,6 @@ void stressTest( recordThreads.add( new Thread( () -> { - List attributes = Arrays.asList(ATTR_1, ATTR_2, ATTR_3, ATTR_4); - Collections.shuffle(attributes); for (Long measurement : measurements) { for (Attributes attr : attributes) { instrument.record(measurement, attr); @@ -263,8 +275,8 @@ void stressTest( assertThat(p.getMax()).isEqualTo((double) max.get()); assertThat(p.getZeroCount()).isEqualTo(zeroCount.get()); assertThat( - p.getPositiveBuckets().getBucketCounts().stream() - .reduce(0L, Long::sum)) + p.getPositiveBuckets().getBucketCounts().stream() + .reduce(0L, Long::sum)) .isEqualTo(totalCount.get() - zeroCount.get()); })); } else { @@ -307,15 +319,15 @@ private static Instrument getInstrument( case HISTOGRAM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter - .histogramBuilder(INSTRUMENT_NAME) - .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) - .build() - ::record + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build() + ::record : meter - .histogramBuilder(INSTRUMENT_NAME) - .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) - .ofLongs() - .build() + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .ofLongs() + .build() ::record; case GAUGE: return instrumentValueType == InstrumentValueType.DOUBLE From 195e93d0d4c7d8a200919887652a583679988803 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:06:32 -0500 Subject: [PATCH 2/5] wip --- .../DefaultSynchronousMetricStorage.java | 4 +- .../state/DeltaSynchronousMetricStorage.java | 172 +++++++++++------- 2 files changed, 104 insertions(+), 72 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index eef3c245508..a21c3f13ec8 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -37,8 +37,8 @@ public abstract class DefaultSynchronousMetricStorage private static final Logger internalLogger = Logger.getLogger(DefaultSynchronousMetricStorage.class.getName()); - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - private final AttributesProcessor attributesProcessor; + final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + final AttributesProcessor attributesProcessor; protected final Clock clock; protected final MetricDescriptor metricDescriptor; protected final Aggregator aggregator; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java index a6bdca3d524..5b4207509f4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java @@ -18,9 +18,11 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; @@ -34,7 +36,7 @@ class DeltaSynchronousMetricStorage private volatile AggregatorHolder aggregatorHolder = new AggregatorHolder<>(); // Only populated if memoryMode == REUSABLE_DATA - private volatile ConcurrentHashMap> + private volatile ConcurrentHashMap> previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); // Only populated if memoryMode == REUSABLE_DATA private final ArrayList reusableResultList = new ArrayList<>(); @@ -57,66 +59,91 @@ class DeltaSynchronousMetricStorage @Override void doRecordLong(long value, Attributes attributes, Context context) { - AggregatorHolder holderForRecord = getHolderForRecord(); - try { - getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) - .recordLong(value, attributes, context); - } finally { - releaseHolderForRecord(holderForRecord); - } + do { + AggregatorHolder aggregatorHolder = this.aggregatorHolder; + DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); + int recordsInProgress = deltaAggregatorHandle.activeRecordingThreads.addAndGet(2); + if (recordsInProgress % 2 != 0) { + deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); + } else { + try { + deltaAggregatorHandle.handle.recordLong(value, attributes, context); + break; + } finally { + deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); + } + } + } while (true); } @Override void doRecordDouble(double value, Attributes attributes, Context context) { - AggregatorHolder holderForRecord = getHolderForRecord(); - try { - getAggregatorHandle(holderForRecord.aggregatorHandles, attributes, context) - .recordDouble(value, attributes, context); - } finally { - releaseHolderForRecord(holderForRecord); - } - } - - @Nullable - @Override - AggregatorHandle maybeGetPooledAggregatorHandle() { - return aggregatorHandlePool.poll(); - } - - /** - * Obtain the AggregatorHolder for recording measurements, re-reading the volatile - * this.aggregatorHolder until we access one where recordsInProgress is even. Collect sets - * recordsInProgress to odd as a signal that AggregatorHolder is stale and is being replaced. - * Record operations increment recordInProgress by 2. Callers MUST call {@link - * #releaseHolderForRecord(DeltaSynchronousMetricStorage.AggregatorHolder)} when record operation completes to signal to that - * its safe to proceed with Collect operations. - */ - private AggregatorHolder getHolderForRecord() { do { AggregatorHolder aggregatorHolder = this.aggregatorHolder; - int recordsInProgress = aggregatorHolder.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 == 0) { - return aggregatorHolder; + DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); + int recordsInProgress = deltaAggregatorHandle.activeRecordingThreads.addAndGet(2); + if (recordsInProgress % 2 != 0) { + deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); } else { - // Collect is in progress, decrement recordsInProgress to allow collect to proceed and - // re-read aggregatorHolder - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + try { + deltaAggregatorHandle.handle.recordDouble(value, attributes, context); + break; + } finally { + deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); + } } } while (true); } - /** - * Called on the {@link DeltaSynchronousMetricStorage.AggregatorHolder} obtained from {@link #getHolderForRecord()} to - * indicate that recording is complete, and it is safe to collect. - */ - private void releaseHolderForRecord(AggregatorHolder aggregatorHolder) { - aggregatorHolder.activeRecordingThreads.addAndGet(-2); + private DeltaAggregatorHandle getDeltaAggregatorHandle( + ConcurrentHashMap> aggregatorHandles, + Attributes attributes, + Context context) { + Objects.requireNonNull(attributes, "attributes"); + attributes = attributesProcessor.process(attributes, context); + DeltaAggregatorHandle handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } + if (aggregatorHandles.size() >= maxCardinality) { + logger.log( + Level.WARNING, + "Instrument " + + metricDescriptor.getSourceInstrument().getName() + + " has exceeded the maximum allowed cardinality (" + + maxCardinality + + ")."); + // Return handle for overflow series, first checking if a handle already exists for it + attributes = MetricStorage.CARDINALITY_OVERFLOW; + handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } + } + // Get handle from pool if available, else create a new one. + // Note: pooled handles (used only for delta temporality) retain their original + // creationEpochNanos, but delta storage does not use the handle's creation time for the + // start epoch — it uses the reader's last collect time directly in collect(). So the stale + // creation time on a recycled handle does not affect correctness. + AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); + if (newHandle == null) { + newHandle = aggregator.createHandle(clock.now()); + } + DeltaAggregatorHandle newDeltaHandle = new DeltaAggregatorHandle<>(newHandle); + handle = aggregatorHandles.putIfAbsent(attributes, newDeltaHandle); + return handle != null ? handle : newDeltaHandle; + } + + @Nullable + @Override + AggregatorHandle maybeGetPooledAggregatorHandle() { + return aggregatorHandlePool.poll(); } @Override public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { - ConcurrentHashMap> aggregatorHandles; + ConcurrentHashMap> aggregatorHandles; AggregatorHolder holder = this.aggregatorHolder; this.aggregatorHolder = (memoryMode == REUSABLE_DATA) @@ -127,10 +154,14 @@ public MetricData collect( // record operations should re-read the volatile this.aggregatorHolder. // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record // operations are complete. - int recordsInProgress = holder.activeRecordingThreads.addAndGet(1); - while (recordsInProgress > 1) { - recordsInProgress = holder.activeRecordingThreads.get(); - } + holder.aggregatorHandles.values().forEach(handle -> handle.activeRecordingThreads.addAndGet(1)); + holder.aggregatorHandles.values().forEach(handle -> { + int recordsInProgress = handle.activeRecordingThreads.get(); + while (recordsInProgress > 1) { + recordsInProgress = handle.activeRecordingThreads.get(); + } + handle.activeRecordingThreads.addAndGet(-1); + }); aggregatorHandles = holder.aggregatorHandles; List points; @@ -159,7 +190,7 @@ public MetricData collect( if (aggregatorHandles.size() >= maxCardinality) { aggregatorHandles.forEach( (attribute, handle) -> { - if (!handle.hasRecordedValues()) { + if (!handle.handle.hasRecordedValues()) { aggregatorHandles.remove(attribute); } }); @@ -174,11 +205,11 @@ public MetricData collect( // Grab aggregated points. aggregatorHandles.forEach( (attributes, handle) -> { - if (!handle.hasRecordedValues()) { + if (!handle.handle.hasRecordedValues()) { return; } T point = - handle.aggregateThenMaybeReset( + handle.handle.aggregateThenMaybeReset( startEpochNanos, epochNanos, attributes, /* reset= */ true); if (memoryMode == IMMUTABLE_DATA) { @@ -187,7 +218,7 @@ public MetricData collect( // always used as it is the place accumulating the values and never resets) // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid // using the pool since it allocates memory internally on each put() or remove() - aggregatorHandlePool.offer(handle); + aggregatorHandlePool.offer(handle.handle); } if (point != null) { @@ -215,29 +246,30 @@ public MetricData collect( } private static class AggregatorHolder { - private final ConcurrentHashMap> aggregatorHandles; - // Recording threads grab the current interval (AggregatorHolder) and atomically increment - // this by 2 before recording against it (and then decrement by two when done). - // - // The collection thread grabs the current interval (AggregatorHolder) and atomically - // increments this by 1 to "lock" this interval (and then waits for any active recording - // threads to complete before collecting it). - // - // Recording threads check the return value of their atomic increment, and if it's odd - // that means the collector thread has "locked" this interval for collection. - // - // But before the collector "locks" the interval it sets up a new current interval - // (AggregatorHolder), and so if a recording thread encounters an odd value, - // all it needs to do is release the "read lock" it just obtained (decrementing by 2), - // and then grab and record against the new current interval (AggregatorHolder). - private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + private final ConcurrentHashMap> aggregatorHandles; private AggregatorHolder() { aggregatorHandles = new ConcurrentHashMap<>(); } - private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { + private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; } } + + private static final class DeltaAggregatorHandle { + final AggregatorHandle handle; + // Uses the same even/odd protocol as the former AggregatorHolder.activeRecordingThreads, + // but scoped to a single series instead of the entire map: + // - Recording threads increment by 2 before recording, decrement by 2 when done. + // - The collect thread increments by 1 (making the count odd) as a signal that this + // handle is being collected; recorders that observe an odd count release and retry. + // - Once all in-flight recordings finish the count returns to 1, and the collect + // thread decrements by 1 to restore it to even for the next cycle. + final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + + DeltaAggregatorHandle(AggregatorHandle handle) { + this.handle = handle; + } + } } From aec0fd634712a28226509ccaf2ef56b178021fff Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Mon, 20 Apr 2026 19:34:37 -0500 Subject: [PATCH 3/5] tests passing --- .../state/DeltaSynchronousMetricStorage.java | 127 ++++++++++++------ 1 file changed, 85 insertions(+), 42 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java index 5b4207509f4..b0bdf5d49fe 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java @@ -61,17 +61,15 @@ class DeltaSynchronousMetricStorage void doRecordLong(long value, Attributes attributes, Context context) { do { AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); - int recordsInProgress = deltaAggregatorHandle.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 != 0) { + DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder, attributes, context); + if (deltaAggregatorHandle == null) { + continue; + } + try { + deltaAggregatorHandle.handle.recordLong(value, attributes, context); + break; + } finally { deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); - } else { - try { - deltaAggregatorHandle.handle.recordLong(value, attributes, context); - break; - } finally { - deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); - } } } while (true); } @@ -80,32 +78,29 @@ void doRecordLong(long value, Attributes attributes, Context context) { void doRecordDouble(double value, Attributes attributes, Context context) { do { AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder.aggregatorHandles, attributes, context); - int recordsInProgress = deltaAggregatorHandle.activeRecordingThreads.addAndGet(2); - if (recordsInProgress % 2 != 0) { + DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder, attributes, context); + if (deltaAggregatorHandle == null) { + continue; + } + try { + deltaAggregatorHandle.handle.recordDouble(value, attributes, context); + break; + } finally { deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); - } else { - try { - deltaAggregatorHandle.handle.recordDouble(value, attributes, context); - break; - } finally { - deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); - } } } while (true); } - private DeltaAggregatorHandle getDeltaAggregatorHandle( - ConcurrentHashMap> aggregatorHandles, + @Nullable + protected DeltaAggregatorHandle getDeltaAggregatorHandle( + AggregatorHolder holder, Attributes attributes, Context context) { Objects.requireNonNull(attributes, "attributes"); attributes = attributesProcessor.process(attributes, context); + ConcurrentHashMap> aggregatorHandles = holder.aggregatorHandles; DeltaAggregatorHandle handle = aggregatorHandles.get(attributes); - if (handle != null) { - return handle; - } - if (aggregatorHandles.size() >= maxCardinality) { + if (handle == null && aggregatorHandles.size() >= maxCardinality) { logger.log( Level.WARNING, "Instrument " @@ -113,25 +108,60 @@ private DeltaAggregatorHandle getDeltaAggregatorHandle( + " has exceeded the maximum allowed cardinality (" + maxCardinality + ")."); - // Return handle for overflow series, first checking if a handle already exists for it attributes = MetricStorage.CARDINALITY_OVERFLOW; handle = aggregatorHandles.get(attributes); - if (handle != null) { - return handle; + } + if (handle != null) { + // Existing series: pre-increment the per-handle counter and check if odd (locked by the + // collect thread's lock pass). + int count = handle.activeRecordingThreads.addAndGet(2); + if (count % 2 != 0) { + handle.activeRecordingThreads.addAndGet(-2); + return null; // handle is being collected; caller should retry with new holder } + // Also check the holder-level counter. The collect thread sets it to 1 (odd) and never + // resets it. This catches the window after the collect thread's wait-pass decrements the + // per-handle counter back to 0 (even) but before collection finishes: a stale thread that + // read the old holder can still reach here with an even per-handle count. The hb chain + // (CT's holder lock → CT's wait-pass decrement → this addAndGet(2)) guarantees we see + // the holder counter as odd at that point. + if (holder.activeRecordingThreads.get() % 2 != 0) { + handle.activeRecordingThreads.addAndGet(-2); + return null; // holder is being collected; caller should retry with new holder + } + return handle; } - // Get handle from pool if available, else create a new one. - // Note: pooled handles (used only for delta temporality) retain their original - // creationEpochNanos, but delta storage does not use the handle's creation time for the - // start epoch — it uses the reader's last collect time directly in collect(). So the stale - // creation time on a recycled handle does not affect correctness. - AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); - if (newHandle == null) { - newHandle = aggregator.createHandle(clock.now()); + // New series: use the holder-level gate to coordinate with the collect thread. + // The gate ensures (a) we don't insert into a holder whose lock pass has already run, + // and (b) the per-handle pre-increment below is visible to the collect thread's lock pass. + int holderCount = holder.activeRecordingThreads.addAndGet(2); + if (holderCount % 2 != 0) { + holder.activeRecordingThreads.addAndGet(-2); + return null; // holder is being collected; caller should retry with new holder + } + try { + // Get handle from pool if available, else create a new one. + // Note: pooled handles (used only for delta temporality) retain their original + // creationEpochNanos, but delta storage does not use the handle's creation time for the + // start epoch — it uses the reader's last collect time directly in collect(). So the stale + // creation time on a recycled handle does not affect correctness. + AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); + if (newHandle == null) { + newHandle = aggregator.createHandle(clock.now()); + } + DeltaAggregatorHandle newDeltaHandle = new DeltaAggregatorHandle<>(newHandle); + handle = aggregatorHandles.putIfAbsent(attributes, newDeltaHandle); + if (handle == null) { + handle = newDeltaHandle; + } + // Pre-increment per-handle counter while the holder gate is still held. The collect + // thread's lock pass cannot start until all threads release the holder gate, so this + // increment is guaranteed to be observed by the lock pass before it runs. + handle.activeRecordingThreads.addAndGet(2); + return handle; + } finally { + holder.activeRecordingThreads.addAndGet(-2); } - DeltaAggregatorHandle newDeltaHandle = new DeltaAggregatorHandle<>(newHandle); - handle = aggregatorHandles.putIfAbsent(attributes, newDeltaHandle); - return handle != null ? handle : newDeltaHandle; } @Nullable @@ -150,8 +180,17 @@ public MetricData collect( ? new AggregatorHolder<>(previousCollectionAggregatorHandles) : new AggregatorHolder<>(); - // Increment recordsInProgress by 1, which produces an odd number acting as a signal that - // record operations should re-read the volatile this.aggregatorHolder. + // Lock out new series creation in the old holder by making its activeRecordingThreads odd, + // then wait until it equals 1, meaning no new-series creation is in flight. + // This guarantees the per-handle lock pass below sees every handle that will ever be + // inserted into holder.aggregatorHandles. + int holderRecordingThreads = holder.activeRecordingThreads.addAndGet(1); + while (holderRecordingThreads != 1) { + holderRecordingThreads = holder.activeRecordingThreads.get(); + } + + // Increment per-handle recordsInProgress by 1, which produces an odd number acting as a + // signal that record operations should re-read the volatile this.aggregatorHolder. // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record // operations are complete. holder.aggregatorHandles.values().forEach(handle -> handle.activeRecordingThreads.addAndGet(1)); @@ -247,6 +286,10 @@ public MetricData collect( private static class AggregatorHolder { private final ConcurrentHashMap> aggregatorHandles; + // Used as a gate for new-series creation (not for per-handle recording contention). + // Recording threads creating a new series increment by 2; the collect thread increments + // by 1 to lock out new-series creation and waits for the value to return to 1. + private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); private AggregatorHolder() { aggregatorHandles = new ConcurrentHashMap<>(); From e09fb8d0fc151006ba807e3367b2733b72098ccf Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 21 Apr 2026 08:48:26 -0500 Subject: [PATCH 4/5] spotless --- .../CumulativeSynchronousMetricStorage.java | 14 ++++-- .../DefaultSynchronousMetricStorage.java | 1 - .../state/DeltaSynchronousMetricStorage.java | 50 +++++++++++-------- .../SynchronousInstrumentStressTest.java | 20 ++++---- 4 files changed, 49 insertions(+), 36 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java index 66ed05a2b9d..932c0615e61 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java @@ -1,5 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.sdk.metrics.internal.state; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; @@ -13,14 +21,10 @@ import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import io.opentelemetry.sdk.resources.Resource; - -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; - -import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; -import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; +import javax.annotation.Nullable; class CumulativeSynchronousMetricStorage extends DefaultSynchronousMetricStorage { diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index a21c3f13ec8..d2f94ba00c4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -174,5 +174,4 @@ protected AggregatorHandle getAggregatorHandle( public MetricDescriptor getMetricDescriptor() { return metricDescriptor; } - } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java index b0bdf5d49fe..710f13838c4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java @@ -1,5 +1,14 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.sdk.metrics.internal.state; +import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; +import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; @@ -14,8 +23,6 @@ import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; import io.opentelemetry.sdk.resources.Resource; - -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -23,10 +30,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; - -import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA; -import static io.opentelemetry.sdk.common.export.MemoryMode.REUSABLE_DATA; -import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; +import javax.annotation.Nullable; class DeltaSynchronousMetricStorage extends DefaultSynchronousMetricStorage { @@ -61,7 +65,8 @@ class DeltaSynchronousMetricStorage void doRecordLong(long value, Attributes attributes, Context context) { do { AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder, attributes, context); + DeltaAggregatorHandle deltaAggregatorHandle = + getDeltaAggregatorHandle(aggregatorHolder, attributes, context); if (deltaAggregatorHandle == null) { continue; } @@ -78,7 +83,8 @@ void doRecordLong(long value, Attributes attributes, Context context) { void doRecordDouble(double value, Attributes attributes, Context context) { do { AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = getDeltaAggregatorHandle(aggregatorHolder, attributes, context); + DeltaAggregatorHandle deltaAggregatorHandle = + getDeltaAggregatorHandle(aggregatorHolder, attributes, context); if (deltaAggregatorHandle == null) { continue; } @@ -93,12 +99,11 @@ void doRecordDouble(double value, Attributes attributes, Context context) { @Nullable protected DeltaAggregatorHandle getDeltaAggregatorHandle( - AggregatorHolder holder, - Attributes attributes, - Context context) { + AggregatorHolder holder, Attributes attributes, Context context) { Objects.requireNonNull(attributes, "attributes"); attributes = attributesProcessor.process(attributes, context); - ConcurrentHashMap> aggregatorHandles = holder.aggregatorHandles; + ConcurrentHashMap> aggregatorHandles = + holder.aggregatorHandles; DeltaAggregatorHandle handle = aggregatorHandles.get(attributes); if (handle == null && aggregatorHandles.size() >= maxCardinality) { logger.log( @@ -194,13 +199,17 @@ public MetricData collect( // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record // operations are complete. holder.aggregatorHandles.values().forEach(handle -> handle.activeRecordingThreads.addAndGet(1)); - holder.aggregatorHandles.values().forEach(handle -> { - int recordsInProgress = handle.activeRecordingThreads.get(); - while (recordsInProgress > 1) { - recordsInProgress = handle.activeRecordingThreads.get(); - } - handle.activeRecordingThreads.addAndGet(-1); - }); + holder + .aggregatorHandles + .values() + .forEach( + handle -> { + int recordsInProgress = handle.activeRecordingThreads.get(); + while (recordsInProgress > 1) { + recordsInProgress = handle.activeRecordingThreads.get(); + } + handle.activeRecordingThreads.addAndGet(-1); + }); aggregatorHandles = holder.aggregatorHandles; List points; @@ -295,7 +304,8 @@ private AggregatorHolder() { aggregatorHandles = new ConcurrentHashMap<>(); } - private AggregatorHolder(ConcurrentHashMap> aggregatorHandles) { + private AggregatorHolder( + ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java index 2622f678198..bb30a0f28da 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SynchronousInstrumentStressTest.java @@ -275,8 +275,8 @@ private void stressTestOnce( assertThat(p.getMax()).isEqualTo((double) max.get()); assertThat(p.getZeroCount()).isEqualTo(zeroCount.get()); assertThat( - p.getPositiveBuckets().getBucketCounts().stream() - .reduce(0L, Long::sum)) + p.getPositiveBuckets().getBucketCounts().stream() + .reduce(0L, Long::sum)) .isEqualTo(totalCount.get() - zeroCount.get()); })); } else { @@ -319,15 +319,15 @@ private static Instrument getInstrument( case HISTOGRAM: return instrumentValueType == InstrumentValueType.DOUBLE ? meter - .histogramBuilder(INSTRUMENT_NAME) - .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) - .build() - ::record + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build() + ::record : meter - .histogramBuilder(INSTRUMENT_NAME) - .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) - .ofLongs() - .build() + .histogramBuilder(INSTRUMENT_NAME) + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .ofLongs() + .build() ::record; case GAUGE: return instrumentValueType == InstrumentValueType.DOUBLE From ba7165230c457ec1df672d63068df64ef344df18 Mon Sep 17 00:00:00 2001 From: Jack Berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 21 Apr 2026 09:11:03 -0500 Subject: [PATCH 5/5] Cleanup --- .../CumulativeSynchronousMetricStorage.java | 36 ++- .../DefaultSynchronousMetricStorage.java | 45 ---- .../state/DeltaSynchronousMetricStorage.java | 251 ++++++++++-------- .../state/SynchronousMetricStorageTest.java | 69 ++--- 4 files changed, 188 insertions(+), 213 deletions(-) diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java index 932c0615e61..32db280cea0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/CumulativeSynchronousMetricStorage.java @@ -23,8 +23,9 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; +import java.util.logging.Level; class CumulativeSynchronousMetricStorage extends DefaultSynchronousMetricStorage { @@ -58,11 +59,34 @@ void doRecordDouble(double value, Attributes attributes, Context context) { .recordDouble(value, attributes, context); } - @Nullable - @Override - AggregatorHandle maybeGetPooledAggregatorHandle() { - // No aggregator handle pooling for cumulative temporality - return null; + private AggregatorHandle getAggregatorHandle( + ConcurrentHashMap> aggregatorHandles, + Attributes attributes, + Context context) { + Objects.requireNonNull(attributes, "attributes"); + attributes = attributesProcessor.process(attributes, context); + AggregatorHandle handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } + if (aggregatorHandles.size() >= maxCardinality) { + logger.log( + Level.WARNING, + "Instrument " + + metricDescriptor.getSourceInstrument().getName() + + " has exceeded the maximum allowed cardinality (" + + maxCardinality + + ")."); + // Return handle for overflow series, first checking if a handle already exists for it + attributes = MetricStorage.CARDINALITY_OVERFLOW; + handle = aggregatorHandles.get(attributes); + if (handle != null) { + return handle; + } + } + AggregatorHandle newHandle = aggregator.createHandle(clock.now()); + handle = aggregatorHandles.putIfAbsent(attributes, newHandle); + return handle != null ? handle : newHandle; } @Override diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java index d2f94ba00c4..6fd960d6bd4 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DefaultSynchronousMetricStorage.java @@ -15,15 +15,11 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor; import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader; import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Stores aggregated {@link MetricData} for synchronous instruments. @@ -129,47 +125,6 @@ public boolean isEnabled() { return enabled; } - protected AggregatorHandle getAggregatorHandle( - ConcurrentHashMap> aggregatorHandles, - Attributes attributes, - Context context) { - Objects.requireNonNull(attributes, "attributes"); - attributes = attributesProcessor.process(attributes, context); - AggregatorHandle handle = aggregatorHandles.get(attributes); - if (handle != null) { - return handle; - } - if (aggregatorHandles.size() >= maxCardinality) { - logger.log( - Level.WARNING, - "Instrument " - + metricDescriptor.getSourceInstrument().getName() - + " has exceeded the maximum allowed cardinality (" - + maxCardinality - + ")."); - // Return handle for overflow series, first checking if a handle already exists for it - attributes = MetricStorage.CARDINALITY_OVERFLOW; - handle = aggregatorHandles.get(attributes); - if (handle != null) { - return handle; - } - } - // Get handle from pool if available, else create a new one. - // Note: pooled handles (used only for delta temporality) retain their original - // creationEpochNanos, but delta storage does not use the handle's creation time for the - // start epoch — it uses the reader's last collect time directly in collect(). So the stale - // creation time on a recycled handle does not affect correctness. - AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); - if (newHandle == null) { - newHandle = aggregator.createHandle(clock.now()); - } - handle = aggregatorHandles.putIfAbsent(attributes, newHandle); - return handle != null ? handle : newHandle; - } - - @Nullable - abstract AggregatorHandle maybeGetPooledAggregatorHandle(); - @Override public MetricDescriptor getMetricDescriptor() { return metricDescriptor; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java index 710f13838c4..3e3d216878e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DeltaSynchronousMetricStorage.java @@ -44,7 +44,7 @@ class DeltaSynchronousMetricStorage previousCollectionAggregatorHandles = new ConcurrentHashMap<>(); // Only populated if memoryMode == REUSABLE_DATA private final ArrayList reusableResultList = new ArrayList<>(); - private final ConcurrentLinkedQueue> aggregatorHandlePool = + private final ConcurrentLinkedQueue> aggregatorHandlePool = new ConcurrentLinkedQueue<>(); DeltaSynchronousMetricStorage( @@ -63,38 +63,32 @@ class DeltaSynchronousMetricStorage @Override void doRecordLong(long value, Attributes attributes, Context context) { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = - getDeltaAggregatorHandle(aggregatorHolder, attributes, context); - if (deltaAggregatorHandle == null) { - continue; - } - try { - deltaAggregatorHandle.handle.recordLong(value, attributes, context); - break; - } finally { - deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); - } - } while (true); + DeltaAggregatorHandle handle = acquireHandleForRecord(attributes, context); + try { + handle.handle.recordLong(value, attributes, context); + } finally { + handle.releaseRecord(); + } } @Override void doRecordDouble(double value, Attributes attributes, Context context) { - do { - AggregatorHolder aggregatorHolder = this.aggregatorHolder; - DeltaAggregatorHandle deltaAggregatorHandle = - getDeltaAggregatorHandle(aggregatorHolder, attributes, context); - if (deltaAggregatorHandle == null) { - continue; - } - try { - deltaAggregatorHandle.handle.recordDouble(value, attributes, context); - break; - } finally { - deltaAggregatorHandle.activeRecordingThreads.addAndGet(-2); + DeltaAggregatorHandle handle = acquireHandleForRecord(attributes, context); + try { + handle.handle.recordDouble(value, attributes, context); + } finally { + handle.releaseRecord(); + } + } + + private DeltaAggregatorHandle acquireHandleForRecord(Attributes attributes, Context context) { + while (true) { + DeltaAggregatorHandle handle = + getDeltaAggregatorHandle(this.aggregatorHolder, attributes, context); + if (handle != null) { + return handle; } - } while (true); + } } @Nullable @@ -117,64 +111,53 @@ protected DeltaAggregatorHandle getDeltaAggregatorHandle( handle = aggregatorHandles.get(attributes); } if (handle != null) { - // Existing series: pre-increment the per-handle counter and check if odd (locked by the - // collect thread's lock pass). - int count = handle.activeRecordingThreads.addAndGet(2); - if (count % 2 != 0) { - handle.activeRecordingThreads.addAndGet(-2); - return null; // handle is being collected; caller should retry with new holder + // Existing series: try to acquire a recording slot. Returns false if the collector has + // locked this handle (odd state), meaning we should retry with the new holder. + if (!handle.tryAcquireForRecord()) { + return null; } - // Also check the holder-level counter. The collect thread sets it to 1 (odd) and never - // resets it. This catches the window after the collect thread's wait-pass decrements the - // per-handle counter back to 0 (even) but before collection finishes: a stale thread that - // read the old holder can still reach here with an even per-handle count. The hb chain - // (CT's holder lock → CT's wait-pass decrement → this addAndGet(2)) guarantees we see - // the holder counter as odd at that point. - if (holder.activeRecordingThreads.get() % 2 != 0) { - handle.activeRecordingThreads.addAndGet(-2); - return null; // holder is being collected; caller should retry with new holder + // Also check the holder-level gate. The collect thread sets it to locked (odd) and never + // resets it. This catches the window after the collect thread's awaitRecordersAndUnlock() + // decrements the per-handle state back to even but before collection finishes: a stale + // thread that read the old holder can still reach here with an even per-handle state. The + // hb chain (CT's holder lock → CT's awaitRecordersAndUnlock() decrement → this + // tryAcquireForRecord) guarantees we see the holder gate as locked at that point. + if (holder.isLockedForCollect()) { + handle.releaseRecord(); + return null; } return handle; } - // New series: use the holder-level gate to coordinate with the collect thread. + // New series: acquire the holder gate to coordinate with the collect thread. // The gate ensures (a) we don't insert into a holder whose lock pass has already run, // and (b) the per-handle pre-increment below is visible to the collect thread's lock pass. - int holderCount = holder.activeRecordingThreads.addAndGet(2); - if (holderCount % 2 != 0) { - holder.activeRecordingThreads.addAndGet(-2); - return null; // holder is being collected; caller should retry with new holder + if (!holder.tryAcquireForNewSeries()) { + return null; } try { // Get handle from pool if available, else create a new one. - // Note: pooled handles (used only for delta temporality) retain their original - // creationEpochNanos, but delta storage does not use the handle's creation time for the - // start epoch — it uses the reader's last collect time directly in collect(). So the stale - // creation time on a recycled handle does not affect correctness. - AggregatorHandle newHandle = maybeGetPooledAggregatorHandle(); - if (newHandle == null) { - newHandle = aggregator.createHandle(clock.now()); + // Note: pooled handles retain their original creationEpochNanos, but delta storage does not + // use the handle's creation time for the start epoch — it uses the reader's last collect time + // directly in collect(). So the stale creation time on a recycled handle does not affect + // correctness. + DeltaAggregatorHandle newDeltaHandle = aggregatorHandlePool.poll(); + if (newDeltaHandle == null) { + newDeltaHandle = new DeltaAggregatorHandle<>(aggregator.createHandle(clock.now())); } - DeltaAggregatorHandle newDeltaHandle = new DeltaAggregatorHandle<>(newHandle); handle = aggregatorHandles.putIfAbsent(attributes, newDeltaHandle); if (handle == null) { handle = newDeltaHandle; } - // Pre-increment per-handle counter while the holder gate is still held. The collect + // Pre-increment per-handle state while the holder gate is still held. The collect // thread's lock pass cannot start until all threads release the holder gate, so this // increment is guaranteed to be observed by the lock pass before it runs. - handle.activeRecordingThreads.addAndGet(2); + handle.acquireForRecord(); return handle; } finally { - holder.activeRecordingThreads.addAndGet(-2); + holder.releaseNewSeries(); } } - @Nullable - @Override - AggregatorHandle maybeGetPooledAggregatorHandle() { - return aggregatorHandlePool.poll(); - } - @Override public MetricData collect( Resource resource, InstrumentationScopeInfo instrumentationScopeInfo, long epochNanos) { @@ -185,31 +168,14 @@ public MetricData collect( ? new AggregatorHolder<>(previousCollectionAggregatorHandles) : new AggregatorHolder<>(); - // Lock out new series creation in the old holder by making its activeRecordingThreads odd, - // then wait until it equals 1, meaning no new-series creation is in flight. - // This guarantees the per-handle lock pass below sees every handle that will ever be - // inserted into holder.aggregatorHandles. - int holderRecordingThreads = holder.activeRecordingThreads.addAndGet(1); - while (holderRecordingThreads != 1) { - holderRecordingThreads = holder.activeRecordingThreads.get(); - } + // Lock out new series creation in the old holder and wait for any in-flight new-series + // operations to complete. This guarantees the per-handle lock pass below sees every handle + // that will ever be inserted into holder.aggregatorHandles. + holder.lockForCollectAndAwait(); - // Increment per-handle recordsInProgress by 1, which produces an odd number acting as a - // signal that record operations should re-read the volatile this.aggregatorHolder. - // Repeatedly grab recordsInProgress until it is <= 1, which signals all active record - // operations are complete. - holder.aggregatorHandles.values().forEach(handle -> handle.activeRecordingThreads.addAndGet(1)); - holder - .aggregatorHandles - .values() - .forEach( - handle -> { - int recordsInProgress = handle.activeRecordingThreads.get(); - while (recordsInProgress > 1) { - recordsInProgress = handle.activeRecordingThreads.get(); - } - handle.activeRecordingThreads.addAndGet(-1); - }); + // Lock each handle and wait for any in-flight recorders against it to finish. + holder.aggregatorHandles.values().forEach(DeltaAggregatorHandle::lockForCollect); + holder.aggregatorHandles.values().forEach(DeltaAggregatorHandle::awaitRecordersAndUnlock); aggregatorHandles = holder.aggregatorHandles; List points; @@ -222,9 +188,8 @@ public MetricData collect( // In DELTA aggregation temporality each Attributes is reset to 0 // every time we perform a collection (by definition of DELTA). - // In IMMUTABLE_DATA MemoryMode, this is accomplished by removing all aggregator handles - // (into which the values are recorded) effectively starting from 0 - // for each recorded Attributes. + // In IMMUTABLE_DATA MemoryMode, this is accomplished by swapping in a new empty holder, + // abandoning the old map so each new recording in the next interval starts fresh from 0. // In REUSABLE_DATA MemoryMode, we strive for zero allocations. Since even removing // a key-value from a map and putting it again on next recording will cost an allocation, // we are keeping the aggregator handles in their map, and only reset their value once @@ -261,12 +226,10 @@ public MetricData collect( startEpochNanos, epochNanos, attributes, /* reset= */ true); if (memoryMode == IMMUTABLE_DATA) { - // Return the aggregator to the pool. - // The pool is only used in DELTA temporality (since in CUMULATIVE the handler is - // always used as it is the place accumulating the values and never resets) - // AND only in IMMUTABLE_DATA memory mode since in REUSABLE_DATA we avoid - // using the pool since it allocates memory internally on each put() or remove() - aggregatorHandlePool.offer(handle.handle); + // Return the handle to the pool. + // Only in IMMUTABLE_DATA memory mode: in REUSABLE_DATA we avoid using the pool + // since ConcurrentLinkedQueue.offer() allocates memory internally. + aggregatorHandlePool.offer(handle); } if (point != null) { @@ -274,13 +237,6 @@ public MetricData collect( } }); - // Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are - // created during collection. - int toDelete = aggregatorHandlePool.size() - (maxCardinality + 1); - for (int i = 0; i < toDelete; i++) { - aggregatorHandlePool.poll(); - } - if (memoryMode == REUSABLE_DATA) { previousCollectionAggregatorHandles = aggregatorHandles; } @@ -295,10 +251,12 @@ public MetricData collect( private static class AggregatorHolder { private final ConcurrentHashMap> aggregatorHandles; - // Used as a gate for new-series creation (not for per-handle recording contention). - // Recording threads creating a new series increment by 2; the collect thread increments - // by 1 to lock out new-series creation and waits for the value to return to 1. - private final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + // Guards new-series creation using an even/odd protocol: + // - Threads creating a new series increment by 2 (keeping the value even while unlocked) + // and decrement by 2 on release. + // - The collect thread increments by 1 (making the value odd) to lock out new-series + // creation, then waits for the value to return to 1 (no threads in-flight). + private final AtomicInteger newSeriesGate = new AtomicInteger(0); private AggregatorHolder() { aggregatorHandles = new ConcurrentHashMap<>(); @@ -308,21 +266,90 @@ private AggregatorHolder( ConcurrentHashMap> aggregatorHandles) { this.aggregatorHandles = aggregatorHandles; } + + /** Returns true and acquires the gate if not locked for collection. */ + boolean tryAcquireForNewSeries() { + int s = newSeriesGate.addAndGet(2); + if ((s & 1) != 0) { + newSeriesGate.addAndGet(-2); + return false; + } + return true; + } + + /** Releases the gate acquired via {@link #tryAcquireForNewSeries()}. */ + void releaseNewSeries() { + newSeriesGate.addAndGet(-2); + } + + /** Returns true if the collector has locked this holder against new-series creation. */ + boolean isLockedForCollect() { + return (newSeriesGate.get() & 1) != 0; + } + + /** Locks new-series creation and waits for any in-flight new-series operations to complete. */ + void lockForCollectAndAwait() { + int s = newSeriesGate.addAndGet(1); + while (s != 1) { + s = newSeriesGate.get(); + } + } } private static final class DeltaAggregatorHandle { final AggregatorHandle handle; - // Uses the same even/odd protocol as the former AggregatorHolder.activeRecordingThreads, - // but scoped to a single series instead of the entire map: + // Guards per-handle recording using the same even/odd protocol as + // AggregatorHolder.newSeriesGate, + // but scoped to a single series: // - Recording threads increment by 2 before recording, decrement by 2 when done. // - The collect thread increments by 1 (making the count odd) as a signal that this // handle is being collected; recorders that observe an odd count release and retry. // - Once all in-flight recordings finish the count returns to 1, and the collect // thread decrements by 1 to restore it to even for the next cycle. - final AtomicInteger activeRecordingThreads = new AtomicInteger(0); + private final AtomicInteger state = new AtomicInteger(0); DeltaAggregatorHandle(AggregatorHandle handle) { this.handle = handle; } + + /** + * Tries to acquire a recording slot. Returns false if the collector has locked this handle (odd + * state); the caller should retry with a fresh holder. + */ + boolean tryAcquireForRecord() { + int s = state.addAndGet(2); + if ((s & 1) != 0) { + state.addAndGet(-2); + return false; + } + return true; + } + + /** + * Acquires a recording slot unconditionally. Only safe to call while the holder gate is held, + * which prevents the collector from starting its lock pass. + */ + void acquireForRecord() { + state.addAndGet(2); + } + + /** + * Releases a recording slot acquired via {@link #tryAcquireForRecord()} or {@link + * #acquireForRecord()}. + */ + void releaseRecord() { + state.addAndGet(-2); + } + + /** Signals that collection is starting. Recorders that observe this will abort and retry. */ + void lockForCollect() { + state.addAndGet(1); + } + + /** Waits for all in-flight recorders to finish, then clears the collection lock. */ + void awaitRecordersAndUnlock() { + while (state.get() > 1) {} + state.addAndGet(-1); + } } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java index ec2ad73652f..6a694d81d9a 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/SynchronousMetricStorageTest.java @@ -33,7 +33,6 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorHandle; import io.opentelemetry.sdk.metrics.internal.aggregator.EmptyMetricData; import io.opentelemetry.sdk.metrics.internal.descriptor.Advice; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; @@ -287,9 +286,7 @@ void recordAndCollect_DeltaResets_ImmutableData() { // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> @@ -300,9 +297,7 @@ void recordAndCollect_DeltaResets_ImmutableData() { .hasStartEpochNanos(testClock.now()) .hasEpochNanos(10) .hasValue(3))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(1); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(1); deltaReader.setLastCollectEpochNanos(10); // Record measurement and collect at time 30 @@ -310,35 +305,27 @@ void recordAndCollect_DeltaResets_ImmutableData() { // AggregatorHandle should be returned to the pool on reset so shouldn't create additional // handles verify(aggregator, times(1)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(1); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(1); deltaReader.setLastCollectEpochNanos(30); // Record measurement and collect at time 35 storage.recordDouble(2, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(1); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(1); } @Test @@ -358,9 +345,7 @@ void recordAndCollect_DeltaResets_ReusableData() { // Record measurement and collect at time 10 storage.recordDouble(3, Attributes.empty(), Context.current()); verify(aggregator, times(1)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> @@ -371,9 +356,7 @@ void recordAndCollect_DeltaResets_ReusableData() { .hasStartEpochNanos(testClock.now()) .hasEpochNanos(10) .hasValue(3))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); deltaReader.setLastCollectEpochNanos(10); @@ -382,18 +365,14 @@ void recordAndCollect_DeltaResets_ReusableData() { // We're switched to secondary map so a handle will be created verify(aggregator, times(2)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> sum.isDelta() .hasPointsSatisfying( point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); deltaReader.setLastCollectEpochNanos(30); @@ -405,9 +384,7 @@ void recordAndCollect_DeltaResets_ReusableData() { // aggregator handle is still there, thus no handle was created for empty(), but it will for // the "foo" verify(aggregator, times(3)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); MetricData metricData = storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 35); assertThat(metricData).hasDoubleSumSatisfying(DoubleSumAssert::isDelta); @@ -435,9 +412,7 @@ void recordAndCollect_DeltaResets_ReusableData() { Attributes.of(AttributeKey.stringKey("foo"), "bar")); }))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); deltaReader.setLastCollectEpochNanos(40); storage.recordDouble(6, Attributes.of(AttributeKey.stringKey("foo"), "bar"), Context.current()); @@ -551,9 +526,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { 3, Attributes.builder().put("key", "value" + i).build(), Context.current()); } verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 10)) .hasDoubleSumSatisfying( sum -> @@ -568,7 +541,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { assertThat(point.getValue()).isEqualTo(3); }))); assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .extracting("aggregatorHandlePool", as(collection(Object.class))) .hasSize(CARDINALITY_LIMIT - 1); assertThat(logs.getEvents()).isEmpty(); @@ -580,7 +553,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { // Should use handle returned to pool instead of creating new ones verify(aggregator, times(CARDINALITY_LIMIT - 1)).createHandle(testClock.now()); assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .extracting("aggregatorHandlePool", as(collection(Object.class))) .hasSize(CARDINALITY_LIMIT - 2); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 20)) .hasDoubleSumSatisfying( @@ -597,7 +570,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { .put("key", "value" + CARDINALITY_LIMIT) .build()))); assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .extracting("aggregatorHandlePool", as(collection(Object.class))) .hasSize(CARDINALITY_LIMIT - 1); assertThat(logs.getEvents()).isEmpty(); deltaReader.setLastCollectEpochNanos(20); @@ -610,9 +583,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { } // Should use handles returned to pool instead of creating new ones verify(aggregator, times(CARDINALITY_LIMIT)).createHandle(testClock.now()); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .hasSize(0); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).hasSize(0); assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 30)) .hasDoubleSumSatisfying( sum -> @@ -639,7 +610,7 @@ void recordAndCollect_DeltaAtLimit_ImmutableDataMemoryMode() { .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) + .extracting("aggregatorHandlePool", as(collection(Object.class))) .hasSize(CARDINALITY_LIMIT); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); } @@ -728,9 +699,7 @@ void recordAndCollect_DeltaAtLimit_ReusableDataMemoryMode() { assertThat(point.getAttributes()) .isEqualTo(MetricStorage.CARDINALITY_OVERFLOW)))); - assertThat(storage) - .extracting("aggregatorHandlePool", as(collection(AggregatorHandle.class))) - .isEmpty(); + assertThat(storage).extracting("aggregatorHandlePool", as(collection(Object.class))).isEmpty(); logs.assertContains("Instrument name has exceeded the maximum allowed cardinality"); }