diff --git a/app/Http/Controllers/Api/ActivityTaskController.php b/app/Http/Controllers/Api/ActivityTaskController.php index 861a2c0..8342c46 100644 --- a/app/Http/Controllers/Api/ActivityTaskController.php +++ b/app/Http/Controllers/Api/ActivityTaskController.php @@ -55,6 +55,7 @@ public function poll(Request $request): JsonResponse 'worker_id' => ['required', 'string'], 'task_queue' => ['required', 'string'], 'build_id' => ['nullable', 'string'], + 'poll_request_id' => ['nullable', 'string', 'max:255'], ]); $worker = $this->resolveRegisteredWorker( @@ -96,6 +97,7 @@ public function poll(Request $request): JsonResponse leaseOwner: $validated['worker_id'], buildId: $registeredBuildId, worker: $worker, + pollRequestId: $validated['poll_request_id'] ?? null, supportedActivityTypes: $supportedActivityTypes, ); } catch (\Throwable $exception) { diff --git a/app/Support/ActivityTaskPollRequestStore.php b/app/Support/ActivityTaskPollRequestStore.php new file mode 100644 index 0000000..8d75a45 --- /dev/null +++ b/app/Support/ActivityTaskPollRequestStore.php @@ -0,0 +1,254 @@ +store()->add( + $this->pendingKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId), + now()->toJSON(), + now()->addSeconds($this->pendingTtlSeconds()), + ); + } + + /** + * @return array{resolved: bool, task: array|null, poll_status: string|null} + */ + public function result( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): array { + $payload = $this->store()->get( + $this->resultKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId), + ); + + if (! is_array($payload) || ($payload['resolved'] ?? null) !== true) { + return [ + 'resolved' => false, + 'task' => null, + 'poll_status' => null, + ]; + } + + $task = $payload['task'] ?? null; + $pollStatus = $this->normalizePollStatus($payload['poll_status'] ?? null, $task); + + return [ + 'resolved' => true, + 'task' => is_array($task) ? $task : null, + 'poll_status' => $pollStatus, + ]; + } + + /** + * @return array{resolved: bool, task: array|null, poll_status: string|null} + */ + public function waitForResult( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ?int $timeoutMilliseconds = null, + ): array { + $pendingKey = $this->pendingKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + $deadline = microtime(true) + (($timeoutMilliseconds ?? $this->waitTimeoutMilliseconds()) / 1000); + + while (microtime(true) < $deadline) { + $result = $this->result($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + + if ($result['resolved']) { + return $result; + } + + if (! $this->store()->has($pendingKey)) { + return [ + 'resolved' => false, + 'task' => null, + 'poll_status' => null, + ]; + } + + $this->pause($this->pollIntervalMilliseconds()); + } + + return $this->result($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + } + + /** + * @param array|null $task + */ + public function rememberResult( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ?array $task, + string $pollStatus, + ): void { + $this->store()->put( + $this->resultKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId), + [ + 'resolved' => true, + 'task' => $task, + 'poll_status' => $this->normalizePollStatus($pollStatus, $task), + ], + now()->addSeconds($this->resultTtlSeconds($task)), + ); + + $this->forgetPending($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + } + + public function forgetResult( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): void { + $this->store()->forget( + $this->resultKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId), + ); + } + + public function forgetPending( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): void { + $this->store()->forget( + $this->pendingKey($namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId), + ); + } + + protected function pause(int $milliseconds): void + { + usleep(max(1, $milliseconds) * 1000); + } + + private function pendingTtlSeconds(): int + { + return max(5, (int) config('server.polling.timeout', 30) + 5); + } + + private function resultTtlSeconds(?array $task): int + { + $leaseExpiresAt = $this->leaseExpiresAt($task); + + if ($leaseExpiresAt instanceof Carbon && $leaseExpiresAt->gt(now())) { + return max(5, min(3600, now()->diffInSeconds($leaseExpiresAt) + 5)); + } + + return max(5, min(60, $this->pendingTtlSeconds())); + } + + private function waitTimeoutMilliseconds(): int + { + return max(250, ((int) config('server.polling.timeout', 30) * 1000) + 250); + } + + private function pollIntervalMilliseconds(): int + { + return max(10, min(50, (int) config('server.polling.signal_check_interval_ms', 100))); + } + + private function leaseExpiresAt(?array $task): ?Carbon + { + if (! is_array($task)) { + return null; + } + + $leaseExpiresAt = $task['lease_expires_at'] ?? null; + + if ($leaseExpiresAt instanceof \DateTimeInterface) { + return Carbon::instance($leaseExpiresAt); + } + + if (! is_string($leaseExpiresAt) || trim($leaseExpiresAt) === '') { + return null; + } + + try { + return Carbon::parse($leaseExpiresAt); + } catch (\Throwable) { + return null; + } + } + + /** + * @param array|null $task + */ + private function normalizePollStatus(mixed $pollStatus, ?array $task): string + { + if (is_string($pollStatus) && $pollStatus !== '') { + return $pollStatus; + } + + return is_array($task) ? 'leased' : 'empty'; + } + + private function pendingKey( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): string { + return $this->cacheKey('pending', $namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + } + + private function resultKey( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): string { + return $this->cacheKey('result', $namespace, $taskQueue, $buildId, $leaseOwner, $pollRequestId); + } + + private function cacheKey( + string $kind, + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): string { + return self::CACHE_PREFIX.sha1(json_encode([ + 'kind' => $kind, + 'namespace' => $namespace, + 'task_queue' => $taskQueue, + 'build_id' => $buildId, + 'lease_owner' => $leaseOwner, + 'poll_request_id' => $pollRequestId, + ], JSON_THROW_ON_ERROR)); + } + + private function store(): CacheRepository + { + return $this->cache->store(); + } +} \ No newline at end of file diff --git a/app/Support/ActivityTaskPoller.php b/app/Support/ActivityTaskPoller.php index 711049b..abc2699 100644 --- a/app/Support/ActivityTaskPoller.php +++ b/app/Support/ActivityTaskPoller.php @@ -6,6 +6,7 @@ use Illuminate\Support\Facades\DB; use InvalidArgumentException; use Workflow\V2\Contracts\ActivityTaskBridge as ActivityTaskBridgeContract; +use Workflow\V2\Enums\ActivityAttemptStatus; use Workflow\V2\Enums\TaskStatus; use Workflow\V2\Enums\TaskType; use Workflow\V2\Models\ActivityAttempt; @@ -21,6 +22,7 @@ final class ActivityTaskPoller public function __construct( private readonly LongPoller $longPoller, private readonly ActivityTaskBridgeContract $bridge, + private readonly ActivityTaskPollRequestStore $pollRequests, private readonly LongPollSignalStore $signals, private readonly TaskQueueAdmission $admission, private readonly WorkerSessionRegistry $workerSessions, @@ -33,6 +35,232 @@ public function __construct( * @return array{task: array|null, poll_status: string} */ public function poll( + string $namespace, + string $taskQueue, + string $leaseOwner, + ?string $buildId, + WorkerRegistration $worker, + ?string $pollRequestId = null, + array $supportedActivityTypes = [], + ): array { + $pollRequestId = $this->nonEmptyString($pollRequestId); + + if ($pollRequestId === null) { + return $this->performPoll( + namespace: $namespace, + taskQueue: $taskQueue, + leaseOwner: $leaseOwner, + buildId: $buildId, + worker: $worker, + supportedActivityTypes: $supportedActivityTypes, + ); + } + + return $this->coordinatedPoll( + namespace: $namespace, + taskQueue: $taskQueue, + leaseOwner: $leaseOwner, + buildId: $buildId, + worker: $worker, + pollRequestId: $pollRequestId, + supportedActivityTypes: $supportedActivityTypes, + ); + } + + /** + * @param list $supportedActivityTypes + * @return array{task: array|null, poll_status: string} + */ + private function coordinatedPoll( + string $namespace, + string $taskQueue, + string $leaseOwner, + ?string $buildId, + WorkerRegistration $worker, + string $pollRequestId, + array $supportedActivityTypes = [], + ): array { + for ($attempt = 0; $attempt < 3; $attempt++) { + $cached = $this->cachedPollResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + if ($cached['resolved']) { + return [ + 'task' => $cached['task'], + 'poll_status' => $cached['poll_status'] ?? $this->defaultPollStatus($cached['task']), + ]; + } + + if ($this->pollRequests->tryStart( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + )) { + return $this->runCoordinatedPollLeader( + namespace: $namespace, + taskQueue: $taskQueue, + leaseOwner: $leaseOwner, + buildId: $buildId, + worker: $worker, + pollRequestId: $pollRequestId, + supportedActivityTypes: $supportedActivityTypes, + ); + } + + $observed = $this->pollRequests->waitForResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + if ($observed['resolved']) { + return [ + 'task' => $observed['task'], + 'poll_status' => $observed['poll_status'] ?? $this->defaultPollStatus($observed['task']), + ]; + } + } + + $cached = $this->cachedPollResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + return [ + 'task' => $cached['task'], + 'poll_status' => $cached['poll_status'] ?? $this->defaultPollStatus($cached['task']), + ]; + } + + /** + * @return array{resolved: bool, task: array|null, poll_status: string|null} + */ + private function cachedPollResult( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + string $pollRequestId, + ): array { + $cached = $this->pollRequests->result( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + if (! $cached['resolved']) { + return $cached; + } + + if ($this->cachedTaskStillDeliverable( + namespace: $namespace, + taskQueue: $taskQueue, + buildId: $buildId, + leaseOwner: $leaseOwner, + task: $cached['task'], + )) { + $refreshedTask = $this->refreshCachedTaskPayload($cached['task']); + + if ($refreshedTask !== $cached['task']) { + $this->pollRequests->rememberResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + $refreshedTask, + $cached['poll_status'] ?? $this->defaultPollStatus($refreshedTask), + ); + } + + return [ + 'resolved' => true, + 'task' => $refreshedTask, + 'poll_status' => $cached['poll_status'] ?? $this->defaultPollStatus($refreshedTask), + ]; + } + + $this->pollRequests->forgetResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + return [ + 'resolved' => false, + 'task' => null, + 'poll_status' => null, + ]; + } + + /** + * @param list $supportedActivityTypes + * @return array{task: array|null, poll_status: string} + */ + private function runCoordinatedPollLeader( + string $namespace, + string $taskQueue, + string $leaseOwner, + ?string $buildId, + WorkerRegistration $worker, + string $pollRequestId, + array $supportedActivityTypes = [], + ): array { + try { + $task = $this->performPoll( + namespace: $namespace, + taskQueue: $taskQueue, + leaseOwner: $leaseOwner, + buildId: $buildId, + worker: $worker, + supportedActivityTypes: $supportedActivityTypes, + ); + } catch (\Throwable $exception) { + $this->pollRequests->forgetPending( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + ); + + throw $exception; + } + + $this->pollRequests->rememberResult( + $namespace, + $taskQueue, + $buildId, + $leaseOwner, + $pollRequestId, + $task['task'] ?? null, + $task['poll_status'] ?? $this->defaultPollStatus($task['task'] ?? null), + ); + + return $task; + } + + /** + * @param list $supportedActivityTypes + * @return array{task: array|null, poll_status: string} + */ + private function performPoll( string $namespace, string $taskQueue, string $leaseOwner, @@ -86,6 +314,116 @@ function () use ( ]; } + /** + * @param array|null $task + */ + private function cachedTaskStillDeliverable( + string $namespace, + string $taskQueue, + ?string $buildId, + string $leaseOwner, + ?array $task, + ): bool { + if ($task === null) { + return true; + } + + $taskId = $this->nonEmptyString($task['task_id'] ?? null); + $activityAttemptId = $this->nonEmptyString($task['activity_attempt_id'] ?? null); + + if ($taskId === null || $activityAttemptId === null) { + return false; + } + + $workflowTask = NamespaceWorkflowScope::task($namespace, $taskId); + + if (! $workflowTask instanceof WorkflowTask || $workflowTask->task_type !== TaskType::Activity) { + return false; + } + + if ($workflowTask->status !== TaskStatus::Leased) { + return false; + } + + if ($this->nonEmptyString($workflowTask->queue) !== $taskQueue) { + return false; + } + + if (! $this->matchesCompatibility($buildId, $workflowTask->compatibility)) { + return false; + } + + if ($this->nonEmptyString($workflowTask->lease_owner) !== $leaseOwner) { + return false; + } + + if ($workflowTask->lease_expires_at === null || $workflowTask->lease_expires_at->lte(now())) { + return false; + } + + $attempt = ActivityAttempt::query()->find($activityAttemptId); + + if (! $attempt instanceof ActivityAttempt) { + return false; + } + + if ($attempt->workflow_task_id !== $workflowTask->id) { + return false; + } + + if ($this->nonEmptyString($attempt->lease_owner) !== $leaseOwner) { + return false; + } + + if ($attempt->status !== ActivityAttemptStatus::Running) { + return false; + } + + if ($attempt->closed_at !== null) { + return false; + } + + if ($attempt->lease_expires_at === null || $attempt->lease_expires_at->lte(now())) { + return false; + } + + $attemptNumber = is_numeric($task['attempt_number'] ?? null) + ? (int) $task['attempt_number'] + : null; + + if ( + $attemptNumber !== null + && is_int($attempt->attempt_number) + && (int) $attempt->attempt_number !== $attemptNumber + ) { + return false; + } + + return true; + } + + /** + * @param array|null $task + * @return array|null + */ + private function refreshCachedTaskPayload(?array $task): ?array + { + if (! is_array($task)) { + return $task; + } + + $taskId = $this->nonEmptyString($task['task_id'] ?? null); + $leaseOwner = $this->nonEmptyString($task['lease_owner'] ?? null); + + if ($taskId === null || $leaseOwner === null) { + return $task; + } + + $refreshed = $this->rawActivityClaimPayload($taskId, $leaseOwner); + + return is_array($refreshed) ? $refreshed : $task; + } + /** * @param list $supportedActivityTypes * @return array{task: array|null, poll_status: string, next_probe_at: \DateTimeInterface|null} @@ -420,6 +758,15 @@ private function matchesActivityType(array $supported, mixed $activityType): boo return in_array(trim($activityType), $supported, true); } + private function matchesCompatibility(?string $buildId, mixed $compatibility): bool + { + if (! is_string($compatibility) || trim($compatibility) === '') { + return true; + } + + return $buildId !== null && $compatibility === $buildId; + } + private function applyWorkerCompatibility(string $namespace, ?string $buildId): void { config([ diff --git a/config/dw-bounded-growth.php b/config/dw-bounded-growth.php index 64e8eaf..c4efdfd 100644 --- a/config/dw-bounded-growth.php +++ b/config/dw-bounded-growth.php @@ -5,6 +5,7 @@ use App\Support\LongPollSignalStore; use App\Support\ProjectionDriftMetrics; use App\Support\ServerReadiness; +use App\Support\ActivityTaskPollRequestStore; use App\Support\TaskQueueAdmission; use App\Support\WorkerPollClaimGate; use App\Support\WorkflowQueryTaskBroker; @@ -63,6 +64,23 @@ 'eviction' => 'Pending keys are removed when a leader publishes a result; all pending and result keys also expire by TTL.', ], + 'activity_task_poll_requests' => [ + 'owner' => ActivityTaskPollRequestStore::class, + 'prefix' => 'server:activity-task-poll-request:', + 'dimensions' => [ + 'kind', + 'namespace', + 'task_queue', + 'build_id', + 'lease_owner', + 'poll_request_id', + ], + 'ttl' => 'Pending keys live max(server.polling.timeout + 5, 5) seconds. Empty result keys live at most 60 seconds; task result keys live through the active activity-attempt lease, capped at 3600 seconds.', + 'bound' => 'At most one pending key and one short replay-result key per idempotent activity worker poll request in the TTL window.', + 'admission' => 'Cache add elects a single activity poll leader for each idempotent request. Followers wait for the leader result and retry only while the pending marker exists.', + 'eviction' => 'Pending keys are removed when a leader publishes a result; all pending and result keys also expire by TTL.', + ], + 'sqlite_worker_poll_claim_gate' => [ 'owner' => WorkerPollClaimGate::class, 'prefix' => 'server:sqlite-worker-poll-claim:', diff --git a/tests/Feature/WorkflowWorkerProtocolTest.php b/tests/Feature/WorkflowWorkerProtocolTest.php index 9c8c90b..7cbb2cf 100644 --- a/tests/Feature/WorkflowWorkerProtocolTest.php +++ b/tests/Feature/WorkflowWorkerProtocolTest.php @@ -1020,6 +1020,88 @@ public function test_it_replays_cached_duplicate_poll_request_results_after_the_ } } + public function test_it_redelivers_the_same_activity_task_for_duplicate_poll_request_ids(): void + { + Queue::fake(); + + $this->configureWorkflowTypes(); + $this->createNamespace('default', 'Default namespace'); + + $start = $this->withHeaders($this->apiHeaders()) + ->postJson('/api/workflows', [ + 'workflow_id' => 'wf-duplicate-activity-poll-request', + 'workflow_type' => 'tests.external-greeting-workflow', + 'task_queue' => 'external-workflows', + 'input' => ['Ada'], + ]); + + $start->assertCreated(); + + $this->registerWorker('php-worker-activity-scheduler', 'external-workflows'); + $this->registerWorker( + 'php-worker-duplicate-activity-poll', + 'external-activities', + supportedActivityTypes: ['tests.external-greeting-activity'], + ); + + $workflowPoll = $this->withHeaders($this->workerHeaders()) + ->postJson('/api/worker/workflow-tasks/poll', [ + 'worker_id' => 'php-worker-activity-scheduler', + 'task_queue' => 'external-workflows', + ]); + + $workflowPoll->assertOk() + ->assertJsonPath('task.workflow_id', 'wf-duplicate-activity-poll-request') + ->assertJsonPath('task.workflow_task_attempt', 1); + + $workflowTaskId = (string) $workflowPoll->json('task.task_id'); + $workflowAttempt = (int) $workflowPoll->json('task.workflow_task_attempt'); + + $this->withHeaders($this->workerHeaders()) + ->postJson("/api/worker/workflow-tasks/{$workflowTaskId}/complete", [ + 'lease_owner' => 'php-worker-activity-scheduler', + 'workflow_task_attempt' => $workflowAttempt, + 'commands' => [ + [ + 'type' => 'schedule_activity', + 'activity_type' => 'tests.external-greeting-activity', + 'arguments' => Serializer::serializeWithCodec((string) config('workflows.serializer'), ['Ada']), + 'queue' => 'external-activities', + ], + ], + ]) + ->assertOk() + ->assertJsonPath('recorded', true) + ->assertJsonPath('run_status', 'waiting'); + + $firstPoll = $this->withHeaders($this->workerHeaders()) + ->postJson('/api/worker/activity-tasks/poll', [ + 'worker_id' => 'php-worker-duplicate-activity-poll', + 'task_queue' => 'external-activities', + 'poll_request_id' => 'activity-poll-request-1', + ]); + + $firstPoll->assertOk() + ->assertJsonPath('task.workflow_id', 'wf-duplicate-activity-poll-request') + ->assertJsonPath('task.activity_type', 'tests.external-greeting-activity') + ->assertJsonPath('task.lease_owner', 'php-worker-duplicate-activity-poll'); + + $activityTaskId = (string) $firstPoll->json('task.task_id'); + $activityAttemptId = (string) $firstPoll->json('task.activity_attempt_id'); + + $duplicatePoll = $this->withHeaders($this->workerHeaders()) + ->postJson('/api/worker/activity-tasks/poll', [ + 'worker_id' => 'php-worker-duplicate-activity-poll', + 'task_queue' => 'external-activities', + 'poll_request_id' => 'activity-poll-request-1', + ]); + + $duplicatePoll->assertOk() + ->assertJsonPath('task.task_id', $activityTaskId) + ->assertJsonPath('task.activity_attempt_id', $activityAttemptId) + ->assertJsonPath('task.lease_owner', 'php-worker-duplicate-activity-poll'); + } + public function test_it_does_not_replay_cached_duplicate_poll_results_after_the_task_is_completed(): void { Queue::fake();