diff --git a/CHANGELOG.md b/CHANGELOG.md index 66d2be50ef..95c50a7176 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 +* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. 
#7359 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cffb9ca332..ee11ee3ebd 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1471,6 +1471,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { succeededSamplesCount++ + if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } @@ -1482,6 +1485,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte newSeries = append(newSeries, copiedLabels) } succeededSamplesCount++ + if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } @@ -1525,6 +1531,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount())) + // Observe ingestion delay + if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } else { @@ -1537,6 +1547,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } succeededHistogramsCount++ i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount())) + // Observe ingestion delay + if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 { + i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0) + } continue } } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5dd0d1ec36..0369106daf 
100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -2833,6 +2833,146 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
 	require.NoError(t, err)
 }
 
+// TestIngester_IngestionDelayMetric checks that successful pushes are recorded in the
+// per-user cortex_ingester_ingestion_delay_seconds histogram.
+func TestIngester_IngestionDelayMetric(t *testing.T) {
+	metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
+	metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)
+
+	// pushSample pushes one float sample of the test series on behalf of userID.
+	pushSample := func(t *testing.T, ing *Ingester, userID string, sample cortexpb.Sample) {
+		t.Helper()
+		ctx := user.InjectOrgID(context.Background(), userID)
+		req := cortexpb.ToWriteRequest(
+			[]labels.Labels{metricLabels},
+			[]cortexpb.Sample{sample},
+			nil,
+			nil,
+			cortexpb.API,
+		)
+		_, err := ing.Push(ctx, req)
+		require.NoError(t, err)
+	}
+
+	t.Run("float samples with delays", func(t *testing.T) {
+		ing, registry := newIngestionDelayTestIngester(t)
+		userID := "user-1"
+
+		// Push samples with different delays (oldest first to avoid OOO).
+		now := time.Now().UnixMilli()
+		samples := []cortexpb.Sample{
+			{Value: 1, TimestampMs: now - 30000}, // 30 seconds ago
+			{Value: 2, TimestampMs: now - 10000}, // 10 seconds ago
+			{Value: 3, TimestampMs: now - 5000},  // 5 seconds ago
+		}
+		for _, sample := range samples {
+			pushSample(t, ing, userID, sample)
+		}
+
+		count, sum, found := gatherIngestionDelayHistogram(t, registry, userID)
+		require.True(t, found, "ingestion delay metric not found")
+		assert.Equal(t, uint64(3), count, "expected 3 observations")
+
+		// The samples were 30s, 10s and 5s old, so the summed delay is ~45s plus execution time.
+		assert.Greater(t, sum, 40.0, "sum of delays should be at least 40s")
+	})
+
+	t.Run("future timestamps are filtered", func(t *testing.T) {
+		ing, registry := newIngestionDelayTestIngester(t)
+		userID := "user-future"
+
+		futureTimestamp := time.Now().UnixMilli() + 60000 // 60 seconds in the future
+		pushSample(t, ing, userID, cortexpb.Sample{Value: 1, TimestampMs: futureTimestamp})
+
+		// Asserted unconditionally: the count is zero whether the user's series is absent
+		// or present without observations, so the check can never be silently skipped.
+		count, _, _ := gatherIngestionDelayHistogram(t, registry, userID)
+		assert.Equal(t, uint64(0), count, "future timestamp should not be observed")
+	})
+
+	t.Run("per-user isolation", func(t *testing.T) {
+		ing, registry := newIngestionDelayTestIngester(t)
+
+		users := []struct {
+			id         string
+			numSamples int
+			delayMs    int64
+		}{
+			{"user-a", 2, 5000},
+			{"user-b", 3, 30000},
+		}
+
+		baseTime := time.Now().UnixMilli()
+		for _, u := range users {
+			// Send in chronological order (oldest first).
+			for idx := u.numSamples - 1; idx >= 0; idx-- {
+				timestamp := baseTime - u.delayMs - int64(idx*1000)
+				pushSample(t, ing, u.id, cortexpb.Sample{Value: float64(idx + 1), TimestampMs: timestamp})
+			}
+		}
+
+		// Each user must only see the observations of its own pushes.
+		for _, u := range users {
+			count, _, found := gatherIngestionDelayHistogram(t, registry, u.id)
+			require.True(t, found, "ingestion delay metric not found for %s", u.id)
+			assert.Equal(t, uint64(u.numSamples), count)
+		}
+	})
+}
+
+// newIngestionDelayTestIngester starts an ingester backed by a fresh registry, polls the
+// lifecycler state for ring.ACTIVE and registers the ingester's shutdown with t.Cleanup.
+func newIngestionDelayTestIngester(t *testing.T) (*Ingester, *prometheus.Registry) {
+	t.Helper()
+
+	registry := prometheus.NewRegistry()
+	cfg := defaultIngesterTestConfig(t)
+	cfg.LifecyclerConfig.JoinAfter = 0
+
+	ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+	t.Cleanup(func() {
+		// Best-effort shutdown: a stop error is not what this test is about.
+		_ = services.StopAndAwaitTerminated(context.Background(), ing)
+	})
+
+	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
+		return ing.lifecycler.GetState()
+	})
+
+	return ing, registry
+}
+
+// gatherIngestionDelayHistogram returns the sample count and sum of the
+// cortex_ingester_ingestion_delay_seconds histogram for userID. found is false when no
+// series carries a matching "user" label, in which case count and sum are zero.
+func gatherIngestionDelayHistogram(t *testing.T, g prometheus.Gatherer, userID string) (count uint64, sum float64, found bool) {
+	t.Helper()
+
+	families, err := g.Gather()
+	require.NoError(t, err)
+
+	for _, mf := range families {
+		if mf.GetName() != "cortex_ingester_ingestion_delay_seconds" {
+			continue
+		}
+		for _, metric := range mf.GetMetric() {
+			for _, label := range metric.GetLabel() {
+				if label.GetName() != "user" || label.GetValue() != userID {
+					continue
+				}
+				found = true
+				if metric.Histogram != nil {
+					count = metric.Histogram.GetSampleCount()
+					sum = metric.Histogram.GetSampleSum()
+				}
+			}
+		}
+	}
+	return count, sum, found
+}
+
 func BenchmarkIngesterPush(b *testing.B) {
 	limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 1487696fce..b8b3269a92 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -40,6 +40,7 @@ type ingesterMetrics struct { ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter ingestedHistogramBuckets *prometheus.HistogramVec + ingestionDelaySeconds *prometheus.HistogramVec oooLabelsTotal *prometheus.CounterVec queries prometheus.Counter queriedSamples prometheus.Histogram @@ -144,6 +145,14 @@ func newIngesterMetrics(r prometheus.Registerer, NativeHistogramMinResetDuration: 1, Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets }, []string{"user"}), + ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingester_ingestion_delay_seconds", + Help: "Delay in seconds between sample ingestion time and sample timestamp.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1, + Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m + }, []string{"user"}), oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_out_of_order_labels_total", Help: "The total number of out of order label found per user.", @@ -377,6 +386,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) m.ingestedHistogramBuckets.DeleteLabelValues(userID) + m.ingestionDelaySeconds.DeleteLabelValues(userID) if m.memSeriesCreatedTotal != nil { m.memSeriesCreatedTotal.DeleteLabelValues(userID)