From 23ccaeabe53a77622ba7442fc5a745f0e1057962 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 21 Apr 2026 05:48:16 +0000 Subject: [PATCH 1/2] adding ingestion delay metric Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 14 +++ pkg/ingester/ingester_test.go | 187 ++++++++++++++++++++++++++++++++++ pkg/ingester/metrics.go | 10 ++ 4 files changed, 212 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66d2be50ef2..cae2697e1cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. 
#7359 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cffb9ca332b..ee11ee3ebd6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1471,6 +1471,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { succeededSamplesCount++ + if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } @@ -1482,6 +1485,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededSamplesCount++ + if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } @@ -1525,6 +1531,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount())) + // Observe ingestion delay + if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } else { @@ -1537,6 +1547,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } succeededHistogramsCount++ i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount())) + // Observe ingestion delay + if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5dd0d1ec36b..0369106daf6 
100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -2833,6 +2833,158 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestIngester_IngestionDelayMetric(t *testing.T) {
+	const metricName = "cortex_ingester_ingestion_delay_seconds"
+
+	metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
+	metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
+
+	// delayStats is the per-user view of the histogram under test.
+	type delayStats struct {
+		count uint64
+		sum   float64
+	}
+
+	// startIngester returns a running, ACTIVE ingester together with the fresh
+	// registry it reports to. The ingester is stopped when the calling
+	// (sub)test finishes.
+	startIngester := func(t *testing.T) (*Ingester, *prometheus.Registry) {
+		t.Helper()
+
+		registry := prometheus.NewRegistry()
+		cfg := defaultIngesterTestConfig(t)
+		cfg.LifecyclerConfig.JoinAfter = 0
+
+		ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
+		require.NoError(t, err)
+		require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+		t.Cleanup(func() {
+			_ = services.StopAndAwaitTerminated(context.Background(), ing)
+		})
+
+		test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
+			return ing.lifecycler.GetState()
+		})
+		return ing, registry
+	}
+
+	// pushSample pushes one float sample of the shared test series for userID
+	// and fails the test if the push is rejected.
+	pushSample := func(t *testing.T, ing *Ingester, userID string, s cortexpb.Sample) {
+		t.Helper()
+
+		req := cortexpb.ToWriteRequest(
+			[]labels.Labels{metricLabels},
+			[]cortexpb.Sample{s},
+			nil,
+			nil,
+			cortexpb.API,
+		)
+		_, err := ing.Push(user.InjectOrgID(context.Background(), userID), req)
+		require.NoError(t, err)
+	}
+
+	// gatherDelayStats returns the observation count and sum of the ingestion
+	// delay histogram keyed by the value of the "user" label. Users without a
+	// series are absent from the map.
+	gatherDelayStats := func(t *testing.T, registry *prometheus.Registry) map[string]delayStats {
+		t.Helper()
+
+		families, err := registry.Gather()
+		require.NoError(t, err)
+
+		stats := make(map[string]delayStats)
+		for _, mf := range families {
+			if mf.GetName() != metricName {
+				continue
+			}
+			for _, metric := range mf.GetMetric() {
+				if metric.Histogram == nil {
+					continue
+				}
+				for _, label := range metric.GetLabel() {
+					if label.GetName() == "user" {
+						stats[label.GetValue()] = delayStats{
+							count: metric.Histogram.GetSampleCount(),
+							sum:   metric.Histogram.GetSampleSum(),
+						}
+					}
+				}
+			}
+		}
+		return stats
+	}
+
+	t.Run("float samples with delays", func(t *testing.T) {
+		ing, registry := startIngester(t)
+		userID := "user-1"
+
+		// Push samples with different delays (oldest first to avoid OOO).
+		now := time.Now().UnixMilli()
+		samples := []cortexpb.Sample{
+			{Value: 1, TimestampMs: now - 30000}, // 30 seconds ago
+			{Value: 2, TimestampMs: now - 10000}, // 10 seconds ago
+			{Value: 3, TimestampMs: now - 5000},  // 5 seconds ago
+		}
+		for _, s := range samples {
+			pushSample(t, ing, userID, s)
+		}
+
+		stats, ok := gatherDelayStats(t, registry)[userID]
+		require.True(t, ok, "ingestion delay metric not found")
+		assert.Equal(t, uint64(3), stats.count, "expected 3 observations")
+
+		// The samples were 30s, 10s and 5s old, so the sum is ~45s plus execution
+		// time. The upper bound catches unit mistakes (e.g. observing
+		// milliseconds), which the lower bound alone would let through.
+		assert.Greater(t, stats.sum, 40.0, "sum of delays should be at least 40s")
+		assert.Less(t, stats.sum, 60.0, "sum of delays should stay close to 45s")
+	})
+
+	t.Run("future timestamps are filtered", func(t *testing.T) {
+		ing, registry := startIngester(t)
+		userID := "user-future"
+
+		// 60 seconds in the future: the computed delay is negative and must not
+		// be observed.
+		futureTimestamp := time.Now().UnixMilli() + 60000
+		pushSample(t, ing, userID, cortexpb.Sample{Value: 1, TimestampMs: futureTimestamp})
+
+		// A missing series reads as the zero value, so this assertion always
+		// runs, whether or not a histogram was created for the user.
+		assert.Equal(t, uint64(0), gatherDelayStats(t, registry)[userID].count,
+			"future timestamp should not be observed")
+	})
+
+	t.Run("per-user isolation", func(t *testing.T) {
+		ing, registry := startIngester(t)
+
+		// Push samples for two users.
+		baseTime := time.Now().UnixMilli()
+		for _, u := range []struct {
+			id         string
+			numSamples int
+			delayMs    int64
+		}{
+			{"user-a", 2, 5000},
+			{"user-b", 3, 30000},
+		} {
+			// Send in chronological order (oldest first).
+			for idx := u.numSamples - 1; idx >= 0; idx-- {
+				pushSample(t, ing, u.id, cortexpb.Sample{
+					Value:       float64(idx + 1),
+					TimestampMs: baseTime - u.delayMs - int64(idx*1000),
+				})
+			}
+		}
+
+		// Each user must only see their own observations.
+		stats := gatherDelayStats(t, registry)
+		assert.Equal(t, uint64(2), stats["user-a"].count)
+		assert.Equal(t, uint64(3), stats["user-b"].count)
+	})
+}
+
 func BenchmarkIngesterPush(b *testing.B) {
 	limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 1487696fce3..b8b3269a925 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -40,6 +40,7 @@ type ingesterMetrics struct { ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter ingestedHistogramBuckets *prometheus.HistogramVec + ingestionDelaySeconds *prometheus.HistogramVec oooLabelsTotal *prometheus.CounterVec queries prometheus.Counter queriedSamples prometheus.Histogram @@ -144,6 +145,14 @@ func newIngesterMetrics(r prometheus.Registerer, NativeHistogramMinResetDuration: 1, Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets }, []string{"user"}), + ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingester_ingestion_delay_seconds", + Help: "Delay in seconds between sample ingestion time and sample timestamp.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1, + Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m + }, []string{"user"}), oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_out_of_order_labels_total", Help: "The total number of out of order label found per user.", @@ -377,6 +386,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) m.ingestedHistogramBuckets.DeleteLabelValues(userID) + m.ingestionDelaySeconds.DeleteLabelValues(userID) if m.memSeriesCreatedTotal != nil { m.memSeriesCreatedTotal.DeleteLabelValues(userID) From 9091bc7342d5b10e0c47264bb452fc4522ccea27 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 21 Apr 2026 06:48:39 +0000 Subject: [PATCH 2/2] updated pr id Signed-off-by: Shvejan Mutheboyina --- 
CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cae2697e1cd..95c50a7176d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 -* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #6748 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359