Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] Ruler/Ingester: Propagate append hints so that the Ingester discards out-of-order samples. #7226
* [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278
* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics.
* [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293
Expand Down
241 changes: 146 additions & 95 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ message WriteRequest {

bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false];
// When true, indicates that out-of-order samples should be discarded even if OOO is enabled.
bool discard_out_of_order = 1002;
}

// refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto
Expand Down
1 change: 1 addition & 0 deletions pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) {
req.Source = 0
req.Metadata = nil
req.Timeseries = nil
req.DiscardOutOfOrder = false
writeRequestPool.Put(req)
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
}
}

return d.send(localCtx, ingester, timeseries, metadata, req.Source)
return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder)
}, func() {
cortexpb.ReuseSlice(req.Timeseries)
req.Free()
Expand Down Expand Up @@ -1252,7 +1252,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
})
}

func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error {
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error {
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
if err != nil {
return err
Expand All @@ -1270,16 +1270,18 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time

if d.cfg.UseStreamPush {
req := &cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
Timeseries: timeseries,
Metadata: metadata,
Source: source,
DiscardOutOfOrder: discardOutOfOrder,
}
_, err = c.PushStreamConnection(ctx, req)
} else {
req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source
req.DiscardOutOfOrder = discardOutOfOrder

_, err = c.PushPreAlloc(ctx, req)

Expand Down
113 changes: 105 additions & 8 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,99 @@ func TestDistributor_Push(t *testing.T) {
}
}

// TestDistributor_Push_DiscardOutOfOrder checks that the DiscardOutOfOrder flag
// set on an incoming write request is the value observed by every mock ingester,
// for both the regular push path and the stream push path.
func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) {
	t.Parallel()

	ctx := user.InjectOrgID(context.Background(), "userDiscardOOO")

	// Each case pairs a flag value with a push path.
	tests := []struct {
		name               string
		discardOutOfOrder  bool
		expectedDiscardOOO bool
		useStreamPush      bool
	}{
		{name: "DiscardOutOfOrder=true with regular push", discardOutOfOrder: true, expectedDiscardOOO: true, useStreamPush: false},
		{name: "DiscardOutOfOrder=false with regular push", discardOutOfOrder: false, expectedDiscardOOO: false, useStreamPush: false},
		{name: "DiscardOutOfOrder=true with stream push", discardOutOfOrder: true, expectedDiscardOOO: true, useStreamPush: true},
		{name: "DiscardOutOfOrder=false with stream push", discardOutOfOrder: false, expectedDiscardOOO: false, useStreamPush: true},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			limits := &validation.Limits{}
			flagext.DefaultValues(limits)

			prepCfg := prepConfig{
				numIngesters:     3,
				happyIngesters:   3,
				numDistributors:  1,
				shardByAllLabels: true,
				limits:           limits,
				useStreamPush:    tc.useStreamPush,
			}
			ds, ingesters, _, _ := prepare(t, prepCfg)

			request := makeWriteRequest(123456789000, 5, 0, 0)
			request.DiscardOutOfOrder = tc.discardOutOfOrder

			_, err := ds[0].Push(ctx, request)
			require.NoError(t, err)

			want := tc.expectedDiscardOOO

			// allIngestersMatch reports nil only once every ingester has
			// recorded a push call carrying the expected flag value.
			allIngestersMatch := func() any {
				for _, ing := range ingesters {
					// Read the mock's state under its lock.
					ing.Lock()
					calls, got := ing.calls["Push"], ing.lastDiscardOutOfOrder
					ing.Unlock()

					switch {
					case calls == 0:
						return fmt.Errorf("ingester has not received push yet")
					case got != want:
						return fmt.Errorf("ingester has DiscardOutOfOrder=%v, expected %v", got, want)
					}
				}
				return nil
			}
			test.Poll(t, time.Second, nil, allIngestersMatch)

			// Final assertion on each ingester's recorded flag.
			for _, ing := range ingesters {
				ing.Lock()
				got := ing.lastDiscardOutOfOrder
				ing.Unlock()

				assert.Equal(t, want, got,
					"ingester should have received DiscardOutOfOrder=%v", want)
			}
		})
	}
}

