Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [BUGFIX] Ruler: Stop the per-tenant notifier (and its Alertmanager service-discovery goroutines) and remove the per-user metrics registry when rule manager creation fails, instead of leaking them until process shutdown. #7597
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
33 changes: 32 additions & 1 deletion pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,37 @@ func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID strin
reg := prometheus.NewRegistry()
r.userManagerMetrics.AddUserRegistry(userID, reg)

// getOrCreateNotifier starts the per-user notifier (and its discovery and
// notification goroutines) and registers it in r.notifiers before the rule
// manager is created. If anything below fails, this user is never added to
// r.userManagers, so the removal loop in SyncRuleGroups would never stop the
// notifier, leaking it and its goroutines until the process exits (issue
// #7595). Tear the partially-initialized state down on any early return; the
// cleanup is disarmed once a manager is successfully returned. removeNotifier
// takes r.notifiersMtx while this method runs under r.userManagerMtx; those two
// locks are always acquired in that order (userManagerMtx, then notifiersMtx)
// and never the reverse, so this cannot deadlock. removeNotifier is a no-op
// when no notifier was registered.
success := false
defer func() {
if !success {
r.removeNotifier(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
}
}()

notifier, err := r.getOrCreateNotifier(userID, reg)
if err != nil {
return nil, err
}

return r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg)
manager, err := r.managerFactory(ctx, userID, notifier, r.logger, r.frontendPool, reg)
if err != nil {
return nil, err
}

success = true
return manager, nil
}

func (r *DefaultMultiTenantManager) removeNotifier(userID string) {
Expand Down Expand Up @@ -374,6 +399,12 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag

// This should never fail, unless there's a programming mistake.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment is ambiguous.. wdyt?

if err := n.applyConfig(r.notifierCfg); err != nil {
// n.run() already started the notifier's discovery and notification
// goroutines, but n has not been added to r.notifiers yet, so neither
// removeNotifier nor Stop would ever stop it. Stop it directly to avoid
// leaking those goroutines. We must not call removeNotifier here because
// we already hold r.notifiersMtx, which it would try to re-acquire.
n.stop()
return nil, err
}

Expand Down
197 changes: 197 additions & 0 deletions pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package ruler

import (
"bytes"
"context"
"errors"
"runtime/pprof"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
config_util "github.com/prometheus/common/config"
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/notifier"
Expand Down Expand Up @@ -304,6 +309,198 @@ func TestBackupRules(t *testing.T) {
require.Equal(t, userRules[user2], m.GetBackupRules(user2))
}

// TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError is a regression test for
// https://github.com/cortexproject/cortex/issues/7595. When the manager factory
// returns an error, newManager has already created and started the per-user
// notifier (via getOrCreateNotifier -> n.run()) and registered it in r.notifiers.
// Because the user is never added to r.userManagers, the removal loop in
// SyncRuleGroups never stops that notifier, so it and its discovery/notification
// goroutines used to leak until the process exited.
func TestSyncRuleGroupsCleansUpNotifierOnManagerFactoryError(t *testing.T) {
dir := t.TempDir()
const user = "testUser"

factoryErr := errors.New("manager factory failed")
failingFactory := func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ *client.Pool, _ prometheus.Registerer) (RulesManager, error) {
return nil, factoryErr
}

// Use a dedicated registry (not nil): a nil registry registers the notifier
// service-discovery metrics on the global default registerer, which can
// os.Exit(1) on duplicate registration when running alongside other tests.
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, failingFactory, nil, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)
t.Cleanup(m.Stop)

// Baseline notifier-run goroutines before the (failing) sync. Assert a delta
// back to this baseline rather than an absolute zero so the check is robust to
// any unrelated notifier goroutines from other tests sharing this process.
before := countNotifierRunGoroutines()

userRules := map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{
Name: "group1",
Namespace: "ns",
Interval: 1 * time.Minute,
User: user,
},
},
}
m.SyncRuleGroups(context.Background(), userRules)

// The factory failed, so the user must not be tracked as a live manager.
require.Nil(t, getManager(m, user))

// The notifier must have been stopped and removed from the map.
m.notifiersMtx.Lock()
_, notifierExists := m.notifiers[user]
m.notifiersMtx.Unlock()
require.False(t, notifierExists, "notifier must be removed after a managerFactory error")

// The per-user metrics registry must have been removed too.
require.False(t, hasUserManagerRegistry(t, m, user), "per-user metrics registry must be removed after a managerFactory error")

// Its goroutines (started by rulerNotifier.run) must not leak. removeNotifier
// -> stop() -> wg.Wait() is synchronous, so they are gone by now; poll back to
// the baseline to absorb any scheduling latency.
test.Poll(t, 5*time.Second, before, func() any {
return countNotifierRunGoroutines()
})
}

