diff --git a/CHANGELOG.md b/CHANGELOG.md index e98f9555..048c1aa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236). + ## [0.35.1] - 2026-04-26 ### Fixed diff --git a/client.go b/client.go index b60addcd..d4e0ec40 100644 --- a/client.go +++ b/client.go @@ -636,6 +636,7 @@ type Client[TTx any] struct { periodicJobs *PeriodicJobBundle pilot riverpilot.Pilot producersByQueueName map[string]*producer + producersMu sync.RWMutex queueMaintainer *maintenance.QueueMaintainer queueMaintainerLeader *maintenance.QueueMaintainerLeader queues *QueueBundle @@ -1959,7 +1960,14 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype. // transaction, the producer wouldn't yet be able to access the new jobs that // triggered the notification because they're not committed yet. func (c *Client[TTx]) notifyProducerWithoutListenerJobFetch(_ context.Context, res []*rivertype.JobInsertResult) { - if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 { + if c.driver.SupportsListener() { + return + } + + c.producersMu.RLock() + defer c.producersMu.RUnlock() + + if len(c.producersByQueueName) < 1 { return } @@ -2169,6 +2177,9 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error { } func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) { + c.producersMu.Lock() + defer c.producersMu.Unlock() + if _, alreadyExists := c.producersByQueueName[queueName]; alreadyExists { return nil, &QueueAlreadyAddedError{Name: queueName} } @@ -2722,7 +2733,14 @@ func (c *Client[TTx]) QueueUpdateTx(ctx context.Context, tx TTx, name string, pa // transaction, the producer wouldn't yet be able to access the state that // triggered the notification because it's not committed yet. func (c *Client[TTx]) notifyProducerWithoutListenerQueueControlEvent(queue string, controlEvent *controlEventPayload) { - if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 { + if c.driver.SupportsListener() { + return + } + + c.producersMu.RLock() + defer c.producersMu.RUnlock() + + if len(c.producersByQueueName) < 1 { return }