Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ type Config struct {
// setting of Postgres `search_path`.
Schema string

// SoftStopTimeout is the maximum amount of time that the client will wait
// for running jobs to finish during a soft stop before their contexts are
// cancelled. After the timeout elapses, the client escalates to a hard stop
// by cancelling the context of all running jobs. This applies whether the
// soft stop is initiated by calling Stop or by cancelling the context passed
// to Start.
//
// In combination with signal.NotifyContext on the context passed to Start,
// this can simplify graceful stop to:
//
// ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
// defer stop()
//
// if err := client.Start(ctx); err != nil { ... }
// <-client.Stopped()
//
// The signal cancels the Start context, which initiates a soft stop. If
// running jobs haven't finished after SoftStopTimeout, their contexts are
// automatically cancelled to trigger a hard stop.
//
// StopAndCancel bypasses the timeout entirely and cancels job contexts
// immediately.
//
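// Defaults to no timeout: Stop waits indefinitely for jobs to finish, and
// cancelling the context passed to Start cancels job contexts immediately
// (equivalent to StopAndCancel).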
SoftStopTimeout time.Duration

// SkipJobKindValidation causes the job kind format validation check to be
// skipped. This is available as an interim stopgap for users that have
// invalid job kind names, but would prefer to disable the check rather than
Expand Down Expand Up @@ -457,6 +483,7 @@ func (c *Config) WithDefaults() *Config {
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
Schema: c.Schema,
SoftStopTimeout: c.SoftStopTimeout,
SkipJobKindValidation: c.SkipJobKindValidation,
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
Test: c.Test,
Expand Down Expand Up @@ -1081,10 +1108,19 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
workCtx, workCancel := context.WithCancelCause(ctx)
// We use separate contexts for fetching and working to allow for a
// graceful stop. When SoftStopTimeout is configured, the work context
// is detached from the start context so that cancelling the start
// context initiates a soft stop (with timeout escalation) rather than
// an immediate hard stop. When SoftStopTimeout is not configured, the
// work context inherits from the start context to preserve the
// existing behavior where cancelling the start context is equivalent
// to StopAndCancel.
workParentCtx := ctx
if c.config.SoftStopTimeout > 0 {
workParentCtx = context.WithoutCancel(ctx)
}
workCtx, workCancel := context.WithCancelCause(workParentCtx)

// Client available to executors and to various service hooks.
fetchCtx := withClient(fetchCtx, c)
Expand Down Expand Up @@ -1148,6 +1184,18 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

// If SoftStopTimeout is configured, start a timer that will cancel
// the work context (escalating to a hard stop) if producers don't
// finish in time. StopAndCancel also calls workCancel, in which case
// this timer is a harmless no-op because the context is already done.
if c.config.SoftStopTimeout > 0 {
softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() {
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout))
c.workCancel(rivercommon.ErrStop)
})
defer softStopTimer.Stop()
}

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices()...)
Expand Down Expand Up @@ -1180,6 +1228,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// complete before exiting. If the provided context is done before shutdown has
// completed, Stop will return immediately with the context's error.
//
// If SoftStopTimeout is configured, running job contexts will be automatically
// cancelled after the timeout elapses, escalating to a hard stop. This also
// applies when stop is initiated by cancelling the context passed to Start.
//
// There's no need to call this method if a stop has already been initiated by
// cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
Expand Down Expand Up @@ -1208,6 +1260,12 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
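// When SoftStopTimeout is not configured, this can also be initiated by
// cancelling the context passed to Start, in which case there is no need to
// call this method. When SoftStopTimeout is configured, cancelling the context
// passed to Start instead initiates a soft stop that escalates to a hard stop
// once the timeout elapses.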
//
// In most cases, using Stop with SoftStopTimeout configured is preferable to
// calling StopAndCancel directly. SoftStopTimeout gives running jobs a chance
// to finish before automatically escalating to context cancellation, providing
// graceful stop semantics without requiring manual orchestration of Stop and
// StopAndCancel.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrStop)
Expand Down
116 changes: 116 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,122 @@ func Test_Client_StopAndCancel(t *testing.T) {
})
}

// Test_Client_SoftStopTimeout exercises the SoftStopTimeout configuration
// option: escalation to a hard stop when jobs outlive the timeout, a clean
// soft stop when jobs finish in time, and escalation when the stop is
// initiated by cancelling the context passed to Start.
func Test_Client_SoftStopTimeout(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	type JobArgs struct {
		testutil.JobArgsReflectKind[JobArgs]
	}

	// blockUntilCancelled builds a work function that closes started as soon
	// as it begins, then blocks until its context is cancelled, and finally
	// closes done before returning.
	blockUntilCancelled := func(started, done chan struct{}) func(context.Context, *Job[JobArgs]) error {
		return func(ctx context.Context, _ *Job[JobArgs]) error {
			close(started)
			<-ctx.Done() // only finishes when context is cancelled
			close(done)
			return nil
		}
	}

	// requireJobCancelled fails the test unless done has already been closed.
	// The check is non-blocking on purpose: by the time the client reports
	// that it has stopped, the job must already have observed cancellation.
	requireJobCancelled := func(t *testing.T, done <-chan struct{}) {
		t.Helper()

		select {
		case <-done:
		default:
			t.Fatal("expected job to have been cancelled by soft stop timeout")
		}
	}

	t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) {
		t.Parallel()

		cfg := newTestConfig(t, "")
		cfg.SoftStopTimeout = 100 * time.Millisecond

		var (
			jobDone    = make(chan struct{})
			jobStarted = make(chan struct{})
		)
		AddWorker(cfg.Workers, WorkFunc(blockUntilCancelled(jobStarted, jobDone)))

		client := runNewTestClient(ctx, t, cfg)

		_, err := client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStarted)

		// A plain Stop is only a soft stop, and the job never returns on its
		// own, so Stop can only succeed once SoftStopTimeout (100ms) has
		// escalated to a hard stop.
		require.NoError(t, client.Stop(ctx))

		// Confirm the job's context really was cancelled.
		requireJobCancelled(t, jobDone)
	})

	t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) {
		t.Parallel()

		cfg := newTestConfig(t, "")
		cfg.SoftStopTimeout = 5 * time.Second

		jobStarted := make(chan struct{})
		AddWorker(cfg.Workers, WorkFunc(func(ctx context.Context, _ *Job[JobArgs]) error {
			close(jobStarted)
			return nil // finishes immediately
		}))

		client := runNewTestClient(ctx, t, cfg)

		_, err := client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStarted)

		// The job returns by itself, so Stop should complete promptly and
		// well before the 5s timeout has any chance to fire.
		require.NoError(t, client.Stop(ctx))
	})

	t.Run("ContextCancellationEscalatesAfterTimeout", func(t *testing.T) {
		t.Parallel()

		cfg := newTestConfig(t, "")
		cfg.SoftStopTimeout = 100 * time.Millisecond

		var (
			jobDone    = make(chan struct{})
			jobStarted = make(chan struct{})
		)
		AddWorker(cfg.Workers, WorkFunc(blockUntilCancelled(jobStarted, jobDone)))

		// The client is built and started by hand here (rather than through
		// runNewTestClient) so that the test owns the context given to Start.
		dbPool := riversharedtest.DBPool(ctx, t)
		driver := riverpgxv5.New(dbPool)
		cfg.Schema = riverdbtest.TestSchema(ctx, t, driver, nil)

		client, err := NewClient(driver, cfg)
		require.NoError(t, err)

		startCtx, cancelStart := context.WithCancel(ctx)
		defer cancelStart()

		require.NoError(t, client.Start(startCtx))

		_, err = client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStarted)

		// Cancelling the start context should begin a soft stop, which then
		// escalates to a hard stop once SoftStopTimeout has elapsed.
		cancelStart()

		riversharedtest.WaitOrTimeout(t, client.Stopped())

		requireJobCancelled(t, jobDone)
	})
}

type callbackWithCustomTimeoutArgs struct {
TimeoutValue time.Duration `json:"timeout"`
}
Expand Down
112 changes: 26 additions & 86 deletions example_graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package river_test

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -44,13 +43,10 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait
return ctx.Err()
}

// Example_gracefulShutdown demonstrates a realistic-looking stop loop for
// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C
// locally or on a platform like Heroku to stop a process) and when received,
// tries a soft stop that waits for work to finish. If it doesn't finish in
// time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs
// using context cancellation. A third will give up on the stop procedure and
// exit uncleanly.
// Example_gracefulShutdown demonstrates graceful stop using SoftStopTimeout.
// When a SIGINT/SIGTERM arrives, the start context is cancelled, which
// initiates a soft stop. If running jobs don't finish within the configured
// SoftStopTimeout, their contexts are automatically cancelled (hard stop).
func Example_gracefulShutdown() {
ctx := context.Background()

Expand All @@ -66,13 +62,14 @@ func Example_gracefulShutdown() {
river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted})

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTimeJobID})),
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError, ReplaceAttr: slogutil.NoLevelTimeJobID})),
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
SoftStopTimeout: 100 * time.Millisecond,
TestOnly: true, // suitable only for use in tests; remove for live environments
Workers: workers,
})
if err != nil {
panic(err)
Expand All @@ -83,89 +80,32 @@ func Example_gracefulShutdown() {
panic(err)
}

// Use signal.NotifyContext to cancel the start context on SIGINT/SIGTERM.
// When the signal fires, the client initiates a soft stop. If running jobs
// don't finish within SoftStopTimeout, their contexts are automatically
// cancelled.
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

if err := riverClient.Start(ctx); err != nil {
panic(err)
}

sigintOrTerm := make(chan os.Signal, 1)
signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)

// This is meant to be a realistic-looking stop goroutine that might go in a
// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
// gracefully by allowing a chance for jobs to finish. But if that isn't
// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
// it'll issue a hard stop that cancels the context of all active jobs. In
// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
// completely and exits uncleanly.
go func() {
<-sigintOrTerm
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer softStopCtxCancel()

go func() {
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
softStopCtxCancel()
case <-softStopCtx.Done():
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
}
}()

err := riverClient.Stop(softStopCtx)
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
panic(err)
}
if err == nil {
fmt.Printf("Soft stop succeeded\n")
return
}

hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer hardStopCtxCancel()

// As long as all jobs respect context cancellation, StopAndCancel will
// always work. However, in the case of a bug where a job blocks despite
// being cancelled, it may be necessary to either ignore River's stop
// result (what's shown here) or have a supervisor kill the process.
err = riverClient.StopAndCancel(hardStopCtx)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")
} else if err != nil {
panic(err)
}

// hard stop succeeded
}()

// Make sure our job starts being worked before doing anything else.
<-jobStarted

// Cheat a little by sending a SIGTERM manually for the purpose of this
// example (normally this will be sent by user or supervisory process). The
// first SIGTERM tries a soft stop in which jobs are given a chance to
// finish up.
sigintOrTerm <- syscall.SIGTERM

// The soft stop will never work in this example because our job only
// respects context cancellation, but wait a short amount of time to give it
// a chance. After it elapses, send another SIGTERM to initiate a hard stop.
select {
case <-riverClient.Stopped():
// Will never be reached in this example because our job will only ever
// finish on context cancellation.
fmt.Printf("Soft stop succeeded\n")

case <-time.After(100 * time.Millisecond):
sigintOrTerm <- syscall.SIGTERM
<-riverClient.Stopped()
}
// Cheat a little by sending ourselves a SIGTERM for the purpose of this
// example (normally this is sent by user or supervisory process). The signal
// cancels the start context, initiating a soft stop.
selfProcess, _ := os.FindProcess(os.Getpid())
_ = selfProcess.Signal(syscall.SIGTERM)

// Wait for the client to stop. The job won't finish on its own (it only
// responds to context cancellation), so after SoftStopTimeout elapses the
// client will automatically cancel job contexts (hard stop).
<-riverClient.Stopped()

// Output:
// Working job that doesn't finish until cancelled
// Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)
// Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)
// Job cancelled
}
Loading