From 612ad2f6d3b19f62e4a5f149f775e6e9096a8304 Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Sun, 7 Jun 2026 19:00:27 +0900 Subject: [PATCH 1/2] fix(ruler): stop per-tenant notifier when manager creation fails newManager started the per-user notifier (and its discovery/notification goroutines) and registered it in r.notifiers before creating the rule manager. When managerFactory returned an error the user was never added to r.userManagers, so the removal loop in SyncRuleGroups never stopped the notifier, leaking it and its goroutines until the process exited. The same applied to the applyConfig error path in getOrCreateNotifier. Tear down the partially-initialized notifier and per-user metrics registry on any early return from newManager (disarmed on success), and stop the notifier directly when applyConfig fails. Adds regression tests for both leak paths. Fixes #7595 Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Sandy Chen --- CHANGELOG.md | 1 + pkg/ruler/manager.go | 33 ++++++- pkg/ruler/manager_test.go | 197 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea57cca48fb..1b11dbad3d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [BUGFIX] Ruler: Stop the per-tenant notifier (and its Alertmanager service-discovery goroutines) and remove the per-user metrics registry when rule manager creation fails, instead of leaking them until process shutdown. #7597 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 86611201899..5094f8fda2d 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -300,12 +300,37 @@ func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID strin reg := prometheus.NewRegistry() r.userManagerMetrics.AddUserRegistry(userID, reg) + // getOrCreateNotifier starts the per-user notifier (and its discovery and + // notification goroutines) and registers it in r.notifiers before the rule + // manager is created. If anything below fails, this user is never added to + // r.userManagers, so the removal loop in SyncRuleGroups would never stop the + // notifier, leaking it and its goroutines until the process exits (issue + // #7595). Tear the partially-initialized state down on any early return; the + // cleanup is disarmed once a manager is successfully returned. removeNotifier + // takes r.notifiersMtx while this method runs under r.userManagerMtx; those two + // locks are always acquired in that order (userManagerMtx, then notifiersMtx) + // and never the reverse, so this cannot deadlock. removeNotifier is a no-op + // when no notifier was registered. + success := false + defer func() { + if !success { + r.removeNotifier(userID) + r.userManagerMetrics.RemoveUserRegistry(userID) + } + }() + notifier, err := r.getOrCreateNotifier(userID, reg) if err != nil { return nil, err } - return r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg) + manager, err := r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg) + if err != nil { + return nil, err + } + + success = true + return manager, nil } func (r *DefaultMultiTenantManager) removeNotifier(userID string) { @@ -374,6 +399,12 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag // This should never fail, unless there's a programming mistake. if err := n.applyConfig(r.notifierCfg); err != nil { + // n.run() already started the notifier's discovery and notification + // goroutines, but n has not been added to r.notifiers yet, so neither + // removeNotifier nor Stop would ever stop it. Stop it directly to avoid + // leaking those goroutines. We must not call removeNotifier here because + // we already hold r.notifiersMtx, which it would try to re-acquire. + n.stop() return nil, err } diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index 845d0ffc203..a90f40b2567 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -1,13 +1,18 @@ package ruler import ( + "bytes" "context" + "errors" + "runtime/pprof" "sync" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + promConfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/notifier" @@ -304,6 +309,198 @@ func TestBackupRules(t *testing.T) { require.Equal(t, userRules[user2], m.GetBackupRules(user2)) } +// TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError is a regression test for +// https://github.com/cortexproject/cortex/issues/7595. When the manager factory +// returns an error, newManager has already created and started the per-user +// notifier (via getOrCreateNotifier -> n.run()) and registered it in r.notifiers. +// Because the user is never added to r.userManagers, the removal loop in +// SyncRuleGroups never stops that notifier, so it and its discovery/notification +// goroutines used to leak until the process exited. +func TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError(t *testing.T) { + dir := t.TempDir() + const user = "testUser" + + factoryErr := errors.New("manager factory failed") + failingFactory := func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ *client.Pool, _ prometheus.Registerer) (RulesManager, error) { + return nil, factoryErr + } + + // Use a dedicated registry (not nil): a nil registry registers the notifier + // service-discovery metrics on the global default registerer, which can + // os.Exit(1) on duplicate registration when running alongside other tests. + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, failingFactory, nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + // Baseline notifier-run goroutines before the (failing) sync. Assert a delta + // back to this baseline rather than an absolute zero so the check is robust to + // any unrelated notifier goroutines from other tests sharing this process. + before := countNotifierRunGoroutines() + + userRules := map[string]rulespb.RuleGroupList{ + user: { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user, + }, + }, + } + m.SyncRuleGroups(context.Background(), userRules) + + // The factory failed, so the user must not be tracked as a live manager. + require.Nil(t, getManager(m, user)) + + // The notifier must have been stopped and removed from the map. + m.notifiersMtx.Lock() + _, notifierExists := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, notifierExists, "notifier must be removed after a managerFactory error") + + // The per-user metrics registry must have been removed too. + require.False(t, hasUserManagerRegistry(t, m, user), "per-user metrics registry must be removed after a managerFactory error") + + // Its goroutines (started by rulerNotifier.run) must not leak. removeNotifier + // -> stop() -> wg.Wait() is synchronous, so they are gone by now; poll back to + // the baseline to absorb any scheduling latency. + test.Poll(t, 5*time.Second, before, func() interface{} { + return countNotifierRunGoroutines() + }) +} + +// TestGetOrCreateNotifierStopsNotifierOnApplyConfigError is a regression test for +// the secondary leak path in https://github.com/cortexproject/cortex/issues/7595: +// getOrCreateNotifier starts the notifier with n.run() before calling +// n.applyConfig. If applyConfig fails, the notifier was never inserted into +// r.notifiers, so it must be stopped directly to avoid leaking its goroutines. +func TestGetOrCreateNotifierStopsNotifierOnApplyConfigError(t *testing.T) { + const user = "testUser" + m, err := NewDefaultMultiTenantManager(Config{RulePath: t.TempDir()}, &ruleLimits{}, RuleManagerFactory(nil, nil), nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + // Force applyConfig to fail by pointing the Alertmanager TLS config at a CA + // file that does not exist. + m.notifierCfg = &promConfig.Config{ + AlertingConfig: promConfig.AlertingConfig{ + AlertmanagerConfigs: promConfig.AlertmanagerConfigs{ + { + HTTPClientConfig: config_util.HTTPClientConfig{ + TLSConfig: config_util.TLSConfig{CAFile: "/does/not/exist"}, + }, + APIVersion: promConfig.AlertmanagerAPIVersionV2, + }, + }, + }, + } + + before := countNotifierRunGoroutines() + _, err = m.getOrCreateNotifier(user, prometheus.NewRegistry()) + require.Error(t, err) + + m.notifiersMtx.Lock() + _, ok := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, ok, "notifier must not be registered when applyConfig fails") + + test.Poll(t, 5*time.Second, before, func() interface{} { + return countNotifierRunGoroutines() + }) +} + +// TestSyncRuleGroupsRecoversAfterManagerFactoryError verifies that a user whose +// first manager creation failed — and whose notifier was therefore cleaned up by +// the fix for https://github.com/cortexproject/cortex/issues/7595 — is created +// normally on a later sync once the factory succeeds, i.e. the failure cleanup +// does not leave the user in an unrecoverable state. +func TestSyncRuleGroupsRecoversAfterManagerFactoryError(t *testing.T) { + dir := t.TempDir() + const user = "testUser" + + fail := atomic.NewBool(true) + base := RuleManagerFactory([][]*promRules.Group{{}, {}}, []time.Duration{time.Millisecond, time.Millisecond}) + factory := func(ctx context.Context, userID string, n *notifier.Manager, logger log.Logger, p *client.Pool, reg prometheus.Registerer) (RulesManager, error) { + if fail.Load() { + return nil, errors.New("manager factory failed") + } + return base(ctx, userID, n, logger, p, reg) + } + + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, factory, nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + before := countNotifierRunGoroutines() + + userRules := map[string]rulespb.RuleGroupList{ + user: { + &rulespb.RuleGroupDesc{Name: "group1", Namespace: "ns", Interval: 1 * time.Minute, User: user}, + }, + } + + // First sync fails: no manager is tracked and the notifier must be cleaned up. + m.SyncRuleGroups(context.Background(), userRules) + require.Nil(t, getManager(m, user)) + m.notifiersMtx.Lock() + _, notifierExists := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, notifierExists) + test.Poll(t, 5*time.Second, before, func() interface{} { + return countNotifierRunGoroutines() + }) + + // Once the factory succeeds, the user is created normally and gets a notifier. + fail.Store(false) + m.SyncRuleGroups(context.Background(), userRules) + require.NotNil(t, getManager(m, user)) + m.notifiersMtx.Lock() + _, notifierExists = m.notifiers[user] + m.notifiersMtx.Unlock() + require.True(t, notifierExists, "notifier should be created when the user recovers") +} + +// countNotifierRunGoroutines returns the number of goroutines currently running +// inside rulerNotifier.run (its discovery and notification loops). It is used to +// assert that a notifier's goroutines have been stopped rather than leaked. +func countNotifierRunGoroutines() int { + var buf bytes.Buffer + _ = pprof.Lookup("goroutine").WriteTo(&buf, 2) + count := 0 + for _, stack := range bytes.Split(buf.Bytes(), []byte("\n\n")) { + // Matches both the run.funcN frames and the "created by ...run" line; each + // stack block is counted at most once, so the count stays accurate even if + // the closure frame is ever inlined away. + if bytes.Contains(stack, []byte("github.com/cortexproject/cortex/pkg/ruler.(*rulerNotifier).run")) { + count++ + } + } + return count +} + +// hasUserManagerRegistry reports whether a per-user metrics registry for the +// given user is still registered with the manager's metrics aggregator. The +// notifier registers prometheus_notifications_* metrics into that per-user +// registry, so its presence is observable via the aggregated, user-labelled +// output. +func hasUserManagerRegistry(t *testing.T, m *DefaultMultiTenantManager, user string) bool { + t.Helper() + tmp := prometheus.NewRegistry() + tmp.MustRegister(m.userManagerMetrics) + mfs, err := tmp.Gather() + require.NoError(t, err) + for _, mf := range mfs { + for _, metric := range mf.GetMetric() { + for _, lp := range metric.GetLabel() { + if lp.GetName() == "user" && lp.GetValue() == user { + return true + } + } + } + } + return false +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.RLock() defer m.userManagerMtx.RUnlock() From 3f862ca7374f60a7c17003f2b791b2e8bd61a20c Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Sun, 7 Jun 2026 21:35:59 +0900 Subject: [PATCH 2/2] fix(ruler): modernize manager_test.go to fix check-modernize lint The lint job's check-modernize step failed because the notifier-leak regression tests used pre-modern idioms that `make modernize` (golang.org/x/tools modernize@v0.22.0) rewrites: - func() interface{} -> func() any (3x) - for _, x := range bytes.Split(...) -> for x := range bytes.SplitSeq(...) Apply exactly what the pinned modernize tool produces so `make check-modernize` reports a clean tree. No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Sandy Chen --- pkg/ruler/manager_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index a90f40b2567..4ed983af374 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -364,7 +364,7 @@ func TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError(t *testing.T) { // Its goroutines (started by rulerNotifier.run) must not leak. removeNotifier // -> stop() -> wg.Wait() is synchronous, so they are gone by now; poll back to // the baseline to absorb any scheduling latency. - test.Poll(t, 5*time.Second, before, func() interface{} { + test.Poll(t, 5*time.Second, before, func() any { return countNotifierRunGoroutines() }) } @@ -404,7 +404,7 @@ func TestGetOrCreateNotifierStopsNotifierOnApplyConfigError(t *testing.T) { m.notifiersMtx.Unlock() require.False(t, ok, "notifier must not be registered when applyConfig fails") - test.Poll(t, 5*time.Second, before, func() interface{} { + test.Poll(t, 5*time.Second, before, func() any { return countNotifierRunGoroutines() }) } @@ -446,7 +446,7 @@ func TestSyncRuleGroupsRecoversAfterManagerFactoryError(t *testing.T) { _, notifierExists := m.notifiers[user] m.notifiersMtx.Unlock() require.False(t, notifierExists) - test.Poll(t, 5*time.Second, before, func() interface{} { + test.Poll(t, 5*time.Second, before, func() any { return countNotifierRunGoroutines() }) @@ -467,7 +467,7 @@ func countNotifierRunGoroutines() int { var buf bytes.Buffer _ = pprof.Lookup("goroutine").WriteTo(&buf, 2) count := 0 - for _, stack := range bytes.Split(buf.Bytes(), []byte("\n\n")) { + for stack := range bytes.SplitSeq(buf.Bytes(), []byte("\n\n")) { // Matches both the run.funcN frames and the "created by ...run" line; each // stack block is counted at most once, so the count stays accurate even if // the closure frame is ever inlined away.