func TestDistributor_MetricsCleanup(t *testing.T) {
t.Parallel()
dists, _, regs, r := prepare(t, prepConfig{
Expand Down Expand Up @@ -3604,14 +3697,15 @@ type mockIngester struct {
sync.Mutex
client.IngesterClient
grpc_health_v1.HealthClient
happy atomic.Bool
failResp atomic.Error
stats client.UsersStatsResponse
timeseries map[uint32]*cortexpb.PreallocTimeseries
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
queryDelay time.Duration
calls map[string]int
lblsValues []string
happy atomic.Bool
failResp atomic.Error
stats client.UsersStatsResponse
timeseries map[uint32]*cortexpb.PreallocTimeseries
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
queryDelay time.Duration
calls map[string]int
lblsValues []string
lastDiscardOutOfOrder bool
}

func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester {
Expand Down Expand Up @@ -3682,6 +3776,9 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt

i.trackCall("Push")

// Store the DiscardOutOfOrder flag for test assertions
i.lastDiscardOutOfOrder = req.DiscardOutOfOrder

if !i.happy.Load() {
return nil, i.failResp.Load()
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)

// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
if req.DiscardOutOfOrder {
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

var newSeries []labels.Labels

for _, ts := range req.Timeseries {
Expand Down
80 changes: 80 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7863,3 +7863,83 @@ func TestIngester_checkRegexMatcherLimits(t *testing.T) {
})
}
}
func TestIngester_DiscardOutOfOrderFlagIntegration(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute)

i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, time.Second, ring.ACTIVE, func() any {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test-user")

// Create labels for our test metric
metricLabels := labels.FromStrings(labels.MetricName, "test_metric", "job", "test")

currentTime := time.Now().UnixMilli()
olderTime := currentTime - 60000 // 1 minute earlier (within OOO window)

// First, push a sample with current timestamp with discardOutOfOrder=true
req1 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 100, TimestampMs: currentTime}},
nil, nil, cortexpb.RULE)
req1.DiscardOutOfOrder = true

_, err = i.Push(ctx, req1)
require.NoError(t, err, "First sample push should succeed")

// Now try to push a sample with older timestamp with discardOutOfOrder=true
// This should be discarded because DiscardOutOfOrder is true
req2 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 50, TimestampMs: olderTime}},
nil, nil, cortexpb.RULE)
req2.DiscardOutOfOrder = true

_, _ = i.Push(ctx, req2)

// Query back the data to ensure only the first (current time) sample was stored
s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(&client.QueryRequest{
StartTimestampMs: olderTime - 1000,
EndTimestampMs: currentTime + 1000,
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: labels.MetricName, Value: "test_metric"},
},
}, s)
require.NoError(t, err)

// Verify we only have one series with one sample (the current time sample)
require.Len(t, s.series, 1, "Should have exactly one series")

// Convert chunks to samples to verify content
series := s.series[0]
require.Len(t, series.Chunks, 1, "Should have exactly one chunk")

chunk := series.Chunks[0]
chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data)
require.NoError(t, err)

iter := chunkData.Iterator(nil)
sampleCount := 0
for iter.Next() != chunkenc.ValNone {
ts, val := iter.At()
require.Equal(t, currentTime, ts, "Sample timestamp should match current time")
require.Equal(t, 100.0, val, "Sample value should match first push")
sampleCount++
}
require.NoError(t, iter.Err())
require.Equal(t, 1, sampleCount, "Should have exactly one sample stored")
}
11 changes: 10 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PusherAppender struct {
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
opts *storage.AppendOptions
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
Expand All @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
return 0, nil
}

func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {}
// SetOptions stores the given append options on the appender. Commit reads
// opts.DiscardOutOfOrder from the stored value when building the write request.
// The pointer is kept as-is (not copied), and a nil opts is accepted.
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {
a.opts = opts
}

func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only.
Expand All @@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error {

req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)

// Set DiscardOutOfOrder flag if requested via AppendOptions
if a.opts != nil && a.opts.DiscardOutOfOrder {
req.DiscardOutOfOrder = true
}

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)
Expand Down
24 changes: 24 additions & 0 deletions pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -413,3 +414,26 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) {
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryChunkBytes.WithLabelValues("userID")), float64(10))
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryDataBytes.WithLabelValues("userID")), float64(14))
}
// TestPusherAppender_Commit_WithDiscardOutOfOrder verifies that an appender
// given AppendOptions with DiscardOutOfOrder=true sends a WriteRequest with
// DiscardOutOfOrder set when Commit is called.
func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) {
	var (
		pusher  = &fakePusher{response: &cortexpb.WriteResponse{}}
		counter = prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})
	)

	// One pending float sample so Commit has something to push.
	pendingLabels := []labels.Labels{labels.FromStrings(labels.MetricName, "test_metric")}
	pendingSamples := []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}

	appender := &PusherAppender{
		ctx:          context.Background(),
		pusher:       pusher,
		userID:       "test-user",
		totalWrites:  counter,
		failedWrites: counter,
		labels:       pendingLabels,
		samples:      pendingSamples,
	}
	appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})

	require.NoError(t, appender.Commit())

	// The fake pusher records the request it was handed; inspect the flag on it.
	require.NotNil(t, pusher.request, "WriteRequest should have been sent")
	require.True(t, pusher.request.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest")
}
Loading