Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/horizon/src/Connectors/RedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function connect(array $config): RedisQueue
Arr::get($config, 'connection', $this->connection),
Arr::get($config, 'retry_after', 60),
Arr::get($config, 'block_for', null),
Arr::get($config, 'after_commit', null)
Arr::get($config, 'after_commit', false)
);
}
}
55 changes: 47 additions & 8 deletions src/queue/src/BackgroundQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Hypervel\Queue;

use DateInterval;
use DateTimeInterface;
use Hypervel\Coordinator\Timer;
use Hypervel\Coroutine\Coroutine;
use Throwable;

Expand All @@ -17,23 +20,38 @@ class BackgroundQueue extends SyncQueue
protected $exceptionCallback;

/**
* Push a new job onto the queue.
* The timer used to schedule delayed jobs.
*/
public function push(object|string $job, mixed $data = '', ?string $queue = null): mixed
protected Timer $timer;

/**
* Create a new background queue instance.
*/
public function __construct(
bool $dispatchAfterCommit = false,
?Timer $timer = null
) {
parent::__construct($dispatchAfterCommit);
$this->timer = $timer ?? new Timer;
}

/**
* Push a new job onto the queue after (n) seconds.
*/
public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed
{
if (
$this->shouldDispatchAfterCommit($job)
if ($this->shouldDispatchAfterCommit($job)
&& $this->container->has('db.transactions')
) {
$this->addUniqueJobRollbackCallback($job);

return $this->container->make('db.transactions')
->addCallback(
fn () => $this->executeJob($job, $data, $queue)
fn () => $this->scheduleTimer($delay, $job, $data, $queue)
);
}

$this->executeJob($job, $data, $queue);

return null;
return $this->scheduleTimer($delay, $job, $data, $queue);
}

/**
Expand All @@ -46,6 +64,27 @@ public function setExceptionCallback(?callable $callback): static
return $this;
}

/**
* Schedule the timer that will execute the job after the delay.
*
* Skips execution when the worker is closing — pending delayed jobs are
* dropped rather than racing against shutdown cleanup. Devs needing
* durability across worker restarts should use a persistent queue.
*/
protected function scheduleTimer(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data, ?string $queue): int
{
return $this->timer->after(
max(0.0, (float) $this->secondsUntil($delay)),
function (bool $isClosing = false) use ($job, $data, $queue) {
if ($isClosing) {
return;
}

$this->executeJob($job, $data, $queue);
}
);
}

/**
* Execute a new job in the background queue.
*/
Expand Down
6 changes: 1 addition & 5 deletions src/queue/src/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ public function __construct(
protected string $default,
protected int $timeToRun,
protected int $blockFor = 0,
protected ?bool $dispatchAfterCommit = false
bool $dispatchAfterCommit = false
) {
$this->default = $default;
$this->blockFor = $blockFor;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}

Expand Down
2 changes: 1 addition & 1 deletion src/queue/src/Connectors/SqsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function connect(array $config): Queue
$config['queue'],
$config['prefix'] ?? '',
$config['suffix'] ?? '',
$config['after_commit'] ?? null
$config['after_commit'] ?? false
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/queue/src/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function __construct(
protected string $table,
protected string $default = 'default',
protected ?int $retryAfter = 60,
protected ?bool $dispatchAfterCommit = false
protected bool $dispatchAfterCommit = false
) {
}

Expand Down
64 changes: 40 additions & 24 deletions src/queue/src/DeferredQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use DateInterval;
use DateTimeInterface;
use Hypervel\Coordinator\Timer;
use Hypervel\Engine\Coroutine;
use Hypervel\Coroutine\Coroutine;
use Throwable;

class DeferredQueue extends SyncQueue
Expand All @@ -19,71 +19,87 @@ class DeferredQueue extends SyncQueue
*/
protected $exceptionCallback;

/**
* The timer used to schedule delayed jobs.
*/
protected Timer $timer;

/**
* Create a new deferred queue instance.
*/
public function __construct(
protected ?bool $dispatchAfterCommit = false,
protected ?Timer $timer = null
bool $dispatchAfterCommit = false,
?Timer $timer = null
) {
if (! $this->timer) {
$this->timer = new Timer;
}
parent::__construct($dispatchAfterCommit);
$this->timer = $timer ?? new Timer;
}

/**
* Push a new job onto the queue.
* Push a new job onto the queue after (n) seconds.
*/
public function push(object|string $job, mixed $data = '', ?string $queue = null): mixed
public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed
{
if ($this->shouldDispatchAfterCommit($job)
&& $this->container->has('db.transactions')
) {
$this->addUniqueJobRollbackCallback($job);

return $this->container->make('db.transactions')
->addCallback(
fn () => $this->deferJob($job, $data, $queue)
fn () => $this->scheduleTimer($delay, $job, $data, $queue)
);
}

$this->deferJob($job, $data, $queue);

return null;
return $this->scheduleTimer($delay, $job, $data, $queue);
}

/**
* Push a new job onto the queue after (n) seconds.
* Set the exception callback for the deferred queue.
*/
public function later(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data = '', ?string $queue = null): mixed
public function setExceptionCallback(?callable $callback): static
{
return $this->timer->after(
(float) $this->secondsUntil($delay),
fn () => $this->deferJob($job, $data, $queue)
);
$this->exceptionCallback = $callback;

return $this;
}

/**
* Set the exception callback for the deferred queue.
* Schedule the timer that will execute the job after the delay.
*
* Skips execution when the worker is closing — pending delayed jobs are
* dropped rather than racing against shutdown cleanup. Devs needing
* durability across worker restarts should use a persistent queue.
*/
public function setExceptionCallback(?callable $callback): static
protected function scheduleTimer(DateInterval|DateTimeInterface|int $delay, object|string $job, mixed $data, ?string $queue): int
{
$this->exceptionCallback = $callback;
return $this->timer->after(
max(0.0, (float) $this->secondsUntil($delay)),
function (bool $isClosing = false) use ($job, $data, $queue) {
if ($isClosing) {
return;
}

return $this;
$this->executeJob($job, $data, $queue);
}
);
}

/**
* Defer a new job onto the deferred queue.
*/
protected function deferJob(object|string $job, mixed $data = '', ?string $queue = null): void
protected function executeJob(object|string $job, mixed $data = '', ?string $queue = null): int
{
Coroutine::defer(function () use ($job, $data, $queue) {
try {
$this->executeJob($job, $data, $queue);
parent::executeJob($job, $data, $queue);
} catch (Throwable $e) {
if ($this->exceptionCallback) {
($this->exceptionCallback)($e);
}
}
});

return 0;
}
}
2 changes: 1 addition & 1 deletion src/queue/src/NullQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function reservedSize(?string $queue = null): int
*/
public function creationTimeOfOldestPendingJob(?string $queue = null): ?int
{
return 0;
return null;
}

