diff --git a/CHANGELOG.md b/CHANGELOG.md index 66d2be50ef2..82746f134b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 +* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 5ceb537848d..f75c27c1e71 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -313,6 +313,23 @@ querier: # Eval time threshold above which a timeout is classified as user error (4XX). # CLI flag: -querier.timeout-classification-eval-threshold [timeout_classification_eval_threshold: | default = 1m30s] + + query_protection: + rejection: + threshold: + # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization + [heap_utilization: | default = 0] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2dc7d8bfe12..146170047c6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4989,6 +4989,23 @@ thanos_engine: # Eval time threshold above which a timeout is classified as user error (4XX). # CLI flag: -querier.timeout-classification-eval-threshold [timeout_classification_eval_threshold: | default = 1m30s] + +query_protection: + rejection: + threshold: + # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, between + # 0 and 1. monitored_resources config must include the resource type. 0 to + # disable. + # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, between + # 0 and 1. monitored_resources config must include the resource type. 0 to + # disable. + # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization + [heap_utilization: | default = 0] ``` ### `query_frontend_config` diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 08dcd2d3fbb..36eb3779313 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -229,7 +229,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Distributor.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid distributor config") } - if err := c.Querier.Validate(); err != nil { + if err := c.Querier.Validate(c.ResourceMonitor.Resources); err != nil { return errors.Wrap(err, "invalid querier config") } if c.Querier.TimeoutClassificationEnabled && !c.Frontend.Handler.QueryStatsEnabled { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 805c2095429..6e6cfc523be 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -284,7 +284,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData) + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) // Use distributor as default MetadataQuerier t.MetadataQuerier = t.Distributor @@ -701,7 +701,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) } else { // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData) + queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) } managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) @@ -949,7 +949,7 @@ func (t *Cortex) setupModuleManager() error { Ingester: {IngesterService, OverridesConfig, API}, IngesterService: {OverridesConfig, RuntimeConfig, MemberlistKV, ResourceMonitor}, Flusher: {OverridesConfig, API}, - Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV}, + Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV, ResourceMonitor}, Querier: {TenantFederation}, StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV, GrpcClientService}, QueryFrontendTripperware: {API, OverridesConfig}, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index c04d320268d..8cb23cf3112 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" + "github.com/cortexproject/cortex/pkg/configs" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/lazyquery" @@ -32,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/parquetutil" + "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -103,6 +105,9 @@ type Config struct { TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"` TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"` TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"` + + // Query protection: resource-based rejection. + QueryProtection configs.QueryProtection `yaml:"query_protection"` } var ( @@ -161,10 +166,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.") f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", time.Minute+59*time.Second, "The total time before the querier proactively cancels a query for timeout classification. Set this a few seconds less than the querier timeout.") f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", time.Minute+30*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).") + cfg.QueryProtection.RegisterFlagsWithPrefix(f, "querier.") } // Validate the config -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(monitoredResources flagext.StringSliceCSV) error { if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" { return errUnsupportedResponseCompression @@ -207,6 +213,10 @@ func (cfg *Config) Validate() error { return err } + if err := cfg.QueryProtection.Validate(monitoredResources); err != nil { + return err + } + return nil } @@ -223,9 +233,28 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { } // New builds a queryable and promql engine. -func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) + // Create resource-based limiter if resource monitor is available and thresholds are configured. + var resourceBasedLimiter *limiter.ResourceBasedLimiter + if resourceMonitor != nil { + resourceLimits := make(map[resource.Type]float64) + if cfg.QueryProtection.Rejection.Threshold.CPUUtilization > 0 { + resourceLimits[resource.CPU] = cfg.QueryProtection.Rejection.Threshold.CPUUtilization + } + if cfg.QueryProtection.Rejection.Threshold.HeapUtilization > 0 { + resourceLimits[resource.Heap] = cfg.QueryProtection.Rejection.Threshold.HeapUtilization + } + if len(resourceLimits) > 0 { + var err error + resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, reg, "querier") + if err != nil { + level.Error(logger).Log("msg", "failed to create resource based limiter for querier", "err", err) + } + } + } + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil) ns := make([]QueryableWithFilter, len(stores)) @@ -235,7 +264,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor limits: limits, } } - queryable := NewQueryable(distributorQueryable, ns, cfg, limits) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits, resourceBasedLimiter, logger) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -311,7 +340,7 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides, resourceBasedLimiter *limiter.ResourceBasedLimiter, logger log.Logger) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), @@ -324,6 +353,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, distributor: distributor, stores: stores, limiterHolder: &limiterHolder{}, + resourceBasedLimiter: resourceBasedLimiter, + logger: logger, } return q, nil @@ -339,6 +370,8 @@ type querier struct { distributor QueryableWithFilter stores []QueryableWithFilter limiterHolder *limiterHolder + resourceBasedLimiter *limiter.ResourceBasedLimiter + logger log.Logger ignoreMaxQueryLength bool } @@ -390,6 +423,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_st // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return storage.ErrSeriesSet(err) + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return storage.EmptySeriesSet() @@ -490,6 +528,11 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select // LabelValues implements storage.Querier. func (q querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return nil, nil, err + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return nil, nil, nil @@ -558,6 +601,11 @@ func (q querier) LabelValues(ctx context.Context, name string, hints *storage.La } func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return nil, nil, err + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return nil, nil, nil @@ -625,6 +673,19 @@ func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matc return strutil.MergeSlices(limit, sets...), warnings, nil } +func (q querier) checkResourceUtilization() error { + if q.resourceBasedLimiter == nil { + return nil + } + + if err := q.resourceBasedLimiter.AcceptNewRequest(); err != nil { + level.Warn(q.logger).Log("msg", "querier failed to accept request due to resource utilization", "err", err) + return limiter.ErrResourceLimitReached + } + + return nil +} + func (querier) Close() error { return nil } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c623aa5abf4..e8ee70e5581 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -40,6 +40,8 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -337,7 +339,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -528,7 +530,7 @@ func TestLimits(t *testing.T) { } overrides := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -584,7 +586,7 @@ func TestQuerier(t *testing.T) { overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) testRangeQuery(t, queryable, queryEngine, through, query, enc) }) } @@ -605,7 +607,7 @@ func TestQuerierMetric(t *testing.T) { queryables := []QueryableWithFilter{} r := prometheus.NewRegistry() reg := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, r) - New(cfg, overrides, distributor, queryables, reg, log.NewNopLogger(), nil) + New(cfg, overrides, distributor, queryables, reg, log.NewNopLogger(), nil, nil) assert.NoError(t, promutil.GatherAndCompare(r, strings.NewReader(` # HELP cortex_max_concurrent_queries The maximum number of concurrent queries. # TYPE cortex_max_concurrent_queries gauge @@ -688,7 +690,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { overrides := validation.NewOverrides(limits, nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -781,7 +783,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute) require.NoError(t, err) @@ -873,7 +875,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) queryEngine := promql.NewEngine(opts) ctx := user.InjectOrgID(context.Background(), "test") @@ -911,7 +913,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") now := time.Now() @@ -969,7 +971,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") @@ -1119,7 +1121,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor := &MockDistributor{} distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, testData.query, testData.queryStartTime, testData.queryEndTime, time.Minute) require.NoError(t, err) @@ -1147,7 +1149,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1188,7 +1190,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1216,7 +1218,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1243,7 +1245,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1580,7 +1582,7 @@ func TestShortTermQueryToLTS(t *testing.T) { limits.QueryStoreAfter = model.Duration(c.queryStoreAfter) overrides := validation.NewOverrides(limits, nil) - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "0") query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -1754,7 +1756,7 @@ func TestConfig_Validate(t *testing.T) { flagext.DefaultValues(cfg) testData.setup(cfg) - assert.Equal(t, testData.expected, cfg.Validate()) + assert.Equal(t, testData.expected, cfg.Validate(nil)) }) } } @@ -1900,7 +1902,7 @@ func TestQuerier_ProjectionHints(t *testing.T) { wDistributorQueryable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: distributorQueryable} - queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides) + queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(start), util.TimeToMillis(end)) require.NoError(t, err) @@ -1935,3 +1937,81 @@ func TestQuerier_ProjectionHints(t *testing.T) { }) } } + +func TestQuerier_ResourceBasedLimiter(t *testing.T) { + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.MaxQueryIntoFuture = 0 + + limits := DefaultLimitsConfig() + overrides := validation.NewOverrides(limits, nil) + + mockMonitor := &limiter.MockMonitor{ + CpuUtilization: 0.9, + HeapUtilization: 0.9, + } + + resourceLimits := map[resource.Type]float64{ + resource.CPU: 0.8, + resource.Heap: 0.8, + } + + resourceBasedLimiter, err := limiter.NewResourceBasedLimiter(mockMonitor, resourceLimits, nil, "querier") + require.NoError(t, err) + + chunkStore := &errDistributor{} + distributorQueryable := newDistributorQueryable(chunkStore, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) + + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, resourceBasedLimiter, log.NewNopLogger()) + + ctx := user.InjectOrgID(context.Background(), "test") + q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now())) + require.NoError(t, err) + + // Select should fail due to resource limit. + ss := q.Select(ctx, true, &storage.SelectHints{Start: util.TimeToMillis(time.Now().Add(-1 * time.Hour)), End: util.TimeToMillis(time.Now())}) + require.Error(t, ss.Err()) + require.ErrorIs(t, ss.Err(), limiter.ErrResourceLimitReached) + + // LabelValues should fail due to resource limit. + _, _, err = q.LabelValues(ctx, "__name__", nil) + require.Error(t, err) + require.ErrorIs(t, err, limiter.ErrResourceLimitReached) + + // LabelNames should fail due to resource limit. + _, _, err = q.LabelNames(ctx, nil) + require.Error(t, err) + require.ErrorIs(t, err, limiter.ErrResourceLimitReached) +} + +func TestQuerier_ResourceBasedLimiter_Nil(t *testing.T) { + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.MaxQueryIntoFuture = 0 + + limits := DefaultLimitsConfig() + overrides := validation.NewOverrides(limits, nil) + + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) + distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) + + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) + + // nil resourceBasedLimiter should not block queries. + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, nil, log.NewNopLogger()) + + ctx := user.InjectOrgID(context.Background(), "test") + q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now())) + require.NoError(t, err) + + ss := q.Select(ctx, true, &storage.SelectHints{Start: util.TimeToMillis(time.Now().Add(-1 * time.Hour)), End: util.TimeToMillis(time.Now())}) + require.NoError(t, ss.Err()) + + _, _, err = q.LabelValues(ctx, "__name__", nil) + require.NoError(t, err) + + _, _, err = q.LabelNames(ctx, nil) + require.NoError(t, err) +} diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index e5738945cb4..b9f30118e88 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -227,7 +227,7 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg querierTestConfig.Cfg.ActiveQueryTrackerDir = "" overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil) + q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) return func(mint, maxt int64) (storage.Querier, error) { return q.Querier(mint, maxt) } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index feb746bdb22..4a64ff0d2d1 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -6034,6 +6034,33 @@ "type": "boolean", "x-cli-flag": "querier.per-step-stats-enabled" }, + "query_protection": { + "properties": { + "rejection": { + "properties": { + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.rejection.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.rejection.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + } + }, + "type": "object" + }, "response_compression": { "default": "gzip", "description": "Use compression for metrics query API or instant and range query APIs. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)",