diff --git a/CHANGELOG.md b/CHANGELOG.md index ea57cca48fb..fdb35f4b481 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [BUGFIX] Query Scheduler: Stop the fragment table's background cleanup goroutine (and its ticker) on shutdown; it previously had no stop mechanism and lived for the process lifetime. #7599 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/go.mod b/go.mod index ff195426da8..c2fbfc150bf 100644 --- a/go.mod +++ b/go.mod @@ -96,6 +96,7 @@ require ( github.com/tjhop/slog-gokit v0.1.4 go.opentelemetry.io/collector/pdata v1.45.0 go.uber.org/automaxprocs v1.6.0 + go.uber.org/goleak v1.3.0 google.golang.org/protobuf v1.36.11 ) @@ -283,7 +284,6 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect diff --git a/pkg/scheduler/fragment_table/fragment_table.go b/pkg/scheduler/fragment_table/fragment_table.go index 3cf161070b8..916dc9498bf 100644 --- a/pkg/scheduler/fragment_table/fragment_table.go +++ b/pkg/scheduler/fragment_table/fragment_table.go @@ -18,6 +18,8 @@ type FragmentTable struct { mappings map[distributed_execution.FragmentKey]*fragmentEntry mu sync.RWMutex expiration time.Duration + done chan struct{} + closeOnce sync.Once } // NewFragmentTable creates a new FragmentTable with the specified expiration duration. @@ -27,6 +29,7 @@ func NewFragmentTable(expiration time.Duration) *FragmentTable { ft := &FragmentTable{ mappings: make(map[distributed_execution.FragmentKey]*fragmentEntry), expiration: expiration, + done: make(chan struct{}), } go ft.periodicCleanup() return ft @@ -55,6 +58,14 @@ func (f *FragmentTable) GetAddrByID(queryID uint64, fragmentID uint64) (string, return "", false } +// Close stops the background cleanup goroutine started by NewFragmentTable. +// It is safe to call more than once. +func (f *FragmentTable) Close() { + f.closeOnce.Do(func() { + close(f.done) + }) +} + func (f *FragmentTable) cleanupExpired() { f.mu.Lock() defer f.mu.Unlock() @@ -73,7 +84,12 @@ func (f *FragmentTable) cleanupExpired() { func (f *FragmentTable) periodicCleanup() { ticker := time.NewTicker(f.expiration / 2) defer ticker.Stop() - for range ticker.C { - f.cleanupExpired() + for { + select { + case <-ticker.C: + f.cleanupExpired() + case <-f.done: + return + } } } diff --git a/pkg/scheduler/fragment_table/fragment_table_test.go b/pkg/scheduler/fragment_table/fragment_table_test.go index 576e2726455..641db5c0485 100644 --- a/pkg/scheduler/fragment_table/fragment_table_test.go +++ b/pkg/scheduler/fragment_table/fragment_table_test.go @@ -7,8 +7,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) +// TestMain runs the package tests under goleak so that any FragmentTable whose +// cleanup goroutine is not stopped (via Close) fails the package. +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + func TestNewFragmentTable(t *testing.T) { tests := []struct { name string @@ -27,6 +34,7 @@ func TestNewFragmentTable(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ft := NewFragmentTable(tt.expiration) + t.Cleanup(ft.Close) require.NotNil(t, ft) require.NotNil(t, ft.mappings) assert.Equal(t, tt.expiration, ft.expiration) @@ -36,6 +44,7 @@ func TestNewFragmentTable(t *testing.T) { func TestFragmentTable_AddAndGetAddress(t *testing.T) { ft := NewFragmentTable(time.Hour) + t.Cleanup(ft.Close) tests := []struct { name string @@ -84,6 +93,7 @@ func TestFragmentTable_AddAndGetAddress(t *testing.T) { func TestFragmentTable_Expiration(t *testing.T) { expiration := 100 * time.Millisecond ft := NewFragmentTable(expiration) + t.Cleanup(ft.Close) t.Run("entries expire after timeout", func(t *testing.T) { ft.AddAddressByID(1, 1, "addr1") @@ -106,6 +116,7 @@ func TestFragmentTable_Expiration(t *testing.T) { func TestFragmentTable_ConcurrentAccess(t *testing.T) { ft := NewFragmentTable(time.Hour) + t.Cleanup(ft.Close) const ( numGoroutines = 10 @@ -140,6 +151,7 @@ func TestFragmentTable_ConcurrentAccess(t *testing.T) { func TestFragmentTable_PeriodicCleanup(t *testing.T) { expiration := 100 * time.Millisecond ft := NewFragmentTable(expiration) + t.Cleanup(ft.Close) ft.AddAddressByID(1, 1, "addr1") ft.AddAddressByID(1, 2, "addr2") @@ -163,3 +175,15 @@ func TestFragmentTable_PeriodicCleanup(t *testing.T) { _, ok = ft.GetAddrByID(1, 2) require.False(t, ok) } + +// TestFragmentTable_Close verifies that Close stops the background cleanup +// goroutine — asserted package-wide by goleak in TestMain — and is safe to call +// more than once. +func TestFragmentTable_Close(t *testing.T) { + ft := NewFragmentTable(time.Hour) + + ft.Close() + + // Close is idempotent: a second call must not panic (e.g. double close). + require.NotPanics(t, ft.Close) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3539c01e6bc..9285c9ce8b8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -695,6 +695,8 @@ func (s *Scheduler) running(ctx context.Context) error { // Close the Scheduler. func (s *Scheduler) stopping(_ error) error { + // Stop the fragment table's background cleanup goroutine. + s.fragmentTable.Close() // This will also stop the requests queue, which stop accepting new requests and errors out any tracked requests. return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) }