// TestGetOrCreateNotifierStopsNotifierOnApplyConfigError is a regression test for
// the secondary leak path in https://github.com/cortexproject/cortex/issues/7595:
// getOrCreateNotifier starts the notifier with n.run() before calling
// n.applyConfig. If applyConfig fails, the notifier was never inserted into
// r.notifiers, so it must be stopped directly to avoid leaking its goroutines.
func TestGetOrCreateNotifierStopsNotifierOnApplyConfigError(t *testing.T) {
const user = "testUser"
m, err := NewDefaultMultiTenantManager(Config{RulePath: t.TempDir()}, &ruleLimits{}, RuleManagerFactory(nil, nil), nil, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)
t.Cleanup(m.Stop)

// Force applyConfig to fail by pointing the Alertmanager TLS config at a CA
// file that does not exist.
m.notifierCfg = &promConfig.Config{
AlertingConfig: promConfig.AlertingConfig{
AlertmanagerConfigs: promConfig.AlertmanagerConfigs{
{
HTTPClientConfig: config_util.HTTPClientConfig{
TLSConfig: config_util.TLSConfig{CAFile: "/does/not/exist"},
},
APIVersion: promConfig.AlertmanagerAPIVersionV2,
},
},
},
}

before := countNotifierRunGoroutines()
_, err = m.getOrCreateNotifier(user, prometheus.NewRegistry())
require.Error(t, err)

m.notifiersMtx.Lock()
_, ok := m.notifiers[user]
m.notifiersMtx.Unlock()
require.False(t, ok, "notifier must not be registered when applyConfig fails")

test.Poll(t, 5*time.Second, before, func() any {
return countNotifierRunGoroutines()
})
}

// TestSyncRuleGroupsRecoversAfterManagerFactoryError verifies that a user whose
// first manager creation failed — and whose notifier was therefore cleaned up by
// the fix for https://github.com/cortexproject/cortex/issues/7595 — is created
// normally on a later sync once the factory succeeds, i.e. the failure cleanup
// does not leave the user in an unrecoverable state.
func TestSyncRuleGroupsRecoversAfterManagerFactoryError(t *testing.T) {
dir := t.TempDir()
const user = "testUser"

fail := atomic.NewBool(true)
base := RuleManagerFactory([][]*promRules.Group{{}, {}}, []time.Duration{time.Millisecond, time.Millisecond})
factory := func(ctx context.Context, userID string, n *notifier.Manager, logger log.Logger, p *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
if fail.Load() {
return nil, errors.New("manager factory failed")
}
return base(ctx, userID, n, logger, p, reg)
}

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, &ruleLimits{}, factory, nil, prometheus.NewRegistry(), log.NewNopLogger())
require.NoError(t, err)
t.Cleanup(m.Stop)

before := countNotifierRunGoroutines()

userRules := map[string]rulespb.RuleGroupList{
user: {
&rulespb.RuleGroupDesc{Name: "group1", Namespace: "ns", Interval: 1 * time.Minute, User: user},
},
}

// First sync fails: no manager is tracked and the notifier must be cleaned up.
m.SyncRuleGroups(context.Background(), userRules)
require.Nil(t, getManager(m, user))
m.notifiersMtx.Lock()
_, notifierExists := m.notifiers[user]
m.notifiersMtx.Unlock()
require.False(t, notifierExists)
test.Poll(t, 5*time.Second, before, func() any {
return countNotifierRunGoroutines()
})

// Once the factory succeeds, the user is created normally and gets a notifier.
fail.Store(false)
m.SyncRuleGroups(context.Background(), userRules)
require.NotNil(t, getManager(m, user))
m.notifiersMtx.Lock()
_, notifierExists = m.notifiers[user]
m.notifiersMtx.Unlock()
require.True(t, notifierExists, "notifier should be created when the user recovers")
}

// countNotifierRunGoroutines returns the number of goroutines currently running
// inside rulerNotifier.run (its discovery and notification loops). It is used to
// assert that a notifier's goroutines have been stopped rather than leaked.
func countNotifierRunGoroutines() int {
var buf bytes.Buffer
_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
count := 0
for stack := range bytes.SplitSeq(buf.Bytes(), []byte("\n\n")) {
// Matches both the run.funcN frames and the "created by ...run" line; each
// stack block is counted at most once, so the count stays accurate even if
// the closure frame is ever inlined away.
if bytes.Contains(stack, []byte("github.com/cortexproject/cortex/pkg/ruler.(*rulerNotifier).run")) {
count++
}
}
return count
}

// hasUserManagerRegistry reports whether a per-user metrics registry for the
// given user is still registered with the manager's metrics aggregator. The
// notifier registers prometheus_notifications_* metrics into that per-user
// registry, so its presence is observable via the aggregated, user-labelled
// output.
func hasUserManagerRegistry(t *testing.T, m *DefaultMultiTenantManager, user string) bool {
t.Helper()
tmp := prometheus.NewRegistry()
tmp.MustRegister(m.userManagerMetrics)
mfs, err := tmp.Gather()
require.NoError(t, err)
for _, mf := range mfs {
for _, metric := range mf.GetMetric() {
for _, lp := range metric.GetLabel() {
if lp.GetName() == "user" && lp.GetValue() == user {
return true
}
}
}
}
return false
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()
Expand Down
Loading