Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [ENHANCEMENT] Ingester: Add `cortex_ingester_ingestion_delay_seconds` native histogram metric to track the delay between sample ingestion time and sample timestamp. #7443
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
Expand Down
14 changes: 14 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if ref != 0 {
if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil {
succeededSamplesCount++
if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 {
i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0)
}
continue
}

Expand All @@ -1482,6 +1485,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
newSeries = append(newSeries, copiedLabels)
}
succeededSamplesCount++
if delayMs := time.Now().UnixMilli() - s.TimestampMs; delayMs >= 0 {
i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0)
}
continue
}
}
Expand Down Expand Up @@ -1525,6 +1531,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil {
succeededHistogramsCount++
i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount()))
// Observe ingestion delay
if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 {
i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line adds 15% overhead

go test -tags "netgo slicelabels" -run='^$' -bench=BenchmarkIngesterPush -benchtime=5s -count=3 ./pkg/ingester/...

It could be reduced to roughly 6–10% overhead with something like:

  delayObserver := i.metrics.ingestionDelaySeconds.WithLabelValues(userID)                                                                                                                                                                      
...                                                                                                                                                                              
  observeDelay := func(timestampMs int64) {                                                                                                                                                                                                     
      if delayMs := time.Now().UnixMilli() - timestampMs; delayMs >= 0 {                                                                                                                                                                        
          delayObserver.Observe(float64(delayMs) / 1000.0)                                                                                                                                                                                      
      }                                                                                                                                                                                                                                         
  }  

The existing ingestedHistogramBuckets has the same per-sample WithLabelValues pattern and would benefit from the same treatment. But let's leave that for another PR

}
continue
}
} else {
Expand All @@ -1537,6 +1547,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
}
succeededHistogramsCount++
i.metrics.ingestedHistogramBuckets.WithLabelValues(userID).Observe(float64(hp.BucketCount()))
// Observe ingestion delay
if delayMs := time.Now().UnixMilli() - hp.TimestampMs; delayMs >= 0 {
i.metrics.ingestionDelaySeconds.WithLabelValues(userID).Observe(float64(delayMs) / 1000.0)
}
continue
}
}
Expand Down
187 changes: 187 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,193 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
require.NoError(t, err)
}

// gatherIngestionDelayHistogram gathers the registry and returns the state of
// the cortex_ingester_ingestion_delay_seconds classic histogram for the given
// user: whether a series with that user label exists, its observation count,
// and the sum of all observed delays in seconds. Shared by the subtests below
// so the metric-scanning logic lives in exactly one place.
func gatherIngestionDelayHistogram(t *testing.T, registry *prometheus.Registry, userID string) (found bool, sampleCount uint64, sampleSum float64) {
	t.Helper()

	metricFamilies, err := registry.Gather()
	require.NoError(t, err)

	for _, mf := range metricFamilies {
		if mf.GetName() != "cortex_ingester_ingestion_delay_seconds" {
			continue
		}
		for _, metric := range mf.GetMetric() {
			for _, label := range metric.GetLabel() {
				if label.GetName() == "user" && label.GetValue() == userID {
					found = true
					if metric.Histogram != nil {
						sampleCount = metric.Histogram.GetSampleCount()
						sampleSum = metric.Histogram.GetSampleSum()
					}
				}
			}
		}
	}
	return found, sampleCount, sampleSum
}

