From 98658e0231206cee06e6fe60a12314827a8d4d6b Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 14:02:42 +0800 Subject: [PATCH 1/6] fix(queue): close TOCTOU race between schedule() and start() Introduce an activeSlots counter protected by the existing mutex to track reserved worker slots. This eliminates the window where schedule() could observe a stale BusyWorkers() value and over-dispatch beyond workerCount. Closes #154 Co-Authored-By: Claude Opus 4.6 (1M context) --- queue.go | 20 ++++++++++++++++++-- ring_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/queue.go b/queue.go index 0fe4161..e6debc8 100644 --- a/queue.go +++ b/queue.go @@ -30,6 +30,7 @@ type ( metric *metric // Metrics collector for tracking queue and worker stats logger Logger // Logger for queue events and errors workerCount int64 // Number of worker goroutines to process jobs + activeSlots int64 // Number of reserved worker slots (scheduling guard) routineGroup *routineGroup // Group to manage and wait for goroutines quit chan struct{} // Channel to signal shutdown to all goroutines ready chan struct{} // Channel to signal worker readiness @@ -185,6 +186,9 @@ func (q *Queue) work(task core.TaskMessage) { // Defer block to handle panics, update metrics, and run afterFn callback. defer func() { q.metric.DecBusyWorker() + q.Lock() + q.activeSlots-- + q.Unlock() e := recover() if e != nil { q.logger.Fatalf("panic error: %v", e) @@ -313,12 +317,12 @@ func (q *Queue) UpdateWorkerCount(num int64) { q.schedule() } -// schedule checks if more workers can be started based on the current busy count. +// schedule checks if more workers can be started based on reserved slots. // If so, it signals readiness to start a new worker. func (q *Queue) schedule() { q.Lock() defer q.Unlock() - if q.BusyWorkers() >= q.workerCount { + if q.activeSlots >= q.workerCount { return } @@ -351,6 +355,15 @@ func (q *Queue) start() { return } + // Reserve a slot under the mutex to close the TOCTOU gap. + q.Lock() + if q.activeSlots >= q.workerCount { + q.Unlock() + continue + } + q.activeSlots++ + q.Unlock() + // Request a task from the worker in a background goroutine. q.routineGroup.Run(func() { for { @@ -386,6 +399,9 @@ func (q *Queue) start() { task, ok := <-tasks if !ok { + q.Lock() + q.activeSlots-- + q.Unlock() return } diff --git a/ring_test.go b/ring_test.go index b264c21..65441aa 100644 --- a/ring_test.go +++ b/ring_test.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "runtime" + "sync/atomic" "testing" "time" @@ -550,3 +551,38 @@ func BenchmarkRingQueue(b *testing.B) { } }) } + +func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { + const workerCount = 4 + const totalTasks = 100 + + var maxObserved int64 + + w := NewRing( + WithFn(func(ctx context.Context, m core.TaskMessage) error { + runtime.Gosched() + return nil + }), + ) + q, err := NewQueue( + WithWorker(w), + WithWorkerCount(workerCount), + ) + assert.NoError(t, err) + + q.Start() + for i := 0; i < totalTasks; i++ { + assert.NoError(t, q.Queue(mockMessage{message: "task"})) + busy := q.BusyWorkers() + for { + old := atomic.LoadInt64(&maxObserved) + if busy <= old || atomic.CompareAndSwapInt64(&maxObserved, old, busy) { + break + } + } + } + time.Sleep(2 * time.Second) + q.Release() + + assert.LessOrEqual(t, maxObserved, int64(workerCount)) +} From 09ffd3a8fa273988921448dff74b5c262815b0e4 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 14:27:52 +0800 Subject: [PATCH 2/6] test(queue): improve BusyWorkers race test robustness - Use a gate channel to block tasks and create scheduling pressure - Monitor BusyWorkers continuously from a dedicated goroutine - Replace fixed 2s sleep with deterministic WaitGroup completion Co-Authored-By: Claude Opus 4.6 (1M context) --- ring_test.go | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/ring_test.go b/ring_test.go index 65441aa..ee381eb 100644 --- a/ring_test.go +++ b/ring_test.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "runtime" + "sync" "sync/atomic" "testing" "time" @@ -557,10 +558,14 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { const totalTasks = 100 var maxObserved int64 + var wg sync.WaitGroup + wg.Add(totalTasks) + gate := make(chan struct{}) w := NewRing( WithFn(func(ctx context.Context, m core.TaskMessage) error { - runtime.Gosched() + <-gate + wg.Done() return nil }), ) @@ -573,15 +578,37 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { q.Start() for i := 0; i < totalTasks; i++ { assert.NoError(t, q.Queue(mockMessage{message: "task"})) - busy := q.BusyWorkers() + } + + // Continuously monitor BusyWorkers while tasks execute. + stop := make(chan struct{}) + monitorDone := make(chan struct{}) + go func() { + defer close(monitorDone) for { - old := atomic.LoadInt64(&maxObserved) - if busy <= old || atomic.CompareAndSwapInt64(&maxObserved, old, busy) { - break + select { + case <-stop: + return + default: + busy := q.BusyWorkers() + for { + old := atomic.LoadInt64(&maxObserved) + if busy <= old || atomic.CompareAndSwapInt64(&maxObserved, old, busy) { + break + } + } + runtime.Gosched() } } + }() + + // Release tasks in batches to create scheduling pressure. + for i := 0; i < totalTasks; i++ { + gate <- struct{}{} } - time.Sleep(2 * time.Second) + wg.Wait() + close(stop) + <-monitorDone q.Release() assert.LessOrEqual(t, maxObserved, int64(workerCount)) From dce7a1ce384f243b9efacc70b3c1d983e59fd651 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 14:34:43 +0800 Subject: [PATCH 3/6] test(queue): use ticker in monitor goroutine to reduce CPU spin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace tight select/default loop with a 100µs ticker for sampling BusyWorkers, avoiding unnecessary CPU consumption during tests. Co-Authored-By: Claude Opus 4.6 (1M context) --- ring_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ring_test.go b/ring_test.go index ee381eb..3a0ee2a 100644 --- a/ring_test.go +++ b/ring_test.go @@ -585,11 +585,13 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { monitorDone := make(chan struct{}) go func() { defer close(monitorDone) + ticker := time.NewTicker(100 * time.Microsecond) + defer ticker.Stop() for { select { case <-stop: return - default: + case <-ticker.C: busy := q.BusyWorkers() for { old := atomic.LoadInt64(&maxObserved) @@ -597,7 +599,6 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { break } } - runtime.Gosched() } } }() From f322612a576800b43b9b153696affcc084914c73 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 14:44:14 +0800 Subject: [PATCH 4/6] test(queue): add context cancellation and timeout guards to race test - Honor ctx.Done() in task function to prevent goroutine leaks on early exit - Add 10s timeout when sending gate tokens to fail fast on deadlock Co-Authored-By: Claude Opus 4.6 (1M context) --- ring_test.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ring_test.go b/ring_test.go index 3a0ee2a..4b4af22 100644 --- a/ring_test.go +++ b/ring_test.go @@ -564,9 +564,13 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { w := NewRing( WithFn(func(ctx context.Context, m core.TaskMessage) error { - <-gate - wg.Done() - return nil + defer wg.Done() + select { + case <-gate: + return nil + case <-ctx.Done(): + return ctx.Err() + } }), ) q, err := NewQueue( @@ -603,9 +607,14 @@ func TestBusyWorkersNeverExceedsWorkerCount(t *testing.T) { } }() - // Release tasks in batches to create scheduling pressure. + // Release tasks with a timeout to prevent hanging on regression. + timeout := time.After(10 * time.Second) for i := 0; i < totalTasks; i++ { - gate <- struct{}{} + select { + case gate <- struct{}{}: + case <-timeout: + t.Fatal("timed out sending gate tokens — possible scheduling deadlock") + } } wg.Wait() close(stop) From 81b0b64c3729204f94bc712b5e1f29044210f73e Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 16:11:53 +0800 Subject: [PATCH 5/6] refactor(queue): extract slot helpers and fold release with ready signal - Extract tryReserveSlot, releaseSlot, signalReadyLocked helpers to replace three inline mutex+counter blocks - Fold work() defer's slot release and schedule signal into one critical section, saving a lock round-trip per task - Expand activeSlots field comment to clarify its lifecycle versus BusyWorkers Co-Authored-By: Claude Opus 4.7 (1M context) --- queue.go | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/queue.go b/queue.go index e6debc8..c8c6679 100644 --- a/queue.go +++ b/queue.go @@ -30,7 +30,7 @@ type ( metric *metric // Metrics collector for tracking queue and worker stats logger Logger // Logger for queue events and errors workerCount int64 // Number of worker goroutines to process jobs - activeSlots int64 // Number of reserved worker slots (scheduling guard) + activeSlots int64 // Reserved slots gating dispatch; incremented before worker.Request(), unlike BusyWorkers which counts only running tasks routineGroup *routineGroup // Group to manage and wait for goroutines quit chan struct{} // Channel to signal shutdown to all goroutines ready chan struct{} // Channel to signal worker readiness @@ -186,14 +186,11 @@ func (q *Queue) work(task core.TaskMessage) { // Defer block to handle panics, update metrics, and run afterFn callback. defer func() { q.metric.DecBusyWorker() - q.Lock() - q.activeSlots-- - q.Unlock() + q.releaseSlot() e := recover() if e != nil { q.logger.Fatalf("panic error: %v", e) } - q.schedule() // Update success or failure metrics based on execution result. if err == nil && e == nil { @@ -322,16 +319,42 @@ func (q *Queue) UpdateWorkerCount(num int64) { func (q *Queue) schedule() { q.Lock() defer q.Unlock() + q.signalReadyLocked() +} + +// signalReadyLocked sends a non-blocking ready signal if a slot is available. +// Caller must hold q.Lock(). +func (q *Queue) signalReadyLocked() { if q.activeSlots >= q.workerCount { return } - select { case q.ready <- struct{}{}: default: } } +// tryReserveSlot reserves a worker slot if one is available under the mutex, +// closing the TOCTOU gap between schedule() and dispatch. +func (q *Queue) tryReserveSlot() bool { + q.Lock() + defer q.Unlock() + if q.activeSlots >= q.workerCount { + return false + } + q.activeSlots++ + return true +} + +// releaseSlot frees a reserved slot and signals readiness in one critical +// section, saving a lock round-trip versus separate decrement + schedule(). +func (q *Queue) releaseSlot() { + q.Lock() + defer q.Unlock() + q.activeSlots-- + q.signalReadyLocked() +} + /* start launches the main worker loop, which manages job scheduling and execution. @@ -355,14 +378,9 @@ func (q *Queue) start() { return } - // Reserve a slot under the mutex to close the TOCTOU gap. - q.Lock() - if q.activeSlots >= q.workerCount { - q.Unlock() + if !q.tryReserveSlot() { continue } - q.activeSlots++ - q.Unlock() // Request a task from the worker in a background goroutine. q.routineGroup.Run(func() { @@ -399,9 +417,7 @@ func (q *Queue) start() { task, ok := <-tasks if !ok { - q.Lock() - q.activeSlots-- - q.Unlock() + q.releaseSlot() return } From 82b09e098edf16602cfe48b2ad116e4ade129077 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Thu, 30 Apr 2026 17:04:54 +0800 Subject: [PATCH 6/6] style(queue): trim activeSlots field comment to satisfy lll lint - Shorten trailing comment to under 120 chars; helper docs carry the detail Co-Authored-By: Claude Opus 4.7 (1M context) --- queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue.go b/queue.go index c8c6679..0b709fe 100644 --- a/queue.go +++ b/queue.go @@ -30,7 +30,7 @@ type ( metric *metric // Metrics collector for tracking queue and worker stats logger Logger // Logger for queue events and errors workerCount int64 // Number of worker goroutines to process jobs - activeSlots int64 // Reserved slots gating dispatch; incremented before worker.Request(), unlike BusyWorkers which counts only running tasks + activeSlots int64 // Reserved worker slots; see tryReserveSlot/releaseSlot routineGroup *routineGroup // Group to manage and wait for goroutines quit chan struct{} // Channel to signal shutdown to all goroutines ready chan struct{} // Channel to signal worker readiness