Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client

client.queues = &QueueBundle{
addProducer: client.addProducer,
removeProducer: client.removeProducer,
clientFetchCooldown: config.FetchCooldown,
clientFetchPollInterval: config.FetchPollInterval,
clientWillExecuteJobs: config.willExecuteJobs(),
Expand Down Expand Up @@ -2209,6 +2210,21 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
return producer, nil
}

func (c *Client[TTx]) removeProducer(queueName string) error {
c.producersMu.Lock()
defer c.producersMu.Unlock()

producer, ok := c.producersByQueueName[queueName]
if !ok {
return &QueueNotFoundError{Name: queueName}
}

producer.Stop()
delete(c.producersByQueueName, queueName)

return nil
}

var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)

func validateQueueName(queueName string) error {
Expand Down Expand Up @@ -2799,6 +2815,8 @@ type QueueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)

removeProducer func(queueName string) error

clientFetchCooldown time.Duration
clientFetchPollInterval time.Duration

Expand Down Expand Up @@ -2844,6 +2862,24 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
return nil
}

// Remove removes a queue from the client, stopping the producer if the client
// is running. The function will block until all jobs currently being worked in
// the queue have completed. This blocking behavior may affect other operations,
// including shutdown timing.
//
// Returns an error if the client is not configured to execute jobs or if the
// specified queue does not exist.
func (b *QueueBundle) Remove(queueName string) error {
if !b.clientWillExecuteJobs {
return errors.New("client is not configured to execute jobs, cannot remove queue")
}

b.startStopMu.Lock()
defer b.startStopMu.Unlock()

return b.removeProducer(queueName)
}

// Generates a default client ID using the current hostname and time.
func defaultClientID(startedAt time.Time) string {
host, _ := os.Hostname()
Expand Down
246 changes: 246 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,252 @@ func Test_Client_Common(t *testing.T) {
wg.Wait()
})

t.Run("Queues_Remove_BeforeStart", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

subscribeChan := subscribe(t, client)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

queueName := "remove_before_start_queue"
err := client.Queues().Add(queueName, QueueConfig{
MaxWorkers: 2,
})
require.NoError(t, err)

err = client.Queues().Remove(queueName)
require.NoError(t, err)

startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)

// Verify job stays available by checking another queue's job completes
_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)

// Job on removed queue should still be available
job, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateAvailable, job.State)
})

t.Run("Queues_Remove_AfterStart", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

subscribeChan := subscribe(t, client)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

queueName := "remove_after_start_queue"
err := client.Queues().Add(queueName, QueueConfig{
MaxWorkers: 2,
})
require.NoError(t, err)

startClient(ctx, t, client)
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())

_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)

err = client.Queues().Remove(queueName)
require.NoError(t, err)

insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)

// Verify job stays available by checking another queue's job completes
_, err = client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)

// Job on removed queue should still be available
job, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateAvailable, job.State)
})

t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

err := client.Queues().Remove("non_existent_queue")
require.Error(t, err)
var queueNotFoundErr *QueueNotFoundError
require.ErrorAs(t, err, &queueNotFoundErr)
require.Equal(t, "non_existent_queue", queueNotFoundErr.Name)
})

t.Run("Queues_Remove_WhenClientWontExecuteJobs", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.Queues = nil
config.Workers = nil
client := newTestClient(t, bundle.dbPool, config)

err := client.Queues().Remove("any_queue")
require.Error(t, err)
require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue")
})

t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}
client := newTestClient(t, bundle.dbPool, config)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

subscribeChan := subscribe(t, client)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

startClient(ctx, t, client)

_, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)

err = client.Queues().Remove(QueueDefault)
require.NoError(t, err)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

// Verify no more jobs complete by using a short timeout
select {
case <-subscribeChan:
t.Fatal("expected job to not be worked after default queue removal")
case <-time.After(500 * time.Millisecond):
}

// Job should still be available
job, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateAvailable, job.State)
})

t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

subscribeChan := subscribe(t, client)

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
return nil
}))

queueName := "remove_then_add_queue"
err := client.Queues().Add(queueName, QueueConfig{
MaxWorkers: 2,
})
require.NoError(t, err)

startClient(ctx, t, client)
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())

_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)

err = client.Queues().Remove(queueName)
require.NoError(t, err)

err = client.Queues().Add(queueName, QueueConfig{
MaxWorkers: 2,
})
require.NoError(t, err)

_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
Queue: queueName,
})
require.NoError(t, err)
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
})

t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}}
client := newTestClient(t, bundle.dbPool, config)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
require.NoError(t, err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes1.Job.ID, event.Job.ID)

require.NoError(t, client.Queues().Remove("test_queue"))

insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
require.NoError(t, err)

select {
case <-subscribeChan:
t.Fatal("expected job 2 to not start on removed queue")
case <-time.After(500 * time.Millisecond):
}

require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10}))

event = riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
})

t.Run("JobCancelErrorReturned", func(t *testing.T) {
t.Parallel()

Expand Down
15 changes: 15 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ func (e *QueueAlreadyAddedError) Is(target error) bool {
return ok
}

// QueueNotFoundError is returned when attempting to remove a queue that does
// not exist on the Client.
type QueueNotFoundError struct {
Name string
}

func (e *QueueNotFoundError) Error() string {
return fmt.Sprintf("queue %q not found", e.Name)
}

func (e *QueueNotFoundError) Is(target error) bool {
_, ok := target.(*QueueNotFoundError)
return ok
}

// UnknownJobKindError is returned when a Client fetches and attempts to
// work a job that has not been registered on the Client's Workers bundle (using AddWorker).
type UnknownJobKindError = rivertype.UnknownJobKindError
Loading