diff --git a/client.go b/client.go index d4e0ec40..54dc4802 100644 --- a/client.go +++ b/client.go @@ -328,6 +328,32 @@ type Config struct { // setting of Postgres `search_path`. Schema string + // SoftStopTimeout is the maximum amount of time that the client will wait + // for running jobs to finish during a stop before their contexts are + // cancelled. After the timeout elapses, the client escalates to a hard stop + // by cancelling the context of all running jobs. This applies regardless of + // how stop is initiated — whether by calling Stop, StopAndCancel, or by + // cancelling the context passed to Start. + // + // In combination with signal.NotifyContext on the context passed to Start, + // this can simplify graceful stop to: + // + // ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + // defer stop() + // + // if err := client.Start(ctx); err != nil { ... } + // <-client.Stopped() + // + // The signal cancels the Start context, which initiates a soft stop. If + // running jobs haven't finished after SoftStopTimeout, their contexts are + // automatically cancelled to trigger a hard stop. + // + // StopAndCancel bypasses the timeout entirely and cancels job contexts + // immediately. + // + // Defaults to no timeout (wait indefinitely for jobs to finish). + SoftStopTimeout time.Duration + // SkipJobKindValidation causes the job kind format validation check to be // skipped. This is available as an interim stopgap for users that have // invalid job kind names, but would rather disable the check rather than @@ -457,6 +483,7 @@ func (c *Config) WithDefaults() *Config { RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter), RetryPolicy: retryPolicy, Schema: c.Schema, + SoftStopTimeout: c.SoftStopTimeout, SkipJobKindValidation: c.SkipJobKindValidation, SkipUnknownJobCheck: c.SkipUnknownJobCheck, Test: c.Test, @@ -1081,10 +1108,19 @@ func (c *Client[TTx]) Start(ctx context.Context) error { return err } - // We use separate contexts for fetching and working to allow for a graceful - // stop. Both inherit from the provided context, so if it's cancelled, a - // more aggressive stop will be initiated. - workCtx, workCancel := context.WithCancelCause(ctx) + // We use separate contexts for fetching and working to allow for a + // graceful stop. When SoftStopTimeout is configured, the work context + // is detached from the start context so that cancelling the start + // context initiates a soft stop (with timeout escalation) rather than + // an immediate hard stop. When SoftStopTimeout is not configured, the + // work context inherits from the start context to preserve the + // existing behavior where cancelling the start context is equivalent + // to StopAndCancel. + workParentCtx := ctx + if c.config.SoftStopTimeout > 0 { + workParentCtx = context.WithoutCancel(ctx) + } + workCtx, workCancel := context.WithCancelCause(workParentCtx) // Client available to executors and to various service hooks. fetchCtx := withClient(fetchCtx, c) @@ -1148,6 +1184,18 @@ func (c *Client[TTx]) Start(ctx context.Context) error { c.queues.startStopMu.Lock() defer c.queues.startStopMu.Unlock() + // If SoftStopTimeout is configured, start a timer that will cancel + // the work context (escalating to a hard stop) if producers don't + // finish in time. StopAndCancel also calls workCancel, in which case + // this timer is a harmless no-op because the context is already done. + if c.config.SoftStopTimeout > 0 { + softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() { + c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout)) + c.workCancel(rivercommon.ErrStop) + }) + defer softStopTimer.Stop() + } + // On stop, have the producers stop fetching first of all. c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers") startstop.StopAllParallel(producersAsServices()...) @@ -1180,6 +1228,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error { // complete before exiting. If the provided context is done before shutdown has // completed, Stop will return immediately with the context's error. // +// If SoftStopTimeout is configured, running job contexts will be automatically +// cancelled after the timeout elapses, escalating to a hard stop. This also +// applies when stop is initiated by cancelling the context passed to Start. +// // There's no need to call this method if a hard stop has already been initiated // by cancelling the context passed to Start or by calling StopAndCancel. func (c *Client[TTx]) Stop(ctx context.Context) error { @@ -1208,6 +1260,12 @@ func (c *Client[TTx]) Stop(ctx context.Context) error { // This can also be initiated by cancelling the context passed to Start. There is // no need to call this method if the context passed to Start is cancelled // instead. +// +// In most cases, using Stop with SoftStopTimeout configured is preferable to +// calling StopAndCancel directly. SoftStopTimeout gives running jobs a chance +// to finish before automatically escalating to context cancellation, providing +// graceful stop semantics without requiring manual orchestration of Stop and +// StopAndCancel. func (c *Client[TTx]) StopAndCancel(ctx context.Context) error { c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work") c.workCancel(rivercommon.ErrStop) diff --git a/client_test.go b/client_test.go index f8d035c4..39395529 100644 --- a/client_test.go +++ b/client_test.go @@ -1933,6 +1933,122 @@ func Test_Client_StopAndCancel(t *testing.T) { }) } +func Test_Client_SoftStopTimeout(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.SoftStopTimeout = 100 * time.Millisecond + + jobDoneChan := make(chan struct{}) + jobStartedChan := make(chan struct{}) + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + close(jobStartedChan) + <-ctx.Done() // only finishes when context is cancelled + close(jobDoneChan) + return nil + })) + + client := runNewTestClient(ctx, t, config) + + _, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + riversharedtest.WaitOrTimeout(t, jobStartedChan) + + // Stop initiates a soft stop. The job won't finish on its own, but + // SoftStopTimeout should escalate to a hard stop after 100ms. + require.NoError(t, client.Stop(ctx)) + + // Verify the job's context was indeed cancelled. + select { + case <-jobDoneChan: + default: + t.Fatal("expected job to have been cancelled by soft stop timeout") + } + }) + + t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.SoftStopTimeout = 5 * time.Second + + jobStartedChan := make(chan struct{}) + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + close(jobStartedChan) + return nil // finishes immediately + })) + + client := runNewTestClient(ctx, t, config) + + _, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + riversharedtest.WaitOrTimeout(t, jobStartedChan) + + // Stop should succeed quickly since the job finishes on its own. + // The 5s timeout should not fire. + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("ContextCancellationEscalatesAfterTimeout", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.SoftStopTimeout = 100 * time.Millisecond + + jobDoneChan := make(chan struct{}) + jobStartedChan := make(chan struct{}) + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + close(jobStartedChan) + <-ctx.Done() + close(jobDoneChan) + return nil + })) + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + config.Schema = schema + + client, err := NewClient(driver, config) + require.NoError(t, err) + + startCtx, startCtxCancel := context.WithCancel(ctx) + defer startCtxCancel() + + require.NoError(t, client.Start(startCtx)) + + _, err = client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + riversharedtest.WaitOrTimeout(t, jobStartedChan) + + // Cancel the start context. This should initiate a soft stop, then + // escalate to hard stop after SoftStopTimeout. + startCtxCancel() + + riversharedtest.WaitOrTimeout(t, client.Stopped()) + + select { + case <-jobDoneChan: + default: + t.Fatal("expected job to have been cancelled by soft stop timeout") + } + }) +} + type callbackWithCustomTimeoutArgs struct { TimeoutValue time.Duration `json:"timeout"` } diff --git a/example_graceful_shutdown_test.go b/example_graceful_shutdown_test.go index 5c281007..c55c3abc 100644 --- a/example_graceful_shutdown_test.go +++ b/example_graceful_shutdown_test.go @@ -2,7 +2,6 @@ package river_test import ( "context" - "errors" "fmt" "log/slog" "os" @@ -44,13 +43,10 @@ func (w *WaitsForCancelOnlyWorker) Work(ctx context.Context, job *river.Job[Wait return ctx.Err() } -// Example_gracefulShutdown demonstrates a realistic-looking stop loop for -// River. It listens for SIGINT/SIGTERM (like might be received by a Ctrl+C -// locally or on a platform like Heroku to stop a process) and when received, -// tries a soft stop that waits for work to finish. If it doesn't finish in -// time, a second SIGINT/SIGTERM will initiate a hard stop that cancels all jobs -// using context cancellation. A third will give up on the stop procedure and -// exit uncleanly. +// Example_gracefulShutdown demonstrates graceful stop using SoftStopTimeout. +// When a SIGINT/SIGTERM arrives, the start context is cancelled, which +// initiates a soft stop. If running jobs don't finish within the configured +// SoftStopTimeout, their contexts are automatically cancelled (hard stop). func Example_gracefulShutdown() { ctx := context.Background() @@ -66,13 +62,14 @@ func Example_gracefulShutdown() { river.AddWorker(workers, &WaitsForCancelOnlyWorker{jobStarted: jobStarted}) riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ - Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTimeJobID})), + Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError, ReplaceAttr: slogutil.NoLevelTimeJobID})), Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, - Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test - TestOnly: true, // suitable only for use in tests; remove for live environments - Workers: workers, + Schema: riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test + SoftStopTimeout: 100 * time.Millisecond, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, }) if err != nil { panic(err) @@ -83,89 +80,32 @@ func Example_gracefulShutdown() { panic(err) } + // Use signal.NotifyContext to cancel the start context on SIGINT/SIGTERM. + // When the signal fires, the client initiates a soft stop. If running jobs + // don't finish within SoftStopTimeout, their contexts are automatically + // cancelled. + ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + if err := riverClient.Start(ctx); err != nil { panic(err) } - sigintOrTerm := make(chan os.Signal, 1) - signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM) - - // This is meant to be a realistic-looking stop goroutine that might go in a - // real program. It waits for SIGINT/SIGTERM and when received, tries to stop - // gracefully by allowing a chance for jobs to finish. But if that isn't - // working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and - // it'll issue a hard stop that cancels the context of all active jobs. In - // case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure - // completely and exits uncleanly. - go func() { - <-sigintOrTerm - fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") - - softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) - defer softStopCtxCancel() - - go func() { - select { - case <-sigintOrTerm: - fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") - softStopCtxCancel() - case <-softStopCtx.Done(): - fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") - } - }() - - err := riverClient.Stop(softStopCtx) - if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - panic(err) - } - if err == nil { - fmt.Printf("Soft stop succeeded\n") - return - } - - hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) - defer hardStopCtxCancel() - - // As long as all jobs respect context cancellation, StopAndCancel will - // always work. However, in the case of a bug where a job blocks despite - // being cancelled, it may be necessary to either ignore River's stop - // result (what's shown here) or have a supervisor kill the process. - err = riverClient.StopAndCancel(hardStopCtx) - if err != nil && errors.Is(err, context.DeadlineExceeded) { - fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") - } else if err != nil { - panic(err) - } - - // hard stop succeeded - }() - // Make sure our job starts being worked before doing anything else. <-jobStarted - // Cheat a little by sending a SIGTERM manually for the purpose of this - // example (normally this will be sent by user or supervisory process). The - // first SIGTERM tries a soft stop in which jobs are given a chance to - // finish up. - sigintOrTerm <- syscall.SIGTERM - - // The soft stop will never work in this example because our job only - // respects context cancellation, but wait a short amount of time to give it - // a chance. After it elapses, send another SIGTERM to initiate a hard stop. - select { - case <-riverClient.Stopped(): - // Will never be reached in this example because our job will only ever - // finish on context cancellation. - fmt.Printf("Soft stop succeeded\n") - - case <-time.After(100 * time.Millisecond): - sigintOrTerm <- syscall.SIGTERM - <-riverClient.Stopped() - } + // Cheat a little by sending ourselves a SIGTERM for the purpose of this + // example (normally this is sent by user or supervisory process). The signal + // cancels the start context, initiating a soft stop. + selfProcess, _ := os.FindProcess(os.Getpid()) + _ = selfProcess.Signal(syscall.SIGTERM) + + // Wait for the client to stop. The job won't finish on its own (it only + // responds to context cancellation), so after SoftStopTimeout elapses the + // client will automatically cancel job contexts (hard stop). + <-riverClient.Stopped() // Output: // Working job that doesn't finish until cancelled - // Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish) - // Received SIGINT/SIGTERM again; initiating hard stop (cancel everything) // Job cancelled }