diff --git a/client.go b/client.go index d4e0ec40..c62959c1 100644 --- a/client.go +++ b/client.go @@ -768,6 +768,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.queues = &QueueBundle{ addProducer: client.addProducer, + removeProducer: client.removeProducer, clientFetchCooldown: config.FetchCooldown, clientFetchPollInterval: config.FetchPollInterval, clientWillExecuteJobs: config.willExecuteJobs(), @@ -2209,6 +2210,21 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p return producer, nil } +func (c *Client[TTx]) removeProducer(queueName string) error { + c.producersMu.Lock() + defer c.producersMu.Unlock() + + producer, ok := c.producersByQueueName[queueName] + if !ok { + return &QueueNotFoundError{Name: queueName} + } + + producer.Stop() + delete(c.producersByQueueName, queueName) + + return nil +} + var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`) func validateQueueName(queueName string) error { @@ -2799,6 +2815,8 @@ type QueueBundle struct { // Function that adds a producer to the associated client. addProducer func(queueName string, queueConfig QueueConfig) (*producer, error) + removeProducer func(queueName string) error + clientFetchCooldown time.Duration clientFetchPollInterval time.Duration @@ -2844,6 +2862,24 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error { return nil } +// Remove removes a queue from the client, stopping the producer if the client +// is running. The function will block until all jobs currently being worked in +// the queue have completed. This blocking behavior may affect other operations, +// including shutdown timing. +// +// Returns an error if the client is not configured to execute jobs or if the +// specified queue does not exist. +func (b *QueueBundle) Remove(queueName string) error { + if !b.clientWillExecuteJobs { + return errors.New("client is not configured to execute jobs, cannot remove queue") + } + + b.startStopMu.Lock() + defer b.startStopMu.Unlock() + + return b.removeProducer(queueName) +} + // Generates a default client ID using the current hostname and time. func defaultClientID(startedAt time.Time) string { host, _ := os.Hostname() diff --git a/client_test.go b/client_test.go index f8d035c4..551e6160 100644 --- a/client_test.go +++ b/client_test.go @@ -406,6 +406,252 @@ func Test_Client_Common(t *testing.T) { wg.Wait() }) + t.Run("Queues_Remove_BeforeStart", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_before_start_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + startClient(ctx, t, client) + + insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + + // Verify job stays available by checking another queue's job completes + _, err = client.Insert(ctx, &noOpArgs{}, nil) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.NotEqual(t, insertRes.Job.ID, event.Job.ID) + + // Job on removed queue should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_AfterStart", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_after_start_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + + // Verify job stays available by checking another queue's job completes + _, err = client.Insert(ctx, &noOpArgs{}, nil) + require.NoError(t, err) + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.NotEqual(t, insertRes.Job.ID, event.Job.ID) + + // Job on removed queue should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + err := client.Queues().Remove("non_existent_queue") + require.Error(t, err) + var queueNotFoundErr *QueueNotFoundError + require.ErrorAs(t, err, &queueNotFoundErr) + require.Equal(t, "non_existent_queue", queueNotFoundErr.Name) + }) + + t.Run("Queues_Remove_WhenClientWontExecuteJobs", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = nil + config.Workers = nil + client := newTestClient(t, bundle.dbPool, config) + + err := client.Queues().Remove("any_queue") + require.Error(t, err) + require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue") + }) + + t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}} + client := newTestClient(t, bundle.dbPool, config) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + startClient(ctx, t, client) + + _, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(QueueDefault) + require.NoError(t, err) + + insertRes, err := client.Insert(ctx, &JobArgs{}, nil) + require.NoError(t, err) + + // Verify no more jobs complete by using a short timeout + select { + case <-subscribeChan: + t.Fatal("expected job to not be worked after default queue removal") + case <-time.After(500 * time.Millisecond): + } + + // Job should still be available + job, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, job.State) + }) + + t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + subscribeChan := subscribe(t, client) + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + queueName := "remove_then_add_queue" + err := client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + + err = client.Queues().Remove(queueName) + require.NoError(t, err) + + err = client.Queues().Add(queueName, QueueConfig{ + MaxWorkers: 2, + }) + require.NoError(t, err) + + _, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{ + Queue: queueName, + }) + require.NoError(t, err) + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + }) + + t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) { + t.Parallel() + + config, bundle := setupConfig(t) + config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}} + client := newTestClient(t, bundle.dbPool, config) + + subscribeChan := subscribe(t, client) + startClient(ctx, t, client) + + insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes1.Job.ID, event.Job.ID) + + require.NoError(t, client.Queues().Remove("test_queue")) + + insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"}) + require.NoError(t, err) + + select { + case <-subscribeChan: + t.Fatal("expected job 2 to not start on removed queue") + case <-time.After(500 * time.Millisecond): + } + + require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10})) + + event = riversharedtest.WaitOrTimeout(t, subscribeChan) + require.Equal(t, EventKindJobCompleted, event.Kind) + require.Equal(t, insertRes2.Job.ID, event.Job.ID) + }) + t.Run("JobCancelErrorReturned", func(t *testing.T) { t.Parallel() diff --git a/error.go b/error.go index eb74f1a7..7e634d65 100644 --- a/error.go +++ b/error.go @@ -59,6 +59,21 @@ func (e *QueueAlreadyAddedError) Is(target error) bool { return ok } +// QueueNotFoundError is returned when attempting to remove a queue that does +// not exist on the Client. +type QueueNotFoundError struct { + Name string +} + +func (e *QueueNotFoundError) Error() string { + return fmt.Sprintf("queue %q not found", e.Name) +} + +func (e *QueueNotFoundError) Is(target error) bool { + _, ok := target.(*QueueNotFoundError) + return ok +} + // UnknownJobKindError is returned when a Client fetches and attempts to // work a job that has not been registered on the Client's Workers bundle (using AddWorker). type UnknownJobKindError = rivertype.UnknownJobKindError