diff --git a/src/CloudTasksApi.php b/src/CloudTasksApi.php index 0b961fa..d25d666 100644 --- a/src/CloudTasksApi.php +++ b/src/CloudTasksApi.php @@ -12,6 +12,8 @@ * @method static void deleteTask(string $taskName) * @method static Task getTask(string $taskName) * @method static bool exists(string $taskName) + * @method static void pause(string $queue) + * @method static void resume(string $queue) */ class CloudTasksApi extends Facade { diff --git a/src/CloudTasksApiConcrete.php b/src/CloudTasksApiConcrete.php index b62c61e..639f71b 100644 --- a/src/CloudTasksApiConcrete.php +++ b/src/CloudTasksApiConcrete.php @@ -9,6 +9,8 @@ use Google\Cloud\Tasks\V2\GetTaskRequest; use Google\Cloud\Tasks\V2\CreateTaskRequest; use Google\Cloud\Tasks\V2\DeleteTaskRequest; +use Google\Cloud\Tasks\V2\PauseQueueRequest; +use Google\Cloud\Tasks\V2\ResumeQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; class CloudTasksApiConcrete implements CloudTasksApiContract @@ -65,4 +67,14 @@ public function exists(string $taskName): bool return false; } + + public function pause(string $queue): void + { + $this->client->pauseQueue(PauseQueueRequest::build($queue)); + } + + public function resume(string $queue): void + { + $this->client->resumeQueue(ResumeQueueRequest::build($queue)); + } } diff --git a/src/CloudTasksApiContract.php b/src/CloudTasksApiContract.php index 5f0af35..fdaa562 100644 --- a/src/CloudTasksApiContract.php +++ b/src/CloudTasksApiContract.php @@ -15,4 +15,8 @@ public function deleteTask(string $taskName): void; public function getTask(string $taskName): Task; public function exists(string $taskName): bool; + + public function pause(string $queue): void; + + public function resume(string $queue): void; } diff --git a/src/CloudTasksApiFake.php b/src/CloudTasksApiFake.php index 773fcdc..05b5cea 100644 --- a/src/CloudTasksApiFake.php +++ b/src/CloudTasksApiFake.php @@ -23,6 +23,11 @@ class CloudTasksApiFake implements CloudTasksApiContract */ public array $deletedTasks = []; + /** + * @var array + */ + public array $pausedQueues = []; + public function createTask(string $queueName, Task $task): Task { $this->createdTasks[] = compact('queueName', 'task'); @@ -51,6 +56,16 @@ public function exists(string $taskName): bool return false; } + public function pause(string $queue): void + { + $this->pausedQueues[$queue] = true; + } + + public function resume(string $queue): void + { + unset($this->pausedQueues[$queue]); + } + public function assertTaskDeleted(string $taskName): void { Assert::assertTrue( @@ -85,4 +100,14 @@ public function assertCreatedTaskCount(int $count): void { Assert::assertCount($count, $this->createdTasks); } + + public function assertQueuePaused(string $queue): void + { + Assert::assertTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to be paused, but is not'); + } + + public function assertQueueNotPaused(string $queue): void + { + Assert::assertNotTrue($this->pausedQueues[$queue] ?? null, 'Expected queue ['.$queue.'] to not be paused, but it is'); + } } diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index b23780b..11e1078 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -467,4 +467,18 @@ private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName) return $envVars; } + + public function pause(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::pause($queue); + } + + public function resume(string $queue): void + { + $queueName = CloudTasksClient::queueName($this->config['project'], $this->config['location'], $queue); + + CloudTasksApi::resume($queue); + } } diff --git a/src/CloudTasksServiceProvider.php b/src/CloudTasksServiceProvider.php index 8e4833a..01e79d0 100644 --- a/src/CloudTasksServiceProvider.php +++ b/src/CloudTasksServiceProvider.php @@ -7,8 +7,11 @@ use Illuminate\Routing\Router; use Illuminate\Events\Dispatcher; use Illuminate\Queue\QueueManager; +use Illuminate\Support\Facades\Queue; use Illuminate\Foundation\Application; use Illuminate\Queue\Events\JobFailed; +use Illuminate\Queue\Events\QueuePaused; +use Illuminate\Queue\Events\QueueResumed; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Queue\Events\JobExceptionOccurred; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; @@ -113,6 +116,30 @@ private function registerEvents(): void return; } }); + + if (class_exists('Illuminate\Queue\Events\QueuePaused')) { + $events->listen(QueuePaused::class, function (QueuePaused $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->pause($event->queue); // @phpstan-ignore-line + }); + } + + if (class_exists('Illuminate\Queue\Events\QueueResumed')) { + $events->listen(QueueResumed::class, function (QueueResumed $event) { // @phpstan-ignore-line + $queue = Queue::connection($event->connection); // @phpstan-ignore-line + + if (! $queue instanceof CloudTasksQueue) { + return; + } + + $queue->resume($event->queue); // @phpstan-ignore-line + }); + } } private function registerCommands(): void diff --git a/tests/CloudTasksApiTest.php b/tests/CloudTasksApiTest.php index 5b42338..24a42bb 100644 --- a/tests/CloudTasksApiTest.php +++ b/tests/CloudTasksApiTest.php @@ -9,7 +9,9 @@ use Google\ApiCore\ApiException; use Google\Cloud\Tasks\V2\HttpMethod; use Google\Cloud\Tasks\V2\HttpRequest; +use Google\Cloud\Tasks\V2\Queue\State; use PHPUnit\Framework\Attributes\Test; +use Google\Cloud\Tasks\V2\GetQueueRequest; use Google\Cloud\Tasks\V2\Client\CloudTasksClient; use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi; @@ -138,4 +140,88 @@ public function test_delete_task() $this->expectExceptionMessage('NOT_FOUND'); CloudTasksApi::getTask($task->getName()); } + + #[Test] + public function it_can_pause_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::RUNNING); + + // Act + CloudTasksApi::pause($queueName); + + // Assert + $this->assertEquals(State::PAUSED, $this->waitForQueueState($queueName, State::PAUSED)); + } + + #[Test] + public function it_can_resume_queues(): void + { + $queueName = $this->client->queueName( + env('CI_CLOUD_TASKS_PROJECT_ID'), + env('CI_CLOUD_TASKS_LOCATION'), + env('CI_CLOUD_TASKS_QUEUE').'-pause' + ); + + $this->ensureQueueIs($queueName, State::PAUSED); + + // Act + CloudTasksApi::resume($queueName); + + // Assert + $this->assertEquals(State::RUNNING, $this->waitForQueueState($queueName, State::RUNNING)); + } + + private function getQueueState(string $queue): int + { + return $this->client->getQueue(GetQueueRequest::build($queue))->getState(); + } + + private function waitForQueueState(string $queue, int $waitForState): ?int + { + $state = null; + $attempts = 0; + + while ($state !== $waitForState) { + $state = $this->getQueueState($queue); + + if ($state === $waitForState) { + return $state; + } + + $attempts++; + + if ($attempts >= 10) { + break; + } + + sleep(1); + } + + return $state; + } + + private function ensureQueueIs(string $queue, int $desiredState): void + { + $currentState = $this->getQueueState($queue); + + if ($currentState === $desiredState) { + return; + } + + if ($currentState === State::RUNNING && $desiredState === State::PAUSED) { + CloudTasksApi::pause($queue); + } + + if ($currentState === State::PAUSED && $desiredState === State::RUNNING) { + CloudTasksApi::resume($queue); + } + + $this->assertEquals($desiredState, $this->waitForQueueState($queue, $desiredState)); + } } diff --git a/tests/PauseResumeQueueTest.php b/tests/PauseResumeQueueTest.php new file mode 100644 index 0000000..1c321fe --- /dev/null +++ b/tests/PauseResumeQueueTest.php @@ -0,0 +1,45 @@ +version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueuePaused('barbequeue'); + } + + #[Test] + public function queue_can_be_resumed(): void + { + // Arrange + if (app()->version() < 12) { + $this->markTestSkipped('This feature only exists in Laravel 12 and up.'); + } + + CloudTasksApi::fake(); + + Artisan::call('queue:pause my-cloudtasks-connection:barbequeue'); + Artisan::call('queue:continue my-cloudtasks-connection:barbequeue'); + + // Assert + CloudTasksApi::assertQueueNotPaused('barbequeue'); + } +}