diff --git a/CHANGELOG.md b/CHANGELOG.md index ea57cca48f..1b11dbad3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [BUGFIX] Ruler: Stop the per-tenant notifier (and its Alertmanager service-discovery goroutines) and remove the per-user metrics registry when rule manager creation fails, instead of leaking them until process shutdown. #7597 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 8661120189..5094f8fda2 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -300,12 +300,37 @@ func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID strin reg := prometheus.NewRegistry() r.userManagerMetrics.AddUserRegistry(userID, reg) + // getOrCreateNotifier starts the per-user notifier (and its discovery and + // notification goroutines) and registers it in r.notifiers before the rule + // manager is created. If anything below fails, this user is never added to + // r.userManagers, so the removal loop in SyncRuleGroups would never stop the + // notifier, leaking it and its goroutines until the process exits (issue + // #7595). Tear the partially-initialized state down on any early return; the + // cleanup is disarmed once a manager is successfully returned. removeNotifier + // takes r.notifiersMtx while this method runs under r.userManagerMtx; those two + // locks are always acquired in that order (userManagerMtx, then notifiersMtx) + // and never the reverse, so this cannot deadlock. removeNotifier is a no-op + // when no notifier was registered. + success := false + defer func() { + if !success { + r.removeNotifier(userID) + r.userManagerMetrics.RemoveUserRegistry(userID) + } + }() + notifier, err := r.getOrCreateNotifier(userID, reg) if err != nil { return nil, err } - return r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg) + manager, err := r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg) + if err != nil { + return nil, err + } + + success = true + return manager, nil } func (r *DefaultMultiTenantManager) removeNotifier(userID string) { @@ -374,6 +399,12 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag // This should never fail, unless there's a programming mistake. if err := n.applyConfig(r.notifierCfg); err != nil { + // n.run() already started the notifier's discovery and notification + // goroutines, but n has not been added to r.notifiers yet, so neither + // removeNotifier nor Stop would ever stop it. Stop it directly to avoid + // leaking those goroutines. We must not call removeNotifier here because + // we already hold r.notifiersMtx, which it would try to re-acquire. + n.stop() return nil, err } diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index 845d0ffc20..4ed983af37 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -1,13 +1,18 @@ package ruler import ( + "bytes" "context" + "errors" + "runtime/pprof" "sync" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + promConfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/notifier" @@ -304,6 +309,198 @@ func TestBackupRules(t *testing.T) { require.Equal(t, userRules[user2], m.GetBackupRules(user2)) } +// TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError is a regression test for +// https://github.com/cortexproject/cortex/issues/7595. When the manager factory +// returns an error, newManager has already created and started the per-user +// notifier (via getOrCreateNotifier -> n.run()) and registered it in r.notifiers. +// Because the user is never added to r.userManagers, the removal loop in +// SyncRuleGroups never stops that notifier, so it and its discovery/notification +// goroutines used to leak until the process exited. +func TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError(t *testing.T) { + dir := t.TempDir() + const user = "testUser" + + factoryErr := errors.New("manager factory failed") + failingFactory := func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ *client.Pool, _ prometheus.Registerer) (RulesManager, error) { + return nil, factoryErr + } + + // Use a dedicated registry (not nil): a nil registry registers the notifier + // service-discovery metrics on the global default registerer, which can + // os.Exit(1) on duplicate registration when running alongside other tests. + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, failingFactory, nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + // Baseline notifier-run goroutines before the (failing) sync. Assert a delta + // back to this baseline rather than an absolute zero so the check is robust to + // any unrelated notifier goroutines from other tests sharing this process. + before := countNotifierRunGoroutines() + + userRules := map[string]rulespb.RuleGroupList{ + user: { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user, + }, + }, + } + m.SyncRuleGroups(context.Background(), userRules) + + // The factory failed, so the user must not be tracked as a live manager. + require.Nil(t, getManager(m, user)) + + // The notifier must have been stopped and removed from the map. + m.notifiersMtx.Lock() + _, notifierExists := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, notifierExists, "notifier must be removed after a managerFactory error") + + // The per-user metrics registry must have been removed too. + require.False(t, hasUserManagerRegistry(t, m, user), "per-user metrics registry must be removed after a managerFactory error") + + // Its goroutines (started by rulerNotifier.run) must not leak. removeNotifier + // -> stop() -> wg.Wait() is synchronous, so they are gone by now; poll back to + // the baseline to absorb any scheduling latency. + test.Poll(t, 5*time.Second, before, func() any { + return countNotifierRunGoroutines() + }) +} + +// TestGetOrCreateNotifierStopsNotifierOnApplyConfigError is a regression test for +// the secondary leak path in https://github.com/cortexproject/cortex/issues/7595: +// getOrCreateNotifier starts the notifier with n.run() before calling +// n.applyConfig. If applyConfig fails, the notifier was never inserted into +// r.notifiers, so it must be stopped directly to avoid leaking its goroutines. +func TestGetOrCreateNotifierStopsNotifierOnApplyConfigError(t *testing.T) { + const user = "testUser" + m, err := NewDefaultMultiTenantManager(Config{RulePath: t.TempDir()}, &ruleLimits{}, RuleManagerFactory(nil, nil), nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + // Force applyConfig to fail by pointing the Alertmanager TLS config at a CA + // file that does not exist. + m.notifierCfg = &promConfig.Config{ + AlertingConfig: promConfig.AlertingConfig{ + AlertmanagerConfigs: promConfig.AlertmanagerConfigs{ + { + HTTPClientConfig: config_util.HTTPClientConfig{ + TLSConfig: config_util.TLSConfig{CAFile: "/does/not/exist"}, + }, + APIVersion: promConfig.AlertmanagerAPIVersionV2, + }, + }, + }, + } + + before := countNotifierRunGoroutines() + _, err = m.getOrCreateNotifier(user, prometheus.NewRegistry()) + require.Error(t, err) + + m.notifiersMtx.Lock() + _, ok := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, ok, "notifier must not be registered when applyConfig fails") + + test.Poll(t, 5*time.Second, before, func() any { + return countNotifierRunGoroutines() + }) +} + +// TestSyncRuleGroupsRecoversAfterManagerFactoryError verifies that a user whose +// first manager creation failed — and whose notifier was therefore cleaned up by +// the fix for https://github.com/cortexproject/cortex/issues/7595 — is created +// normally on a later sync once the factory succeeds, i.e. the failure cleanup +// does not leave the user in an unrecoverable state. +func TestSyncRuleGroupsRecoversAfterManagerFactoryError(t *testing.T) { + dir := t.TempDir() + const user = "testUser" + + fail := atomic.NewBool(true) + base := RuleManagerFactory([][]*promRules.Group{{}, {}}, []time.Duration{time.Millisecond, time.Millisecond}) + factory := func(ctx context.Context, userID string, n *notifier.Manager, logger log.Logger, p *client.Pool, reg prometheus.Registerer) (RulesManager, error) { + if fail.Load() { + return nil, errors.New("manager factory failed") + } + return base(ctx, userID, n, logger, p, reg) + } + + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, factory, nil, prometheus.NewRegistry(), log.NewNopLogger()) + require.NoError(t, err) + t.Cleanup(m.Stop) + + before := countNotifierRunGoroutines() + + userRules := map[string]rulespb.RuleGroupList{ + user: { + &rulespb.RuleGroupDesc{Name: "group1", Namespace: "ns", Interval: 1 * time.Minute, User: user}, + }, + } + + // First sync fails: no manager is tracked and the notifier must be cleaned up. + m.SyncRuleGroups(context.Background(), userRules) + require.Nil(t, getManager(m, user)) + m.notifiersMtx.Lock() + _, notifierExists := m.notifiers[user] + m.notifiersMtx.Unlock() + require.False(t, notifierExists) + test.Poll(t, 5*time.Second, before, func() any { + return countNotifierRunGoroutines() + }) + + // Once the factory succeeds, the user is created normally and gets a notifier. + fail.Store(false) + m.SyncRuleGroups(context.Background(), userRules) + require.NotNil(t, getManager(m, user)) + m.notifiersMtx.Lock() + _, notifierExists = m.notifiers[user] + m.notifiersMtx.Unlock() + require.True(t, notifierExists, "notifier should be created when the user recovers") +} + +// countNotifierRunGoroutines returns the number of goroutines currently running +// inside rulerNotifier.run (its discovery and notification loops). It is used to +// assert that a notifier's goroutines have been stopped rather than leaked. +func countNotifierRunGoroutines() int { + var buf bytes.Buffer + _ = pprof.Lookup("goroutine").WriteTo(&buf, 2) + count := 0 + for stack := range bytes.SplitSeq(buf.Bytes(), []byte("\n\n")) { + // Matches both the run.funcN frames and the "created by ...run" line; each + // stack block is counted at most once, so the count stays accurate even if + // the closure frame is ever inlined away. + if bytes.Contains(stack, []byte("github.com/cortexproject/cortex/pkg/ruler.(*rulerNotifier).run")) { + count++ + } + } + return count +} + +// hasUserManagerRegistry reports whether a per-user metrics registry for the +// given user is still registered with the manager's metrics aggregator. The +// notifier registers prometheus_notifications_* metrics into that per-user +// registry, so its presence is observable via the aggregated, user-labelled +// output. +func hasUserManagerRegistry(t *testing.T, m *DefaultMultiTenantManager, user string) bool { + t.Helper() + tmp := prometheus.NewRegistry() + tmp.MustRegister(m.userManagerMetrics) + mfs, err := tmp.Gather() + require.NoError(t, err) + for _, mf := range mfs { + for _, metric := range mf.GetMetric() { + for _, lp := range metric.GetLabel() { + if lp.GetName() == "user" && lp.GetValue() == user { + return true + } + } + } + } + return false +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.RLock() defer m.userManagerMtx.RUnlock()