/**
Expand Down
28 changes: 19 additions & 9 deletions src/queue/src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ abstract class Queue
/**
* Indicates that jobs should be dispatched after all database transactions have committed.
*/
protected ?bool $dispatchAfterCommit = false;
protected bool $dispatchAfterCommit = false;

/**
* The create payload callbacks.
Expand Down Expand Up @@ -314,13 +314,7 @@ protected function enqueueUsing(object|string $job, ?string $payload, ?string $q
if ($this->shouldDispatchAfterCommit($job)
&& $this->container->has('db.transactions')
) {
if ($job instanceof ShouldBeUnique) {
$this->container->make('db.transactions')->addCallbackForRollback(
function () use ($job) {
(new UniqueLock($this->container->make(Cache::class)))->release($job);
}
);
}
$this->addUniqueJobRollbackCallback($job);

return $this->container->make('db.transactions')
->addCallback(
Expand Down Expand Up @@ -356,7 +350,23 @@ protected function shouldDispatchAfterCommit(object|string $job): bool
return $job->afterCommit;
}

return $this->dispatchAfterCommit ?? false;
return $this->dispatchAfterCommit;
}

/**
* Register a transaction rollback callback that releases the unique lock for the given job.
*/
protected function addUniqueJobRollbackCallback(object|string $job): void
{
if (! $job instanceof ShouldBeUnique) {
return;
}

$this->container->make('db.transactions')->addCallbackForRollback(
function () use ($job) {
(new UniqueLock($this->container->make(Cache::class)))->release($job);
}
);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/queue/src/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function __construct(
protected ?string $connection = null,
protected ?int $retryAfter = 60,
protected ?int $blockFor = null,
protected ?bool $dispatchAfterCommit = false,
protected bool $dispatchAfterCommit = false,
protected int $migrationBatchSize = -1
) {
}
Expand Down
6 changes: 1 addition & 5 deletions src/queue/src/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@ public function __construct(
protected string $default,
protected string $prefix = '',
protected string $suffix = '',
protected ?bool $dispatchAfterCommit = false
bool $dispatchAfterCommit = false
) {
$this->sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
$this->suffix = $suffix;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}

Expand Down
13 changes: 2 additions & 11 deletions src/queue/src/SyncQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

use DateInterval;
use DateTimeInterface;
use Hypervel\Bus\UniqueLock;
use Hypervel\Contracts\Cache\Repository as Cache;
use Hypervel\Contracts\Queue\Job as JobContract;
use Hypervel\Contracts\Queue\Queue as QueueContract;
use Hypervel\Contracts\Queue\ShouldBeUnique;
use Hypervel\Queue\Events\JobAttempted;
use Hypervel\Queue\Events\JobExceptionOccurred;
use Hypervel\Queue\Events\JobProcessed;
Expand All @@ -24,7 +21,7 @@ class SyncQueue extends Queue implements QueueContract
* Create a new sync queue instance.
*/
public function __construct(
protected ?bool $dispatchAfterCommit = false
protected bool $dispatchAfterCommit = false
) {
}

Expand Down Expand Up @@ -78,13 +75,7 @@ public function push(object|string $job, mixed $data = '', ?string $queue = null
if ($this->shouldDispatchAfterCommit($job)
&& $this->container->has('db.transactions')
) {
if ($job instanceof ShouldBeUnique) {
$this->container->make('db.transactions')->addCallbackForRollback(
function () use ($job) {
(new UniqueLock($this->container->make(Cache::class)))->release($job);
}
);
}
$this->addUniqueJobRollbackCallback($job);

return $this->container->make('db.transactions')
->addCallback(
Expand Down
26 changes: 26 additions & 0 deletions tests/Horizon/Unit/RedisConnectorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Hypervel\Tests\Horizon\Unit;

use Hypervel\Contracts\Redis\Factory as Redis;
use Hypervel\Horizon\Connectors\RedisConnector;
use Hypervel\Horizon\RedisQueue;
use Hypervel\Tests\Horizon\UnitTestCase;
use Mockery as m;

class RedisConnectorTest extends UnitTestCase
{
public function testConnectSucceedsWithoutAfterCommitConfig()
{
$redis = m::mock(Redis::class);
$connector = new RedisConnector($redis);

$queue = $connector->connect([
'queue' => 'default',
]);

$this->assertInstanceOf(RedisQueue::class, $queue);
}
}
Loading