Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require_relative "async/version"
require_relative "async/reactor"
require_relative "async/loop"

require_relative "kernel/async"
require_relative "kernel/sync"
Expand Down
84 changes: 84 additions & 0 deletions lib/async/loop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "console"

module Async
# @namespace
module Loop
# Execute a block repeatedly at quantized (time-aligned) intervals.
#
# The alignment is computed modulo the current clock time in seconds. For example, with
# `interval: 60`, executions will occur at 00:00, 01:00, 02:00, etc., regardless of when
# the loop is started. With `interval: 300` (5 minutes), executions align to 00:00, 00:05,
# 00:10, etc.
#
# This is particularly useful for tasks that should run at predictable wall-clock times,
# such as metrics collection, periodic cleanup, or scheduled jobs that need to align
# across multiple processes.
#
# If an error occurs during block execution, it is logged and the loop continues.
#
# @example Run every minute at :00 seconds:
# Async::Loop.quantized(interval: 60) do
# puts "Current time: #{Time.now}"
# end
#
# @example Run every 5 minutes aligned to the hour:
# Async::Loop.quantized(interval: 300) do
# collect_metrics
# end
#
# @parameter interval [Numeric] The interval in seconds. Executions will align to multiples of this interval based on the current time.
# @yields The block to execute at each interval.
#
# @public Since *Async v2.37*.
def self.quantized(interval: 60, &block)
while true
# Compute the wait time to the next interval:
wait = interval - (Time.now.to_f % interval)
if wait.positive?
# Sleep until the next interval boundary:
sleep(wait)
end

begin
yield
rescue => error
Console.error(self, "Loop error:", error)
end
end
end

# Execute a block repeatedly with a fixed delay between executions.
#
# Unlike {quantized}, this method waits for the specified interval *after* each execution
# completes. This means the actual time between the start of successive executions will be
# `interval + execution_time`.
#
# If an error occurs during block execution, it is logged and the loop continues.
#
# @example Run every 5 seconds (plus execution time):
# Async::Loop.periodic(interval: 5) do
# process_queue
# end
#
# @parameter interval [Numeric] The delay in seconds between executions.
# @yields The block to execute periodically.
#
# @public Since *Async v2.37*.
def self.periodic(interval: 60, &block)
while true
begin
yield
rescue => error
Console.error(self, "Loop error:", error)
end

sleep(interval)
end
end
end
end
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- Introduce `Async::Loop` for robust, time-aligned loops.

## v2.36.0

- Introduce `Task#wait_all` which recursively waits for all children and self, excepting the current task.
Expand Down
220 changes: 220 additions & 0 deletions test/async/loop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Samuel Williams.

require "async/loop"
require "sus/fixtures/console"

describe Async::Loop do
include Sus::Fixtures::Console::CapturedLogger

with ".quantized" do
it "invokes the block at aligned intervals" do
queue = Thread::Queue.new
interval = 0.1

thread = Thread.new do
Async::Loop.quantized(interval: interval) do
queue << Time.now.to_f
end
end

# Collect at least 3 executions
execution_times = []
3.times do
execution_times << queue.pop
end
ensure
thread&.kill
thread&.join

expect(execution_times.size).to be >= 3
end

it "continues after an error and logs it" do
queue = Thread::Queue.new
interval = 0.05

thread = Thread.new do
iteration = 0
Async::Loop.quantized(interval: interval) do
iteration += 1
queue << iteration
raise "test error" if iteration == 1
end
end

# Wait for first iteration (raises), then at least one more (succeeds)
iterations = []
3.times do
iterations << queue.pop
end
ensure
thread&.kill
thread&.join

expect(iterations).to be == [1, 2, 3]
expect_console.to have_logged(
severity: be == :error,
subject: be_equal(Async::Loop),
message: be =~ /Loop error:/
)
end

it "aligns executions to interval boundaries" do
queue = Thread::Queue.new
interval = 0.1

thread = Thread.new do
Async::Loop.quantized(interval: interval) do
queue << Time.now.to_f
end
end

# Collect several executions
execution_times = []
5.times do
execution_times << queue.pop
end
ensure
thread&.kill
thread&.join

# Verify we got the expected number of executions
expect(execution_times.size).to be == 5
end
end

with ".periodic" do
it "executes the block repeatedly with fixed delays" do
queue = Thread::Queue.new
interval = 0.05

thread = Thread.new do
Async::Loop.periodic(interval: interval) do
queue << Time.now.to_f
end
end

# Collect at least 3 executions
execution_times = []
3.times do
execution_times << queue.pop
end
ensure
thread&.kill
thread&.join

expect(execution_times.size).to be == 3
end

it "waits after each execution completes" do
queue = Thread::Queue.new
interval = 0.05

thread = Thread.new do
Async::Loop.periodic(interval: interval) do
queue << Time.now.to_f
end
end

# Collect several executions
execution_times = []
5.times do
execution_times << queue.pop
end
ensure
thread&.kill
thread&.join

# Check that there's at least 'interval' time between executions
gaps = execution_times.each_cons(2).map{|a, b| b - a}

gaps.each do |gap|
expect(gap).to be >= interval
end
end

it "continues after an error and logs it" do
queue = Thread::Queue.new
interval = 0.05

thread = Thread.new do
iteration = 0
Async::Loop.periodic(interval: interval) do
iteration += 1
queue << iteration
raise "periodic error" if iteration == 2
end
end

# Collect iterations including the one that errors
iterations = []
4.times do
iterations << queue.pop
end
ensure
thread&.kill
thread&.join

expect(iterations).to be == [1, 2, 3, 4]
expect_console.to have_logged(
severity: be == :error,
subject: be_equal(Async::Loop),
message: be =~ /Loop error:/
)
end

it "executes immediately on first iteration" do
queue = Thread::Queue.new
start_time = Time.now.to_f

thread = Thread.new do
Async::Loop.periodic(interval: 0.1) do
queue << Time.now.to_f
end
end

# Get the first execution time
first_execution_time = queue.pop
ensure
thread&.kill
thread&.join

# The first execution should happen almost immediately
elapsed = first_execution_time - start_time
expect(elapsed).to be < 0.05
end

it "accounts for execution time in the interval" do
queue = Thread::Queue.new
execution_duration = 0.03
interval = 0.05

thread = Thread.new do
Async::Loop.periodic(interval: interval) do
queue << Time.now.to_f
sleep(execution_duration)
end
end

# Collect several executions
execution_times = []
4.times do
execution_times << queue.pop
end
ensure
thread&.kill
thread&.join

# Time between starts should be at least interval + execution_duration
gaps = execution_times.each_cons(2).map{|a, b| b - a}
expected_minimum = interval + execution_duration

gaps.each do |gap|
expect(gap).to be >= expected_minimum
end
end
end
end
Loading