Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [BUGFIX] Query Scheduler: Stop the fragment table's background cleanup goroutine (and its ticker) on shutdown; it previously had no stop mechanism and lived for the process lifetime. #7599
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ require (
github.com/tjhop/slog-gokit v0.1.4
go.opentelemetry.io/collector/pdata v1.45.0
go.uber.org/automaxprocs v1.6.0
go.uber.org/goleak v1.3.0
google.golang.org/protobuf v1.36.11
)

Expand Down Expand Up @@ -283,7 +284,6 @@ require (
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
Expand Down
20 changes: 18 additions & 2 deletions pkg/scheduler/fragment_table/fragment_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type FragmentTable struct {
mappings map[distributed_execution.FragmentKey]*fragmentEntry
mu sync.RWMutex
expiration time.Duration
done chan struct{}
closeOnce sync.Once
}

// NewFragmentTable creates a new FragmentTable with the specified expiration duration.
Expand All @@ -27,6 +29,7 @@ func NewFragmentTable(expiration time.Duration) *FragmentTable {
ft := &FragmentTable{
mappings: make(map[distributed_execution.FragmentKey]*fragmentEntry),
expiration: expiration,
done: make(chan struct{}),
}
go ft.periodicCleanup()
return ft
Expand Down Expand Up @@ -55,6 +58,14 @@ func (f *FragmentTable) GetAddrByID(queryID uint64, fragmentID uint64) (string,
return "", false
}

// Close stops the background cleanup goroutine started by NewFragmentTable.
// It is safe to call more than once.
func (f *FragmentTable) Close() {
f.closeOnce.Do(func() {
close(f.done)
})
}

func (f *FragmentTable) cleanupExpired() {
f.mu.Lock()
defer f.mu.Unlock()
Expand All @@ -73,7 +84,12 @@ func (f *FragmentTable) cleanupExpired() {
func (f *FragmentTable) periodicCleanup() {
ticker := time.NewTicker(f.expiration / 2)
defer ticker.Stop()
for range ticker.C {
f.cleanupExpired()
for {
select {
case <-ticker.C:
f.cleanupExpired()
case <-f.done:
return
}
}
}
24 changes: 24 additions & 0 deletions pkg/scheduler/fragment_table/fragment_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

// TestMain runs the package tests under goleak so that any FragmentTable whose
// cleanup goroutine is not stopped (via Close) fails the package.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestNewFragmentTable(t *testing.T) {
tests := []struct {
name string
Expand All @@ -27,6 +34,7 @@ func TestNewFragmentTable(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ft := NewFragmentTable(tt.expiration)
t.Cleanup(ft.Close)
require.NotNil(t, ft)
require.NotNil(t, ft.mappings)
assert.Equal(t, tt.expiration, ft.expiration)
Expand All @@ -36,6 +44,7 @@ func TestNewFragmentTable(t *testing.T) {

func TestFragmentTable_AddAndGetAddress(t *testing.T) {
ft := NewFragmentTable(time.Hour)
t.Cleanup(ft.Close)

tests := []struct {
name string
Expand Down Expand Up @@ -84,6 +93,7 @@ func TestFragmentTable_AddAndGetAddress(t *testing.T) {
func TestFragmentTable_Expiration(t *testing.T) {
expiration := 100 * time.Millisecond
ft := NewFragmentTable(expiration)
t.Cleanup(ft.Close)

t.Run("entries expire after timeout", func(t *testing.T) {
ft.AddAddressByID(1, 1, "addr1")
Expand All @@ -106,6 +116,7 @@ func TestFragmentTable_Expiration(t *testing.T) {

func TestFragmentTable_ConcurrentAccess(t *testing.T) {
ft := NewFragmentTable(time.Hour)
t.Cleanup(ft.Close)

const (
numGoroutines = 10
Expand Down Expand Up @@ -140,6 +151,7 @@ func TestFragmentTable_ConcurrentAccess(t *testing.T) {
func TestFragmentTable_PeriodicCleanup(t *testing.T) {
expiration := 100 * time.Millisecond
ft := NewFragmentTable(expiration)
t.Cleanup(ft.Close)

ft.AddAddressByID(1, 1, "addr1")
ft.AddAddressByID(1, 2, "addr2")
Expand All @@ -163,3 +175,15 @@ func TestFragmentTable_PeriodicCleanup(t *testing.T) {
_, ok = ft.GetAddrByID(1, 2)
require.False(t, ok)
}

// TestFragmentTable_Close verifies that Close stops the background cleanup
// goroutine — asserted package-wide by goleak in TestMain — and is safe to call
// more than once.
func TestFragmentTable_Close(t *testing.T) {
ft := NewFragmentTable(time.Hour)

ft.Close()

// Close is idempotent: a second call must not panic (e.g. double close).
require.NotPanics(t, ft.Close)
}
2 changes: 2 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ func (s *Scheduler) running(ctx context.Context) error {

// Close the Scheduler.
func (s *Scheduler) stopping(_ error) error {
// Stop the fragment table's background cleanup goroutine.
s.fragmentTable.Close()
// This will also stop the requests queue, which stop accepting new requests and errors out any tracked requests.
return services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
}
Expand Down
Loading