From 91cfb49150f0bfe0618de055cb945395ab1ec5a5 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 8 Mar 2026 11:37:34 +1300 Subject: [PATCH] Introduce `Async::Loop` helper. --- lib/async.rb | 1 + lib/async/loop.rb | 84 +++++++++++++++++ releases.md | 4 + test/async/loop.rb | 220 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 309 insertions(+) create mode 100644 lib/async/loop.rb create mode 100644 test/async/loop.rb diff --git a/lib/async.rb b/lib/async.rb index c3ba03df..d72be6c6 100644 --- a/lib/async.rb +++ b/lib/async.rb @@ -6,6 +6,7 @@ require_relative "async/version" require_relative "async/reactor" +require_relative "async/loop" require_relative "kernel/async" require_relative "kernel/sync" diff --git a/lib/async/loop.rb b/lib/async/loop.rb new file mode 100644 index 00000000..80ceb5e0 --- /dev/null +++ b/lib/async/loop.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "console" + +module Async + # @namespace + module Loop + # Execute a block repeatedly at quantized (time-aligned) intervals. + # + # The alignment is computed modulo the current clock time in seconds. For example, with + # `interval: 60`, executions will occur at 00:00, 01:00, 02:00, etc., regardless of when + # the loop is started. With `interval: 300` (5 minutes), executions align to 00:00, 00:05, + # 00:10, etc. + # + # This is particularly useful for tasks that should run at predictable wall-clock times, + # such as metrics collection, periodic cleanup, or scheduled jobs that need to align + # across multiple processes. + # + # If an error occurs during block execution, it is logged and the loop continues. + # + # @example Run every minute at :00 seconds: + # Async::Loop.quantized(interval: 60) do + # puts "Current time: #{Time.now}" + # end + # + # @example Run every 5 minutes aligned to the hour: + # Async::Loop.quantized(interval: 300) do + # collect_metrics + # end + # + # @parameter interval [Numeric] The interval in seconds. Executions will align to multiples of this interval based on the current time. + # @yields The block to execute at each interval. + # + # @public Since *Async v2.37*. + def self.quantized(interval: 60, &block) + while true + # Compute the wait time to the next interval: + wait = interval - (Time.now.to_f % interval) + if wait.positive? + # Sleep until the next interval boundary: + sleep(wait) + end + + begin + yield + rescue => error + Console.error(self, "Loop error:", error) + end + end + end + + # Execute a block repeatedly with a fixed delay between executions. + # + # Unlike {quantized}, this method waits for the specified interval *after* each execution + # completes. This means the actual time between the start of successive executions will be + # `interval + execution_time`. + # + # If an error occurs during block execution, it is logged and the loop continues. + # + # @example Run every 5 seconds (plus execution time): + # Async::Loop.periodic(interval: 5) do + # process_queue + # end + # + # @parameter interval [Numeric] The delay in seconds between executions. + # @yields The block to execute periodically. + # + # @public Since *Async v2.37*. + def self.periodic(interval: 60, &block) + while true + begin + yield + rescue => error + Console.error(self, "Loop error:", error) + end + + sleep(interval) + end + end + end +end diff --git a/releases.md b/releases.md index 92c46fc5..04982106 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Introduce `Async::Loop` for robust, time-aligned loops. + ## v2.36.0 - Introduce `Task#wait_all` which recursively waits for all children and self, excepting the current task. diff --git a/test/async/loop.rb b/test/async/loop.rb new file mode 100644 index 00000000..57072a9d --- /dev/null +++ b/test/async/loop.rb @@ -0,0 +1,220 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "async/loop" +require "sus/fixtures/console" + +describe Async::Loop do + include Sus::Fixtures::Console::CapturedLogger + + with ".quantized" do + it "invokes the block at aligned intervals" do + queue = Thread::Queue.new + interval = 0.1 + + thread = Thread.new do + Async::Loop.quantized(interval: interval) do + queue << Time.now.to_f + end + end + + # Collect at least 3 executions + execution_times = [] + 3.times do + execution_times << queue.pop + end + ensure + thread&.kill + thread&.join + + expect(execution_times.size).to be >= 3 + end + + it "continues after an error and logs it" do + queue = Thread::Queue.new + interval = 0.05 + + thread = Thread.new do + iteration = 0 + Async::Loop.quantized(interval: interval) do + iteration += 1 + queue << iteration + raise "test error" if iteration == 1 + end + end + + # Wait for first iteration (raises), then at least one more (succeeds) + iterations = [] + 3.times do + iterations << queue.pop + end + ensure + thread&.kill + thread&.join + + expect(iterations).to be == [1, 2, 3] + expect_console.to have_logged( + severity: be == :error, + subject: be_equal(Async::Loop), + message: be =~ /Loop error:/ + ) + end + + it "aligns executions to interval boundaries" do + queue = Thread::Queue.new + interval = 0.1 + + thread = Thread.new do + Async::Loop.quantized(interval: interval) do + queue << Time.now.to_f + end + end + + # Collect several executions + execution_times = [] + 5.times do + execution_times << queue.pop + end + ensure + thread&.kill + thread&.join + + # Verify we got the expected number of executions + expect(execution_times.size).to be == 5 + end + end + + with ".periodic" do + it "executes the block repeatedly with fixed delays" do + queue = Thread::Queue.new + interval = 0.05 + + thread = Thread.new do + Async::Loop.periodic(interval: interval) do + queue << Time.now.to_f + end + end + + # Collect at least 3 executions + execution_times = [] + 3.times do + execution_times << queue.pop + end + ensure + thread&.kill + thread&.join + + expect(execution_times.size).to be == 3 + end + + it "waits after each execution completes" do + queue = Thread::Queue.new + interval = 0.05 + + thread = Thread.new do + Async::Loop.periodic(interval: interval) do + queue << Time.now.to_f + end + end + + # Collect several executions + execution_times = [] + 5.times do + execution_times << queue.pop + end + ensure + thread&.kill + thread&.join + + # Check that there's at least 'interval' time between executions + gaps = execution_times.each_cons(2).map{|a, b| b - a} + + gaps.each do |gap| + expect(gap).to be >= interval + end + end + + it "continues after an error and logs it" do + queue = Thread::Queue.new + interval = 0.05 + + thread = Thread.new do + iteration = 0 + Async::Loop.periodic(interval: interval) do + iteration += 1 + queue << iteration + raise "periodic error" if iteration == 2 + end + end + + # Collect iterations including the one that errors + iterations = [] + 4.times do + iterations << queue.pop + end + ensure + thread&.kill + thread&.join + + expect(iterations).to be == [1, 2, 3, 4] + expect_console.to have_logged( + severity: be == :error, + subject: be_equal(Async::Loop), + message: be =~ /Loop error:/ + ) + end + + it "executes immediately on first iteration" do + queue = Thread::Queue.new + start_time = Time.now.to_f + + thread = Thread.new do + Async::Loop.periodic(interval: 0.1) do + queue << Time.now.to_f + end + end + + # Get the first execution time + first_execution_time = queue.pop + ensure + thread&.kill + thread&.join + + # The first execution should happen almost immediately + elapsed = first_execution_time - start_time + expect(elapsed).to be < 0.05 + end + + it "accounts for execution time in the interval" do + queue = Thread::Queue.new + execution_duration = 0.03 + interval = 0.05 + + thread = Thread.new do + Async::Loop.periodic(interval: interval) do + queue << Time.now.to_f + sleep(execution_duration) + end + end + + # Collect several executions + execution_times = [] + 4.times do + execution_times << queue.pop + end + ensure + thread&.kill + thread&.join + + # Time between starts should be at least interval + execution_duration + gaps = execution_times.each_cons(2).map{|a, b| b - a} + expected_minimum = interval + execution_duration + + gaps.each do |gap| + expect(gap).to be >= expected_minimum + end + end + end +end