diff --git a/src/horizon/src/Connectors/RedisConnector.php b/src/horizon/src/Connectors/RedisConnector.php index 01b0cc059..3b90d4d09 100644 --- a/src/horizon/src/Connectors/RedisConnector.php +++ b/src/horizon/src/Connectors/RedisConnector.php @@ -21,7 +21,7 @@ public function connect(array $config): RedisQueue Arr::get($config, 'connection', $this->connection), Arr::get($config, 'retry_after', 60), Arr::get($config, 'block_for', null), - Arr::get($config, 'after_commit', null) + Arr::get($config, 'after_commit', false) ); } } diff --git a/src/queue/src/BackgroundQueue.php b/src/queue/src/BackgroundQueue.php index cbc945e24..6f305c3cb 100644 --- a/src/queue/src/BackgroundQueue.php +++ b/src/queue/src/BackgroundQueue.php @@ -4,6 +4,9 @@ namespace Hypervel\Queue; +use DateInterval; +use DateTimeInterface; +use Hypervel\Coordinator\Timer; use Hypervel\Coroutine\Coroutine; use Throwable; @@ -17,23 +20,38 @@ class BackgroundQueue extends SyncQueue protected $exceptionCallback; /** - * Push a new job onto the queue. + * The timer used to schedule delayed jobs. */ - public function push(object|string $job, mixed $data = '', ?string $queue = null): mixed + protected Timer $timer; + + /** + * Create a new background queue instance. + */ + public function __construct( + bool $dispatchAfterCommit = false, + ?Timer $timer = null + ) { + parent::__construct($dispatchAfterCommit); + $this->timer = $timer ?? new Timer; + } + + /** + * Push a new job onto the queue after (n) seconds. + */ + public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed { - if ( - $this->shouldDispatchAfterCommit($job) + if ($this->shouldDispatchAfterCommit($job) && $this->container->has('db.transactions') ) { + $this->addUniqueJobRollbackCallback($job); + return $this->container->make('db.transactions') ->addCallback( - fn () => $this->executeJob($job, $data, $queue) + fn () => $this->scheduleTimer($delay, $job, $data, $queue) ); } - $this->executeJob($job, $data, $queue); - - return null; + return $this->scheduleTimer($delay, $job, $data, $queue); } /** @@ -46,6 +64,27 @@ public function setExceptionCallback(?callable $callback): static return $this; } + /** + * Schedule the timer that will execute the job after the delay. + * + * Skips execution when the worker is closing — pending delayed jobs are + * dropped rather than racing against shutdown cleanup. Devs needing + * durability across worker restarts should use a persistent queue. + */ + protected function scheduleTimer(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data, ?string $queue): int + { + return $this->timer->after( + max(0.0, (float) $this->secondsUntil($delay)), + function (bool $isClosing = false) use ($job, $data, $queue) { + if ($isClosing) { + return; + } + + $this->executeJob($job, $data, $queue); + } + ); + } + /** * Execute a new job in the background queue. */ diff --git a/src/queue/src/BeanstalkdQueue.php b/src/queue/src/BeanstalkdQueue.php index d76792189..5a48604aa 100644 --- a/src/queue/src/BeanstalkdQueue.php +++ b/src/queue/src/BeanstalkdQueue.php @@ -31,12 +31,8 @@ public function __construct( protected string $default, protected int $timeToRun, protected int $blockFor = 0, - protected ?bool $dispatchAfterCommit = false + bool $dispatchAfterCommit = false ) { - $this->default = $default; - $this->blockFor = $blockFor; - $this->timeToRun = $timeToRun; - $this->pheanstalk = $pheanstalk; $this->dispatchAfterCommit = $dispatchAfterCommit; } diff --git a/src/queue/src/Connectors/SqsConnector.php b/src/queue/src/Connectors/SqsConnector.php index b9537158e..637ef7f07 100644 --- a/src/queue/src/Connectors/SqsConnector.php +++ b/src/queue/src/Connectors/SqsConnector.php @@ -33,7 +33,7 @@ public function connect(array $config): Queue $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? '', - $config['after_commit'] ?? null + $config['after_commit'] ?? false ); } diff --git a/src/queue/src/DatabaseQueue.php b/src/queue/src/DatabaseQueue.php index 66ef8665a..9b7b285e1 100644 --- a/src/queue/src/DatabaseQueue.php +++ b/src/queue/src/DatabaseQueue.php @@ -37,7 +37,7 @@ public function __construct( protected string $table, protected string $default = 'default', protected ?int $retryAfter = 60, - protected ?bool $dispatchAfterCommit = false + protected bool $dispatchAfterCommit = false ) { } diff --git a/src/queue/src/DeferredQueue.php b/src/queue/src/DeferredQueue.php index 1457747ae..c0287edb2 100644 --- a/src/queue/src/DeferredQueue.php +++ b/src/queue/src/DeferredQueue.php @@ -7,7 +7,7 @@ use DateInterval; use DateTimeInterface; use Hypervel\Coordinator\Timer; -use Hypervel\Engine\Coroutine; +use Hypervel\Coroutine\Coroutine; use Throwable; class DeferredQueue extends SyncQueue @@ -19,71 +19,87 @@ class DeferredQueue extends SyncQueue */ protected $exceptionCallback; + /** + * The timer used to schedule delayed jobs. + */ + protected Timer $timer; + /** * Create a new deferred queue instance. */ public function __construct( - protected ?bool $dispatchAfterCommit = false, - protected ?Timer $timer = null + bool $dispatchAfterCommit = false, + ?Timer $timer = null ) { - if (! $this->timer) { - $this->timer = new Timer; - } + parent::__construct($dispatchAfterCommit); + $this->timer = $timer ?? new Timer; } /** - * Push a new job onto the queue. + * Push a new job onto the queue after (n) seconds. */ - public function push(object|string $job, mixed $data = '', ?string $queue = null): mixed + public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed { if ($this->shouldDispatchAfterCommit($job) && $this->container->has('db.transactions') ) { + $this->addUniqueJobRollbackCallback($job); + return $this->container->make('db.transactions') ->addCallback( - fn () => $this->deferJob($job, $data, $queue) + fn () => $this->scheduleTimer($delay, $job, $data, $queue) ); } - $this->deferJob($job, $data, $queue); - - return null; + return $this->scheduleTimer($delay, $job, $data, $queue); } /** - * Push a new job onto the queue after (n) seconds. + * Set the exception callback for the deferred queue. */ - public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed + public function setExceptionCallback(?callable $callback): static { - return $this->timer->after( - (float) $this->secondsUntil($delay), - fn () => $this->deferJob($job, $data, $queue) - ); + $this->exceptionCallback = $callback; + + return $this; } /** - * Set the exception callback for the deferred queue. + * Schedule the timer that will execute the job after the delay. + * + * Skips execution when the worker is closing — pending delayed jobs are + * dropped rather than racing against shutdown cleanup. Devs needing + * durability across worker restarts should use a persistent queue. */ - public function setExceptionCallback(?callable $callback): static + protected function scheduleTimer(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data, ?string $queue): int { - $this->exceptionCallback = $callback; + return $this->timer->after( + max(0.0, (float) $this->secondsUntil($delay)), + function (bool $isClosing = false) use ($job, $data, $queue) { + if ($isClosing) { + return; + } - return $this; + $this->executeJob($job, $data, $queue); + } + ); } /** * Defer a new job onto the deferred queue. */ - protected function deferJob(object|string $job, mixed $data = '', ?string $queue = null): void + protected function executeJob(object|string $job, mixed $data = '', ?string $queue = null): int { Coroutine::defer(function () use ($job, $data, $queue) { try { - $this->executeJob($job, $data, $queue); + parent::executeJob($job, $data, $queue); } catch (Throwable $e) { if ($this->exceptionCallback) { ($this->exceptionCallback)($e); } } }); + + return 0; } } diff --git a/src/queue/src/NullQueue.php b/src/queue/src/NullQueue.php index 683e18c0a..5bb08fefe 100644 --- a/src/queue/src/NullQueue.php +++ b/src/queue/src/NullQueue.php @@ -48,7 +48,7 @@ public function reservedSize(?string $queue = null): int */ public function creationTimeOfOldestPendingJob(?string $queue = null): ?int { - return 0; + return null; } /** diff --git a/src/queue/src/Queue.php b/src/queue/src/Queue.php index ec60f8607..6733f33fa 100644 --- a/src/queue/src/Queue.php +++ b/src/queue/src/Queue.php @@ -55,7 +55,7 @@ abstract class Queue /** * Indicates that jobs should be dispatched after all database transactions have committed. */ - protected ?bool $dispatchAfterCommit = false; + protected bool $dispatchAfterCommit = false; /** * The create payload callbacks. @@ -314,13 +314,7 @@ protected function enqueueUsing(object|string $job, ?string $payload, ?string $q if ($this->shouldDispatchAfterCommit($job) && $this->container->has('db.transactions') ) { - if ($job instanceof ShouldBeUnique) { - $this->container->make('db.transactions')->addCallbackForRollback( - function () use ($job) { - (new UniqueLock($this->container->make(Cache::class)))->release($job); - } - ); - } + $this->addUniqueJobRollbackCallback($job); return $this->container->make('db.transactions') ->addCallback( @@ -356,7 +350,23 @@ protected function shouldDispatchAfterCommit(object|string $job): bool return $job->afterCommit; } - return $this->dispatchAfterCommit ?? false; + return $this->dispatchAfterCommit; + } + + /** + * Register a transaction rollback callback that releases the unique lock for the given job. + */ + protected function addUniqueJobRollbackCallback(object|string $job): void + { + if (! $job instanceof ShouldBeUnique) { + return; + } + + $this->container->make('db.transactions')->addCallbackForRollback( + function () use ($job) { + (new UniqueLock($this->container->make(Cache::class)))->release($job); + } + ); } /** diff --git a/src/queue/src/RedisQueue.php b/src/queue/src/RedisQueue.php index 11aaa9df1..384b601e7 100644 --- a/src/queue/src/RedisQueue.php +++ b/src/queue/src/RedisQueue.php @@ -39,7 +39,7 @@ public function __construct( protected ?string $connection = null, protected ?int $retryAfter = 60, protected ?int $blockFor = null, - protected ?bool $dispatchAfterCommit = false, + protected bool $dispatchAfterCommit = false, protected int $migrationBatchSize = -1 ) { } diff --git a/src/queue/src/SqsQueue.php b/src/queue/src/SqsQueue.php index 036a12533..e9ec8f968 100644 --- a/src/queue/src/SqsQueue.php +++ b/src/queue/src/SqsQueue.php @@ -28,12 +28,8 @@ public function __construct( protected string $default, protected string $prefix = '', protected string $suffix = '', - protected ?bool $dispatchAfterCommit = false + bool $dispatchAfterCommit = false ) { - $this->sqs = $sqs; - $this->prefix = $prefix; - $this->default = $default; - $this->suffix = $suffix; $this->dispatchAfterCommit = $dispatchAfterCommit; } diff --git a/src/queue/src/SyncQueue.php b/src/queue/src/SyncQueue.php index 59018efc2..1382884e1 100644 --- a/src/queue/src/SyncQueue.php +++ b/src/queue/src/SyncQueue.php @@ -6,11 +6,8 @@ use DateInterval; use DateTimeInterface; -use Hypervel\Bus\UniqueLock; -use Hypervel\Contracts\Cache\Repository as Cache; use Hypervel\Contracts\Queue\Job as JobContract; use Hypervel\Contracts\Queue\Queue as QueueContract; -use Hypervel\Contracts\Queue\ShouldBeUnique; use Hypervel\Queue\Events\JobAttempted; use Hypervel\Queue\Events\JobExceptionOccurred; use Hypervel\Queue\Events\JobProcessed; @@ -24,7 +21,7 @@ class SyncQueue extends Queue implements QueueContract * Create a new sync queue instance. */ public function __construct( - protected ?bool $dispatchAfterCommit = false + protected bool $dispatchAfterCommit = false ) { } @@ -78,13 +75,7 @@ public function push(object|string $job, mixed $data = '', ?string $queue = null if ($this->shouldDispatchAfterCommit($job) && $this->container->has('db.transactions') ) { - if ($job instanceof ShouldBeUnique) { - $this->container->make('db.transactions')->addCallbackForRollback( - function () use ($job) { - (new UniqueLock($this->container->make(Cache::class)))->release($job); - } - ); - } + $this->addUniqueJobRollbackCallback($job); return $this->container->make('db.transactions') ->addCallback( diff --git a/tests/Horizon/Unit/RedisConnectorTest.php b/tests/Horizon/Unit/RedisConnectorTest.php new file mode 100644 index 000000000..fbc781aa0 --- /dev/null +++ b/tests/Horizon/Unit/RedisConnectorTest.php @@ -0,0 +1,26 @@ +connect([ + 'queue' => 'default', + ]); + + $this->assertInstanceOf(RedisQueue::class, $queue); + } +} diff --git a/tests/Queue/QueueBackgroundQueueTest.php b/tests/Queue/QueueBackgroundQueueTest.php index cde0c5d3c..f00111bcd 100644 --- a/tests/Queue/QueueBackgroundQueueTest.php +++ b/tests/Queue/QueueBackgroundQueueTest.php @@ -4,15 +4,19 @@ namespace Hypervel\Tests\Queue; +use DateInterval; use Exception; use Hypervel\Container\Container; use Hypervel\Contracts\Events\Dispatcher; use Hypervel\Contracts\Queue\QueueableEntity; +use Hypervel\Contracts\Queue\ShouldBeUnique; use Hypervel\Contracts\Queue\ShouldQueueAfterCommit; +use Hypervel\Coordinator\Timer; use Hypervel\Database\DatabaseTransactionsManager; use Hypervel\Queue\BackgroundQueue; use Hypervel\Queue\InteractsWithQueue; use Hypervel\Queue\Jobs\SyncJob; +use Hypervel\Support\Carbon; use Hypervel\Tests\TestCase; use Mockery as m; @@ -90,6 +94,280 @@ public function testItAddsATransactionCallbackForInterfaceBasedAfterCommitJobs() run(fn () => $background->push(new BackgroundQueueAfterCommitInterfaceJob)); } + public function testItAddsATransactionCallbackForAfterCommitUniqueJobs() + { + $background = new BackgroundQueue; + $background->setConnectionName('background'); + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + + $background->setContainer($container); + run(fn () => $background->push(new BackgroundQueueAfterCommitUniqueJob)); + } + + public function testItAddsATransactionCallbackForInterfaceBasedAfterCommitUniqueJobs() + { + $background = new BackgroundQueue; + $background->setConnectionName('background'); + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + + $background->setContainer($container); + run(fn () => $background->push(new BackgroundQueueAfterCommitInterfaceUniqueJob)); + } + + public function testLaterSchedulesJobWithDelay() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(5.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(); + return 1; + }); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + unset($_SERVER['__background.later.test']); + + run(fn () => $background->later(5, BackgroundQueueLaterTestHandler::class, ['foo' => 'bar'])); + + $this->assertInstanceOf(SyncJob::class, $_SERVER['__background.later.test'][0]); + $this->assertEquals(['foo' => 'bar'], $_SERVER['__background.later.test'][1]); + } + + public function testLaterWithDateInterval() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(10.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(); + return 1; + }); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + unset($_SERVER['__background.later.test']); + + run(fn () => $background->later(new DateInterval('PT10S'), BackgroundQueueLaterTestHandler::class, ['baz' => 'qux'])); + + $this->assertInstanceOf(SyncJob::class, $_SERVER['__background.later.test'][0]); + $this->assertEquals(['baz' => 'qux'], $_SERVER['__background.later.test'][1]); + } + + public function testLaterWithDateTime() + { + Carbon::setTestNow('2024-01-01 12:00:00'); + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(15.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(); + return 1; + }); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + unset($_SERVER['__background.later.test']); + + run(fn () => $background->later(Carbon::parse('2024-01-01 12:00:15'), BackgroundQueueLaterTestHandler::class, ['test' => 'data'])); + + $this->assertInstanceOf(SyncJob::class, $_SERVER['__background.later.test'][0]); + $this->assertEquals(['test' => 'data'], $_SERVER['__background.later.test'][1]); + + Carbon::setTestNow(); + } + + public function testLaterAddsTransactionCallbackForAfterCommitJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $container->instance('db.transactions', $transactionManager); + $background->setContainer($container); + + run(fn () => $background->later(5, new BackgroundQueueAfterCommitJob)); + } + + public function testLaterAddsTransactionCallbackForInterfaceBasedAfterCommitJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $container->instance('db.transactions', $transactionManager); + $background->setContainer($container); + + run(fn () => $background->later(5, new BackgroundQueueAfterCommitInterfaceJob)); + } + + public function testLaterAddsTransactionCallbackForAfterCommitUniqueJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + $background->setContainer($container); + + run(fn () => $background->later(5, new BackgroundQueueAfterCommitUniqueJob)); + } + + public function testLaterAddsTransactionCallbackForInterfaceBasedAfterCommitUniqueJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + $background->setContainer($container); + + run(fn () => $background->later(5, new BackgroundQueueAfterCommitInterfaceUniqueJob)); + } + + public function testLaterClampsNegativeIntegerDelay() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(0.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + run(fn () => $background->later(-5, BackgroundQueueLaterTestHandler::class)); + } + + public function testLaterClampsPastDateTimeInterface() + { + Carbon::setTestNow('2024-01-01 12:00:00'); + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(0.0, m::type('Closure'))->andReturn(1); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + run(fn () => $background->later(Carbon::parse('2024-01-01 11:59:50'), BackgroundQueueLaterTestHandler::class)); + + Carbon::setTestNow(); + } + + public function testLaterFailedJobGetsHandledWhenAnExceptionIsThrown() + { + unset($_SERVER['__background.failed']); + + $result = null; + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(5.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(); + return 1; + }); + + $background = new BackgroundQueue(timer: $timer); + $background->setExceptionCallback(function ($exception) use (&$result) { + $result = $exception; + }); + $background->setConnectionName('background'); + $container = $this->getContainer(); + $events = m::mock(Dispatcher::class); + $events->shouldReceive('dispatch')->times(4); + $container->instance('events', $events); + $container->instance(Dispatcher::class, $events); + $background->setContainer($container); + + run(fn () => $background->later(5, FailingBackgroundQueueTestHandler::class, ['foo' => 'bar'])); + + $this->assertInstanceOf(Exception::class, $result); + $this->assertTrue($_SERVER['__background.failed']); + } + + public function testLaterDoesNotExecuteJobWhenWorkerIsClosing() + { + unset($_SERVER['__background.later.test']); + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(5.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(true); + return 1; + }); + + $background = new BackgroundQueue(timer: $timer); + $background->setConnectionName('background'); + $background->setContainer($this->getContainer()); + + run(fn () => $background->later(5, BackgroundQueueLaterTestHandler::class, ['foo' => 'bar'])); + + $this->assertArrayNotHasKey('__background.later.test', $_SERVER); + } + protected function getContainer(): Container { return new Container; @@ -154,3 +432,31 @@ public function handle() { } } + +class BackgroundQueueAfterCommitUniqueJob implements ShouldBeUnique +{ + use InteractsWithQueue; + + public $afterCommit = true; + + public function handle(): void + { + } +} + +class BackgroundQueueAfterCommitInterfaceUniqueJob implements ShouldBeUnique, ShouldQueueAfterCommit +{ + use InteractsWithQueue; + + public function handle(): void + { + } +} + +class BackgroundQueueLaterTestHandler +{ + public function fire(SyncJob $job, mixed $data): void + { + $_SERVER['__background.later.test'] = func_get_args(); + } +} diff --git a/tests/Queue/QueueDeferredQueueTest.php b/tests/Queue/QueueDeferredQueueTest.php index 8d7e874e2..1a672a94d 100644 --- a/tests/Queue/QueueDeferredQueueTest.php +++ b/tests/Queue/QueueDeferredQueueTest.php @@ -9,6 +9,7 @@ use Hypervel\Container\Container; use Hypervel\Contracts\Events\Dispatcher; use Hypervel\Contracts\Queue\QueueableEntity; +use Hypervel\Contracts\Queue\ShouldBeUnique; use Hypervel\Contracts\Queue\ShouldQueueAfterCommit; use Hypervel\Coordinator\Timer; use Hypervel\Database\DatabaseTransactionsManager; @@ -93,6 +94,34 @@ public function testItAddsATransactionCallbackForInterfaceBasedAfterCommitJobs() run(fn () => $deferred->push(new DeferredQueueAfterCommitInterfaceJob)); } + public function testItAddsATransactionCallbackForAfterCommitUniqueJobs() + { + $deferred = new DeferredQueue; + $deferred->setConnectionName('deferred'); + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + + $deferred->setContainer($container); + run(fn () => $deferred->push(new DeferredQueueAfterCommitUniqueJob)); + } + + public function testItAddsATransactionCallbackForInterfaceBasedAfterCommitUniqueJobs() + { + $deferred = new DeferredQueue; + $deferred->setConnectionName('deferred'); + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback')->once()->andReturn(null); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + + $deferred->setContainer($container); + run(fn () => $deferred->push(new DeferredQueueAfterCommitInterfaceUniqueJob)); + } + public function testLaterSchedulesJobWithDelay() { $timer = m::mock(Timer::class); @@ -131,12 +160,12 @@ public function testLaterWithDateInterval() $deferred->setConnectionName('deferred'); $deferred->setContainer($this->getContainer()); - unset($_SERVER['__deferred.later.interval.test']); + unset($_SERVER['__deferred.later.test']); - run(fn () => $deferred->later(new DateInterval('PT10S'), DeferredQueueLaterIntervalTestHandler::class, ['baz' => 'qux'])); + run(fn () => $deferred->later(new DateInterval('PT10S'), DeferredQueueLaterTestHandler::class, ['baz' => 'qux'])); - $this->assertInstanceOf(SyncJob::class, $_SERVER['__deferred.later.interval.test'][0]); - $this->assertEquals(['baz' => 'qux'], $_SERVER['__deferred.later.interval.test'][1]); + $this->assertInstanceOf(SyncJob::class, $_SERVER['__deferred.later.test'][0]); + $this->assertEquals(['baz' => 'qux'], $_SERVER['__deferred.later.test'][1]); } public function testLaterWithDateTime() @@ -156,16 +185,189 @@ public function testLaterWithDateTime() $deferred->setConnectionName('deferred'); $deferred->setContainer($this->getContainer()); - unset($_SERVER['__deferred.later.datetime.test']); + unset($_SERVER['__deferred.later.test']); + + run(fn () => $deferred->later(Carbon::parse('2024-01-01 12:00:15'), DeferredQueueLaterTestHandler::class, ['test' => 'data'])); + + $this->assertInstanceOf(SyncJob::class, $_SERVER['__deferred.later.test'][0]); + $this->assertEquals(['test' => 'data'], $_SERVER['__deferred.later.test'][1]); + + Carbon::setTestNow(); + } + + public function testLaterAddsTransactionCallbackForAfterCommitJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $container->instance('db.transactions', $transactionManager); + $deferred->setContainer($container); + + run(fn () => $deferred->later(5, new DeferredQueueAfterCommitJob)); + } + + public function testLaterAddsTransactionCallbackForInterfaceBasedAfterCommitJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $container->instance('db.transactions', $transactionManager); + $deferred->setContainer($container); + + run(fn () => $deferred->later(5, new DeferredQueueAfterCommitInterfaceJob)); + } + + public function testLaterAddsTransactionCallbackForAfterCommitUniqueJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + $deferred->setContainer($container); + + run(fn () => $deferred->later(5, new DeferredQueueAfterCommitUniqueJob)); + } + + public function testLaterAddsTransactionCallbackForInterfaceBasedAfterCommitUniqueJobs() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(5.0, m::type('Closure'))->andReturn(1); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + + $container = $this->getContainer(); + $transactionManager = m::mock(DatabaseTransactionsManager::class); + $transactionManager->shouldReceive('addCallback') + ->once() + ->andReturnUsing(function ($callback) { + $callback(); + return null; + }); + $transactionManager->shouldReceive('addCallbackForRollback')->once()->andReturn(null); + $container->instance('db.transactions', $transactionManager); + $deferred->setContainer($container); + + run(fn () => $deferred->later(5, new DeferredQueueAfterCommitInterfaceUniqueJob)); + } + + public function testLaterClampsNegativeIntegerDelay() + { + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(0.0, m::type('Closure'))->andReturn(1); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + $deferred->setContainer($this->getContainer()); + + run(fn () => $deferred->later(-5, DeferredQueueLaterTestHandler::class)); + } + + public function testLaterClampsPastDateTimeInterface() + { + Carbon::setTestNow('2024-01-01 12:00:00'); + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after')->once()->with(0.0, m::type('Closure'))->andReturn(1); - run(fn () => $deferred->later(Carbon::parse('2024-01-01 12:00:15'), DeferredQueueLaterDateTimeTestHandler::class, ['test' => 'data'])); + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + $deferred->setContainer($this->getContainer()); - $this->assertInstanceOf(SyncJob::class, $_SERVER['__deferred.later.datetime.test'][0]); - $this->assertEquals(['test' => 'data'], $_SERVER['__deferred.later.datetime.test'][1]); + run(fn () => $deferred->later(Carbon::parse('2024-01-01 11:59:50'), DeferredQueueLaterTestHandler::class)); Carbon::setTestNow(); } + public function testLaterFailedJobGetsHandledWhenAnExceptionIsThrown() + { + unset($_SERVER['__deferred.failed']); + + $result = null; + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(5.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(); + return 1; + }); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setExceptionCallback(function ($exception) use (&$result) { + $result = $exception; + }); + $deferred->setConnectionName('deferred'); + $container = $this->getContainer(); + $events = m::mock(Dispatcher::class); + $events->shouldReceive('dispatch')->times(4); + $container->instance('events', $events); + $container->instance(Dispatcher::class, $events); + $deferred->setContainer($container); + + run(fn () => $deferred->later(5, FailingDeferredQueueTestHandler::class, ['foo' => 'bar'])); + + $this->assertInstanceOf(Exception::class, $result); + $this->assertTrue($_SERVER['__deferred.failed']); + } + + public function testLaterDoesNotExecuteJobWhenWorkerIsClosing() + { + unset($_SERVER['__deferred.later.test']); + + $timer = m::mock(Timer::class); + $timer->shouldReceive('after') + ->once() + ->with(5.0, m::type('Closure')) + ->andReturnUsing(function ($delay, $callback) { + $callback(true); + return 1; + }); + + $deferred = new DeferredQueue(timer: $timer); + $deferred->setConnectionName('deferred'); + $deferred->setContainer($this->getContainer()); + + run(fn () => $deferred->later(5, DeferredQueueLaterTestHandler::class, ['foo' => 'bar'])); + + $this->assertArrayNotHasKey('__deferred.later.test', $_SERVER); + } + protected function getContainer(): Container { return new Container; @@ -231,26 +433,30 @@ public function handle() } } -class DeferredQueueLaterTestHandler +class DeferredQueueAfterCommitUniqueJob implements ShouldBeUnique { - public function fire(SyncJob $job, mixed $data): void + use InteractsWithQueue; + + public $afterCommit = true; + + public function handle(): void { - $_SERVER['__deferred.later.test'] = func_get_args(); } } -class DeferredQueueLaterIntervalTestHandler +class DeferredQueueAfterCommitInterfaceUniqueJob implements ShouldBeUnique, ShouldQueueAfterCommit { - public function fire(SyncJob $job, mixed $data): void + use InteractsWithQueue; + + public function handle(): void { - $_SERVER['__deferred.later.interval.test'] = func_get_args(); } } -class DeferredQueueLaterDateTimeTestHandler +class DeferredQueueLaterTestHandler { public function fire(SyncJob $job, mixed $data): void { - $_SERVER['__deferred.later.datetime.test'] = func_get_args(); + $_SERVER['__deferred.later.test'] = func_get_args(); } } diff --git a/tests/Queue/QueueDelayTest.php b/tests/Queue/QueueDelayTest.php index 03f6213a0..d3a28a09c 100644 --- a/tests/Queue/QueueDelayTest.php +++ b/tests/Queue/QueueDelayTest.php @@ -9,8 +9,8 @@ use Hypervel\Contracts\Bus\Dispatcher; use Hypervel\Contracts\Queue\ShouldQueue; use Hypervel\Foundation\Bus\PendingDispatch; +use Hypervel\Tests\TestCase; use Mockery as m; -use PHPUnit\Framework\TestCase; class QueueDelayTest extends TestCase { diff --git a/tests/Queue/QueueNullQueueTest.php b/tests/Queue/QueueNullQueueTest.php new file mode 100644 index 000000000..d4d76525c --- /dev/null +++ b/tests/Queue/QueueNullQueueTest.php @@ -0,0 +1,19 @@ +assertNull($queue->creationTimeOfOldestPendingJob()); + $this->assertNull($queue->creationTimeOfOldestPendingJob('custom')); + } +} diff --git a/tests/Queue/QueueSqsConnectorTest.php b/tests/Queue/QueueSqsConnectorTest.php new file mode 100644 index 000000000..13d1605d5 --- /dev/null +++ b/tests/Queue/QueueSqsConnectorTest.php @@ -0,0 +1,24 @@ +connect([ + 'queue' => 'default', + 'region' => 'us-east-1', + ]); + + $this->assertInstanceOf(SqsQueue::class, $queue); + } +} diff --git a/tests/Queue/QueueSyncQueueTest.php b/tests/Queue/QueueSyncQueueTest.php index f966a7a67..75c7b89c2 100644 --- a/tests/Queue/QueueSyncQueueTest.php +++ b/tests/Queue/QueueSyncQueueTest.php @@ -17,6 +17,7 @@ use Hypervel\Contracts\Queue\ShouldQueueAfterCommit; use Hypervel\Database\DatabaseTransactionsManager; use Hypervel\Events\Dispatcher as EventsDispatcher; +use Hypervel\Queue\CallQueuedHandler; use Hypervel\Queue\InteractsWithQueue; use Hypervel\Queue\Jobs\SyncJob; use Hypervel\Queue\SyncQueue; @@ -26,6 +27,15 @@ class QueueSyncQueueTest extends TestCase { + protected function setUp(): void + { + parent::setUp(); + + if (! class_exists('Illuminate\Queue\CallQueuedHandler', autoload: false)) { + class_alias(CallQueuedHandler::class, 'Illuminate\Queue\CallQueuedHandler'); + } + } + protected function tearDown(): void { SyncQueue::createPayloadUsing(null);