Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -1701,6 +1702,161 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
}

// TestRW1vsRW2QueryFuzz pushes the same time series data to two isolated Cortex instances,
// one via PRW1 and one via PRW2, then uses promqlsmith to generate random PromQL queries,
// and verifies that both instances return identical results.
func TestRW1vsRW2QueryFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul1 := e2edb.NewConsulWithName("consul-rw1")
consul2 := e2edb.NewConsulWithName("consul-rw2")
require.NoError(t, s.StartAndWaitReady(consul1, consul2))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.tsdb.retention-period": "2h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
// Ingester.
"-ring.store": "consul",
// Distributor.
"-distributor.replication-factor": "1",
"-distributor.remote-writev2-enabled": "true",
// Alert manager.
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path1 := path.Join(s.SharedDir(), "cortex-rw1")
path2 := path.Join(s.SharedDir(), "cortex-rw2")

flags1 := mergeFlags(flags, map[string]string{
"-blocks-storage.filesystem.dir": path1,
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
})
flags2 := mergeFlags(flags, map[string]string{
"-blocks-storage.filesystem.dir": path2,
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
})

cortexRW1 := e2ecortex.NewSingleBinary("cortex-rw1", flags1, "")
cortexRW2 := e2ecortex.NewSingleBinary("cortex-rw2", flags2, "")
require.NoError(t, s.StartAndWaitReady(cortexRW1, cortexRW2))

require.NoError(t, cortexRW1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
require.NoError(t, cortexRW2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c1, err := e2ecortex.NewClient(cortexRW1.HTTPEndpoint(), cortexRW1.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
c2, err := e2ecortex.NewClient(cortexRW2.HTTPEndpoint(), cortexRW2.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()
start := now.Add(-time.Hour * 2)
end := now.Add(-time.Hour)
numSeries := 3
numSamples := 60
scrapeInterval := time.Minute

// Generate the same series data once as prompb (PRW1 format).
// The exact same timestamps/values will be used for PRW2 via convertSeriesToPRW2.
lbls := make([]labels.Labels, numSeries*2)
serieses := make([]prompb.TimeSeries, numSeries*2)

for i := 0; i < numSeries; i++ {
series := e2e.GenerateSeriesWithSamples("test_series_a", start, scrapeInterval, i*numSamples, numSamples,
prompb.Label{Name: "job", Value: "test"},
prompb.Label{Name: "series", Value: strconv.Itoa(i)},
)
serieses[i] = series
builder := labels.NewBuilder(labels.EmptyLabels())
for _, lbl := range series.Labels {
builder.Set(lbl.Name, lbl.Value)
}
lbls[i] = builder.Labels()
}
for i := numSeries; i < 2*numSeries; i++ {
prompbLabels := []prompb.Label{
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i)},
}
switch i % 3 {
case 0:
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "200"})
case 1:
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "400"})
default:
prompbLabels = append(prompbLabels, prompb.Label{Name: "status_code", Value: "500"})
}
series := e2e.GenerateSeriesWithSamples("test_series_b", start, scrapeInterval, i*numSamples, numSamples, prompbLabels...)
serieses[i] = series
builder := labels.NewBuilder(labels.EmptyLabels())
for _, lbl := range series.Labels {
builder.Set(lbl.Name, lbl.Value)
}
lbls[i] = builder.Labels()
}

// Push via PRW1 to cortex-rw1.
res, err := c1.Push(serieses)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Convert the same series to PRW2 format and push to cortex-rw2.
symbols, v2Series := convertSeriesToPRW2(serieses)
_, err = c2.PushV2(symbols, v2Series)
Comment thread
friedrichg marked this conversation as resolved.
require.NoError(t, err)

seed := now.Unix()
rnd := rand.New(rand.NewSource(seed))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rnd := rand.New(rand.NewSource(seed))
rnd := rand.New(rand.NewSource(seed))
ctx := context.Background()
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

This is to ensure we get all series before testing


ctx := context.Background()
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

opts := []promqlsmith.Option{
promqlsmith.WithEnabledFunctions(enabledFunctions),
promqlsmith.WithEnabledAggrs(enabledAggrs),
promqlsmith.WithEnableExperimentalPromQLFunctions(true),
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
}

// convertSeriesToPRW2 translates Remote Write v1 time series into the Remote
// Write v2 wire format. All label strings are interned through a single
// symbol table; the table's symbols and the converted series are returned
// together so they can be pushed as one PRW2 request.
func convertSeriesToPRW2(timeSeries []prompb.TimeSeries) ([]string, []writev2.TimeSeries) {
	symTable := writev2.NewSymbolTable()
	converted := make([]writev2.TimeSeries, 0, len(timeSeries))

	for _, src := range timeSeries {
		// Rebuild the label set as sorted labels.Labels so it can be symbolized.
		sb := labels.NewScratchBuilder(len(src.Labels))
		for _, lbl := range src.Labels {
			sb.Add(lbl.Name, lbl.Value)
		}
		sb.Sort()

		// Samples carry over one-to-one: timestamps and values are unchanged in v2.
		outSamples := make([]writev2.Sample, 0, len(src.Samples))
		for _, smp := range src.Samples {
			outSamples = append(outSamples, writev2.Sample{Timestamp: smp.Timestamp, Value: smp.Value})
		}

		converted = append(converted, writev2.TimeSeries{
			LabelsRefs: symTable.SymbolizeLabels(sb.Labels(), nil),
			Samples:    outSamples,
		})
	}

	return symTable.Symbols(), converted
}

// waitUntilReady is a helper function to wait and check if both servers to test load the expected data.
func waitUntilReady(t *testing.T, ctx context.Context, c1, c2 *e2ecortex.Client, query string, start, end time.Time) {
retries := backoff.New(ctx, backoff.Config{
Expand Down
Loading