From 6c8d5be2fb8668f16441842d991baf5f71f19a25 Mon Sep 17 00:00:00 2001 From: Tiago Silva <3629062+tigrato@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:43:13 +0100 Subject: [PATCH] Add QueueBundle.Remove() to stop and remove queues Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal. Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com> --- client.go | 36 ++++++++ client_test.go | 246 +++++++++++++++++++++++++++++++++++++++++++++++++ error.go | 15 +++ 3 files changed, 297 insertions(+) diff --git a/client.go b/client.go index d4e0ec40..c62959c1 100644 --- a/client.go +++ b/client.go @@ -768,6 +768,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.queues = &QueueBundle{ addProducer: client.addProducer, + removeProducer: client.removeProducer, clientFetchCooldown: config.FetchCooldown, clientFetchPollInterval: config.FetchPollInterval, clientWillExecuteJobs: config.willExecuteJobs(), @@ -2209,6 +2210,21 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p return producer, nil } +func (c *Client[TTx]) removeProducer(queueName string) error { + c.producersMu.Lock() + defer c.producersMu.Unlock() + + producer, ok := c.producersByQueueName[queueName] + if !ok { + return &QueueNotFoundError{Name: queueName} + } + + producer.Stop() + delete(c.producersByQueueName, queueName) + + return nil +} + var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`) func validateQueueName(queueName string) error { @@ -2799,6 +2815,8 @@ type QueueBundle struct { // Function that adds a producer to the associated client. addProducer func(queueName string, queueConfig QueueConfig) (*producer, error) + removeProducer func(queueName string) error + clientFetchCooldown time.Duration clientFetchPollInterval time.Duration @@ -2844,6 +2862,24 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { return nil } +// Remove removes a queue from the client, stopping the producer if the client +// is running. The function will block until all jobs currently being worked in +// the queue have completed. This blocking behavior may affect other operations, +// including shutdown timing. +// +// Returns an error if the client is not configured to execute jobs or if the +// specified queue does not exist. +func (b *QueueBundle) Remove(queueName string) error { + if !b.clientWillExecuteJobs { + return errors.New("client is not configured to execute jobs, cannot remove queue") + } + + b.startStopMu.Lock() + defer b.startStopMu.Unlock() + + return b.removeProducer(queueName) +} + // Generates a default client ID using the current hostname and time. func defaultClientID(startedAt time.Time) string { host, _ := os.Hostname() diff --git a/client_test.go b/client_test.go index f8d035c4..551e6160 100644 --- a/client_test.go +++ b/client_test.go @@ -406,6 +406,252 @@ func Test_Client_Common(t *testing.T) { wg.Wait() }) + t.Run("Queues_Remove_BeforeStart", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_before_start_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + + // Verify job stays available by checking another queue's job completes + _, err = client.Insert(ctx, &noOpArgs{}, nil) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.NotEqual(t, insertRes.Job.ID, event.Job.ID) + + // Job on removed queue should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_AfterStart", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_after_start_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + + // Verify job stays available by checking another queue's job completes + _, err = client.Insert(ctx, &noOpArgs{}, nil) + require.NoError(t, err) + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.NotEqual(t, insertRes.Job.ID, event.Job.ID) + + // Job on removed queue should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + err := client.Queues().Remove("non_existent_queue") + require.Error(t, err) + var queueNotFoundErr *QueueNotFoundError + require.ErrorAs(t, err, &queueNotFoundErr) + require.Equal(t, "non_existent_queue", queueNotFoundErr.Name) + }) + + t.Run("Queues_Remove_WhenClientWontExecuteJobs", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = nil + config.Workers = nil + client := newTestClient(t, bundle.dbPool, config) + + err := client.Queues().Remove("any_queue") + require.Error(t, err) + require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue") + }) + + t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}} + client := newTestClient(t, bundle.dbPool, config) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + startClient(ctx, t, client) + + _, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(QueueDefault) + require.NoError(t, err) + + insertRes, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + + // Verify no more jobs complete by using a short timeout + select { + case <-subscribeChan: + t.Fatal("expected job to not be worked after default queue removal") + case <-time.After(500 * time.Millisecond): + } + + // Job should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_then_add_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + err = client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + }) + + t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}} + client := newTestClient(t, bundle.dbPool, config) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes1.Job.ID, event.Job.ID) + + require.NoError(t, client.Queues().Remove("test_queue")) + + insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"}) + require.NoError(t, err) + + select { + case <-subscribeChan: + t.Fatal("expected job 2 to not start on removed queue") + case <-time.After(500 * time.Millisecond): + } + + require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10})) + + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes2.Job.ID, event.Job.ID) + }) + t.Run("JobCancelErrorReturned", func(t *testing.T) { t.Parallel() diff --git a/error.go b/error.go index eb74f1a7..7e634d65 100644 --- a/error.go +++ b/error.go @@ -59,6 +59,21 @@ func (e *QueueAlreadyAddedError) Is(target error) bool { return ok } +// QueueNotFoundError is returned when attempting to remove a queue that does +// not exist on the Client. +type QueueNotFoundError struct { + Name string +} + +func (e *QueueNotFoundError) Error() string { + return fmt.Sprintf("queue %q not found", e.Name) +} + +func (e *QueueNotFoundError) Is(target error) bool { + _, ok := target.(*QueueNotFoundError) + return ok +} + // UnknownJobKindError is returned when a Client fetches and attempts to // work a job that has not been registered on the Client's Workers bundle (using AddWorker). type UnknownJobKindError = rivertype.UnknownJobKindError