From c7c60df33c41b1d1eb07d760afff121d9f8f1130 Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Sun, 7 Jun 2026 21:31:42 +0900 Subject: [PATCH] fix(scheduler): stop FragmentTable cleanup goroutine on shutdown NewFragmentTable starts a periodicCleanup goroutine (driven by a ticker) that had no stop mechanism: the loop ranged over ticker.C forever, the deferred ticker.Stop() was unreachable, and FragmentTable had no Close method, so the goroutine and ticker lived for the whole process lifetime and could not be reclaimed (issue #7596). Add an idempotent Close() (done channel + sync.Once); periodicCleanup now selects on the done channel and returns. The scheduler closes the fragment table in its stopping hook so the goroutine is reclaimed on shutdown. A package goleak guard plus a regression test cover it, and the existing tests now stop the table they create. Fixes #7596 Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Sandy Chen --- CHANGELOG.md | 1 + go.mod | 2 +- .../fragment_table/fragment_table.go | 20 ++++++++++++++-- .../fragment_table/fragment_table_test.go | 24 +++++++++++++++++++ pkg/scheduler/scheduler.go | 2 ++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea57cca48fb..fdb35f4b481 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [BUGFIX] Query Scheduler: Stop the fragment table's background cleanup goroutine (and its ticker) on shutdown; it previously had no stop mechanism and lived for the process lifetime. #7599 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/go.mod b/go.mod index ff195426da8..c2fbfc150bf 100644 --- a/go.mod +++ b/go.mod @@ -96,6 +96,7 @@ require ( github.com/tjhop/slog-gokit v0.1.4 go.opentelemetry.io/collector/pdata v1.45.0 go.uber.org/automaxprocs v1.6.0 + go.uber.org/goleak v1.3.0 google.golang.org/protobuf v1.36.11 ) @@ -283,7 +284,6 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect diff --git a/pkg/scheduler/fragment_table/fragment_table.go b/pkg/scheduler/fragment_table/fragment_table.go index 3cf161070b8..916dc9498bf 100644 --- a/pkg/scheduler/fragment_table/fragment_table.go +++ b/pkg/scheduler/fragment_table/fragment_table.go @@ -18,6 +18,8 @@ type FragmentTable struct { mappings map[distributed_execution.FragmentKey]*fragmentEntry mu sync.RWMutex expiration time.Duration + done chan struct{} + closeOnce sync.Once } // NewFragmentTable creates a new FragmentTable with the specified expiration duration. @@ -27,6 +29,7 @@ func NewFragmentTable(expiration time.Duration) *FragmentTable { ft := &FragmentTable{ mappings: make(map[distributed_execution.FragmentKey]*fragmentEntry), expiration: expiration, + done: make(chan struct{}), } go ft.periodicCleanup() return ft @@ -55,6 +58,14 @@ func (f *FragmentTable) GetAddrByID(queryID uint64, fragmentID uint64) (string, return "", false } +// Close stops the background cleanup goroutine started by NewFragmentTable. +// It is safe to call more than once. +func (f *FragmentTable) Close() { + f.closeOnce.Do(func() { + close(f.done) + }) +} + func (f *FragmentTable) cleanupExpired() { f.mu.Lock() defer f.mu.Unlock() @@ -73,7 +84,12 @@ func (f *FragmentTable) cleanupExpired() { func (f *FragmentTable) periodicCleanup() { ticker := time.NewTicker(f.expiration / 2) defer ticker.Stop() - for range ticker.C { - f.cleanupExpired() + for { + select { + case <-ticker.C: + f.cleanupExpired() + case <-f.done: + return + } } } diff --git a/pkg/scheduler/fragment_table/fragment_table_test.go b/pkg/scheduler/fragment_table/fragment_table_test.go index 576e2726455..641db5c0485 100644 --- a/pkg/scheduler/fragment_table/fragment_table_test.go +++ b/pkg/scheduler/fragment_table/fragment_table_test.go @@ -7,8 +7,15 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/goleak" ) +// TestMain runs the package tests under goleak so that any FragmentTable whose +// cleanup goroutine is not stopped (via Close) fails the package. +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + func TestNewFragmentTable(t *testing.T) { tests := []struct { name string @@ -27,6 +34,7 @@ func TestNewFragmentTable(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ft := NewFragmentTable(tt.expiration) + t.Cleanup(ft.Close) require.NotNil(t, ft) require.NotNil(t, ft.mappings) assert.Equal(t, tt.expiration, ft.expiration) @@ -36,6 +44,7 @@ func TestNewFragmentTable(t *testing.T) { func TestFragmentTable_AddAndGetAddress(t *testing.T) { ft := NewFragmentTable(time.Hour) + t.Cleanup(ft.Close) tests := []struct { name string @@ -84,6 +93,7 @@ func TestFragmentTable_AddAndGetAddress(t *testing.T) { func TestFragmentTable_Expiration(t *testing.T) { expiration := 100 * time.Millisecond ft := NewFragmentTable(expiration) + t.Cleanup(ft.Close) t.Run("entries expire after timeout", func(t *testing.T) { ft.AddAddressByID(1, 1, "addr1") @@ -106,6 +116,7 @@ func TestFragmentTable_Expiration(t *testing.T) { func TestFragmentTable_ConcurrentAccess(t *testing.T) { ft := NewFragmentTable(time.Hour) + t.Cleanup(ft.Close) const ( numGoroutines = 10 @@ -140,6 +151,7 @@ func TestFragmentTable_ConcurrentAccess(t *testing.T) { func TestFragmentTable_PeriodicCleanup(t *testing.T) { expiration := 100 * time.Millisecond ft := NewFragmentTable(expiration) + t.Cleanup(ft.Close) ft.AddAddressByID(1, 1, "addr1") ft.AddAddressByID(1, 2, "addr2") @@ -163,3 +175,15 @@ func TestFragmentTable_PeriodicCleanup(t *testing.T) { _, ok = ft.GetAddrByID(1, 2) require.False(t, ok) } + +// TestFragmentTable_Close verifies that Close stops the background cleanup +// goroutine — asserted package-wide by goleak in TestMain — and is safe to call +// more than once. +func TestFragmentTable_Close(t *testing.T) { + ft := NewFragmentTable(time.Hour) + + ft.Close() + + // Close is idempotent: a second call must not panic (e.g. double close). + require.NotPanics(t, ft.Close) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3539c01e6bc..9285c9ce8b8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -695,6 +695,8 @@ func (s *Scheduler) running(ctx context.Context) error { // Close the Scheduler. func (s *Scheduler) stopping(_ error) error { + // Stop the fragment table's background cleanup goroutine. + s.fragmentTable.Close() // This will also stop the requests queue, which stop accepting new requests and errors out any tracked requests. return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) }