// TestIngester_IngestionDelayMetric verifies the per-user
// cortex_ingester_ingestion_delay_seconds histogram: delays of successfully
// ingested float samples are observed, samples with future timestamps are
// filtered out (negative delay), and observations are isolated per tenant.
func TestIngester_IngestionDelayMetric(t *testing.T) {
	metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
	metricLabels := cortexpb.FromLabelAdaptersToLabels(metricLabelAdapters)

	t.Run("float samples with delays", func(t *testing.T) {
		registry := prometheus.NewRegistry()
		cfg := defaultIngesterTestConfig(t)
		cfg.LifecyclerConfig.JoinAfter = 0

		ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
		require.NoError(t, err)
		require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
		defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

		test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
			return ing.lifecycler.GetState()
		})

		userID := "user-1"
		ctx := user.InjectOrgID(context.Background(), userID)

		// Push samples with different delays, oldest first to avoid
		// out-of-order rejections.
		now := time.Now().UnixMilli()
		samples := []cortexpb.Sample{
			{Value: 1, TimestampMs: now - 30000}, // 30 seconds ago
			{Value: 2, TimestampMs: now - 10000}, // 10 seconds ago
			{Value: 3, TimestampMs: now - 5000},  // 5 seconds ago
		}

		for _, sample := range samples {
			req := cortexpb.ToWriteRequest(
				[]labels.Labels{metricLabels},
				[]cortexpb.Sample{sample},
				nil,
				nil,
				cortexpb.API,
			)
			_, err = ing.Push(ctx, req)
			require.NoError(t, err)
		}

		found, sampleCount, sampleSum := gatherIngestionDelayHistogram(t, registry, userID)
		require.True(t, found, "ingestion delay metric not found")
		assert.Equal(t, uint64(3), sampleCount, "expected 3 observations")

		// We sent samples 30s, 10s and 5s old, so the total observed delay
		// must be at least ~45s; execution time only adds to it.
		assert.Greater(t, sampleSum, 40.0, "sum of delays should be at least 40s")
	})

	t.Run("future timestamps are filtered", func(t *testing.T) {
		registry := prometheus.NewRegistry()
		cfg := defaultIngesterTestConfig(t)
		cfg.LifecyclerConfig.JoinAfter = 0

		ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
		require.NoError(t, err)
		require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
		defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

		test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
			return ing.lifecycler.GetState()
		})

		userID := "user-future"
		ctx := user.InjectOrgID(context.Background(), userID)

		// Push a sample stamped 60 seconds in the future: its delay is
		// negative and must not be observed.
		futureTimestamp := time.Now().UnixMilli() + 60000
		req := cortexpb.ToWriteRequest(
			[]labels.Labels{metricLabels},
			[]cortexpb.Sample{{Value: 1, TimestampMs: futureTimestamp}},
			nil,
			nil,
			cortexpb.API,
		)

		_, err = ing.Push(ctx, req)
		require.NoError(t, err)

		// Whether or not the series exists, it must carry zero observations.
		_, sampleCount, _ := gatherIngestionDelayHistogram(t, registry, userID)
		assert.Equal(t, uint64(0), sampleCount, "future timestamp should not be observed")
	})

	t.Run("per-user isolation", func(t *testing.T) {
		registry := prometheus.NewRegistry()
		cfg := defaultIngesterTestConfig(t)
		cfg.LifecyclerConfig.JoinAfter = 0

		ing, err := prepareIngesterWithBlocksStorage(t, cfg, registry)
		require.NoError(t, err)
		require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
		defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

		test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
			return ing.lifecycler.GetState()
		})

		// Push samples for two users with different counts and delays.
		baseTime := time.Now().UnixMilli()
		for _, u := range []struct {
			id         string
			numSamples int
			delayMs    int64
		}{
			{"user-a", 2, 5000},
			{"user-b", 3, 30000},
		} {
			ctx := user.InjectOrgID(context.Background(), u.id)
			// Send in chronological order (oldest first): the largest idx
			// yields the oldest timestamp.
			for idx := u.numSamples - 1; idx >= 0; idx-- {
				timestamp := baseTime - u.delayMs - int64(idx*1000)
				req := cortexpb.ToWriteRequest(
					[]labels.Labels{metricLabels},
					[]cortexpb.Sample{{Value: float64(idx + 1), TimestampMs: timestamp}},
					nil,
					nil,
					cortexpb.API,
				)
				_, err := ing.Push(ctx, req)
				require.NoError(t, err)
			}
		}

		// Each user must have its own series with exactly its own samples.
		for _, expected := range []struct {
			id    string
			count uint64
		}{
			{"user-a", 2},
			{"user-b", 3},
		} {
			found, sampleCount, _ := gatherIngestionDelayHistogram(t, registry, expected.id)
			require.True(t, found, "ingestion delay metric not found for %s", expected.id)
			assert.Equal(t, expected.count, sampleCount)
		}
	})
}

func BenchmarkIngesterPush(b *testing.B) {
limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false)
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ingesterMetrics struct {
ingestedExemplarsFail prometheus.Counter
ingestedMetadataFail prometheus.Counter
ingestedHistogramBuckets *prometheus.HistogramVec
ingestionDelaySeconds *prometheus.HistogramVec
oooLabelsTotal *prometheus.CounterVec
queries prometheus.Counter
queriedSamples prometheus.Histogram
Expand Down Expand Up @@ -144,6 +145,14 @@ func newIngesterMetrics(r prometheus.Registerer,
NativeHistogramMinResetDuration: 1,
Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1 to 512 buckets
}, []string{"user"}),
ingestionDelaySeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ingester_ingestion_delay_seconds",
Help: "Delay in seconds between sample ingestion time and sample timestamp.",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1,
Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m
}, []string{"user"}),
oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_out_of_order_labels_total",
Help: "The total number of out of order label found per user.",
Expand Down Expand Up @@ -377,6 +386,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID})
m.ingestedHistogramBuckets.DeleteLabelValues(userID)
m.ingestionDelaySeconds.DeleteLabelValues(userID)

if m.memSeriesCreatedTotal != nil {
m.memSeriesCreatedTotal.DeleteLabelValues(userID)
Expand Down
Loading