Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Actions/BaseAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ public function execute(WorkflowContext $context): ActionResult
]);

throw $e;
} catch (\Exception $e) {
// Log and return failure result for general exceptions
// The Executor will convert this to a StepExecutionException with Step context
$this->logger->error('Action failed with unexpected exception', [
} catch (\Throwable $e) {
// Log and return failure result for general throwables (exceptions and errors).
// The Executor will convert this to a StepExecutionException with Step context.
$this->logger->error('Action failed with unexpected throwable', [
'action' => static::class,
'workflow_id' => $context->getWorkflowId(),
'step_id' => $context->getStepId(),
Expand Down
1 change: 0 additions & 1 deletion src/Actions/ConditionAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SolutionForest\WorkflowEngine\Actions;

use SolutionForest\WorkflowEngine\Attributes\Condition;
use SolutionForest\WorkflowEngine\Attributes\WorkflowStep;
use SolutionForest\WorkflowEngine\Core\ActionResult;
use SolutionForest\WorkflowEngine\Core\WorkflowContext;
Expand Down
28 changes: 23 additions & 5 deletions src/Actions/HttpAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ protected function doExecute(WorkflowContext $context): ActionResult
$method = strtoupper($this->getConfig('method', 'GET'));
$data = $this->getConfig('data', []);
$headers = $this->getConfig('headers', []);
$timeout = $this->getConfig('timeout', 30);
$timeout = (int) $this->getConfig('timeout', 30);
$connectTimeout = (int) $this->getConfig('connect_timeout', min(10, $timeout));
$verifyTls = (bool) $this->getConfig('verify_tls', true);
$maxRedirects = (int) $this->getConfig('max_redirects', 3);

if (! $url) {
return ActionResult::failure('URL is required for HTTP action');
Expand All @@ -59,7 +62,22 @@ protected function doExecute(WorkflowContext $context): ActionResult
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_TIMEOUT, $timeout);
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $connectTimeout);
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, $maxRedirects > 0);
curl_setopt($ch, CURLOPT_MAXREDIRS, $maxRedirects);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, $verifyTls);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, $verifyTls ? 2 : 0);
// Restrict redirects to HTTP/HTTPS so a Location header cannot
// hand the request off to file://, gopher://, etc. The constants
// were renamed in cURL 7.85 — prefer the new one when present.
if (defined('CURLOPT_PROTOCOLS_STR')) {
curl_setopt($ch, CURLOPT_PROTOCOLS_STR, 'http,https');
curl_setopt($ch, CURLOPT_REDIR_PROTOCOLS_STR, 'http,https');
} elseif (defined('CURLPROTO_HTTP') && defined('CURLPROTO_HTTPS')) {
$allowed = CURLPROTO_HTTP | CURLPROTO_HTTPS;
curl_setopt($ch, CURLOPT_PROTOCOLS, $allowed);
curl_setopt($ch, CURLOPT_REDIR_PROTOCOLS, $allowed);
}

// Set HTTP method and body
if ($method !== 'GET') {
Expand Down Expand Up @@ -99,9 +117,9 @@ protected function doExecute(WorkflowContext $context): ActionResult
$error = curl_error($ch);
curl_close($ch);

if ($error) {
if ($responseBody === false || $error) {
return ActionResult::failure(
"HTTP request failed: {$error}",
'HTTP request failed: '.($error ?: 'unknown cURL error'),
[
'error' => $error,
'url' => $url,
Expand Down Expand Up @@ -137,7 +155,7 @@ protected function doExecute(WorkflowContext $context): ActionResult
]
);

} catch (\Exception $e) {
} catch (\Throwable $e) {
return ActionResult::failure(
"HTTP request exception: {$e->getMessage()}",
[
Expand Down
183 changes: 123 additions & 60 deletions src/Core/Executor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace SolutionForest\WorkflowEngine\Core;

use Exception;
use SolutionForest\WorkflowEngine\Contracts\EventDispatcher;
use SolutionForest\WorkflowEngine\Contracts\Logger;
use SolutionForest\WorkflowEngine\Contracts\WorkflowAction;
Expand Down Expand Up @@ -121,7 +120,7 @@ public function execute(WorkflowInstance $instance): void
{
try {
$this->processWorkflow($instance);
} catch (Exception $e) {
} catch (\Throwable $e) {
$this->logger->error('Workflow execution failed', [
'workflow_id' => $instance->getId(),
'workflow_name' => $instance->getDefinition()->getName(),
Expand All @@ -134,16 +133,18 @@ public function execute(WorkflowInstance $instance): void
$this->stateManager->setError($instance, $e->getMessage());
$this->eventDispatcher->dispatch(new WorkflowFailedEvent($instance, $e));

// Re-throw the original exception to maintain the error context
// Re-throw the original throwable to maintain the error context
throw $e;
}
}

/**
* Process workflow execution by managing state transitions and step execution.
* Process workflow execution by iterating over runnable steps until none remain.
*
* This private method handles the core workflow processing logic, including
* state management, step scheduling, and completion detection.
* Each iteration asks the instance for its next runnable steps, executes them,
* and loops again. The loop is bounded by the total number of steps in the
* definition (x2 to allow for conditional skips) to defend against
* pathological definitions that would otherwise loop forever.
*
* @param WorkflowInstance $instance The workflow instance to process
*
Expand All @@ -158,36 +159,60 @@ private function processWorkflow(WorkflowInstance $instance): void
$this->stateManager->save($instance);
}

// Get next steps to execute
$nextSteps = $instance->getNextSteps();
$totalSteps = count($instance->getDefinition()->getSteps());
// Upper bound on iterations: each step can be visited at most once as a
// runnable step and once as a skip. The +1 guards the degenerate zero-step
// workflow from tripping the safety check immediately.
$maxIterations = max(1, $totalSteps * 2 + 1);
$iterations = 0;

while (true) {
if (++$iterations > $maxIterations) {
throw new \RuntimeException(
"Workflow '{$instance->getId()}' exceeded maximum execution iterations ({$maxIterations}); ".
'this usually indicates a cycle in the transition graph.'
);
}

if (empty($nextSteps)) {
// Workflow completed successfully
$instance->setState(WorkflowState::COMPLETED);
$this->stateManager->save($instance);
$this->eventDispatcher->dispatch(new WorkflowCompletedEvent($instance));
$nextSteps = $instance->getNextSteps();

$this->logger->info('Workflow completed successfully', [
'workflow_id' => $instance->getId(),
'workflow_name' => $instance->getDefinition()->getName(),
'completed_steps' => count($instance->getCompletedSteps()),
'execution_time' => abs($instance->getUpdatedAt()->getTimestamp() - $instance->getCreatedAt()->getTimestamp()).'s',
]);
if (empty($nextSteps)) {
// Workflow completed successfully
$instance->setState(WorkflowState::COMPLETED);
$this->stateManager->save($instance);
$this->eventDispatcher->dispatch(new WorkflowCompletedEvent($instance));

return;
}
$this->logger->info('Workflow completed successfully', [
'workflow_id' => $instance->getId(),
'workflow_name' => $instance->getDefinition()->getName(),
'completed_steps' => count($instance->getCompletedSteps()),
'execution_time' => abs($instance->getUpdatedAt()->getTimestamp() - $instance->getCreatedAt()->getTimestamp()).'s',
]);

// Execute each next step
foreach ($nextSteps as $step) {
if ($instance->isStepCompleted($step->getId())) {
continue; // Skip already completed steps
return;
}

if (! $instance->canExecuteStep($step->getId())) {
continue; // Skip steps that can't be executed yet
$progressed = false;

foreach ($nextSteps as $step) {
if ($instance->isStepCompleted($step->getId())) {
continue; // Skip already completed steps
}

if (! $instance->canExecuteStep($step->getId())) {
continue; // Skip steps that can't be executed yet
}

$this->executeStep($instance, $step);
$progressed = true;
}

$this->executeStep($instance, $step);
// If no steps made progress this iteration, the workflow is stuck
// (e.g. all next steps were blocked on unmet prerequisites). Exit
// the loop and let the next resume() reattempt.
if (! $progressed) {
return;
}
}
}

Expand All @@ -206,6 +231,22 @@ private function processWorkflow(WorkflowInstance $instance): void
*/
private function executeStep(WorkflowInstance $instance, Step $step): void
{
// Evaluate step conditions. Steps whose conditions don't match the current
// workflow data are skipped (marked completed without running the action)
// so that downstream transitions continue to flow.
if (! $step->canExecute($instance->getData())) {
$this->logger->info('Skipping workflow step; conditions not met', [
'workflow_id' => $instance->getId(),
'step_id' => $step->getId(),
'conditions' => $step->getConditions(),
]);

$instance->setCurrentStepId($step->getId());
$this->stateManager->markStepCompleted($instance, $step->getId());

return;
}

$this->logger->info('Executing workflow step', [
'workflow_id' => $instance->getId(),
'workflow_name' => $instance->getDefinition()->getName(),
Expand All @@ -230,11 +271,7 @@ private function executeStep(WorkflowInstance $instance, Step $step): void
'workflow_id' => $instance->getId(),
'step_id' => $step->getId(),
]);

// Continue execution recursively
$this->processWorkflow($instance);

} catch (Exception $e) {
} catch (\Throwable $e) {
$context = new WorkflowContext(
workflowId: $instance->getId(),
stepId: $step->getId(),
Expand All @@ -243,13 +280,11 @@ private function executeStep(WorkflowInstance $instance, Step $step): void
instance: $instance
);

// Create detailed step execution exception
$stepException = match (true) {
$e instanceof ActionNotFoundException => $e,
str_contains($e->getMessage(), 'does not exist') => ActionNotFoundException::classNotFound($step->getActionClass(), $step, $context),
str_contains($e->getMessage(), 'must implement') => ActionNotFoundException::invalidInterface($step->getActionClass(), $step, $context),
default => StepExecutionException::fromException($e, $step, $context)
};
// Wrap non-typed throwables in a StepExecutionException while preserving
// ActionNotFoundException (and other domain exceptions) as-is.
$stepException = $e instanceof ActionNotFoundException
? $e
: StepExecutionException::fromException($e, $step, $context);

$this->logger->error('Workflow step execution failed', [
'workflow_id' => $instance->getId(),
Expand Down Expand Up @@ -278,6 +313,13 @@ private function executeStep(WorkflowInstance $instance, Step $step): void
* @throws ActionNotFoundException If the action class doesn't exist
* @throws StepExecutionException If all retry attempts are exhausted
*/
/**
* Maximum backoff sleep between retries, in microseconds. Caps the
* exponential growth so a misconfigured step cannot block a worker for
* minutes.
*/
private const MAX_BACKOFF_MICROSECONDS = 2_000_000; // 2 seconds

private function executeActionWithRetry(WorkflowInstance $instance, Step $step): void
{
$maxAttempts = $step->getRetryAttempts() + 1; // +1 for initial attempt
Expand All @@ -289,16 +331,12 @@ private function executeActionWithRetry(WorkflowInstance $instance, Step $step):
return;
}

$lastException = null;

for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
try {
$this->executeAction($instance, $step);

return; // Success — exit retry loop
} catch (\Exception $e) {
$lastException = $e;

} catch (\Throwable $e) {
if ($attempt === $maxAttempts) {
$this->logger->error('Step failed after all retry attempts', [
'workflow_id' => $instance->getId(),
Expand All @@ -311,11 +349,14 @@ private function executeActionWithRetry(WorkflowInstance $instance, Step $step):
throw $e; // Final attempt failed — propagate
}

$backoffMicroseconds = $this->calculateBackoff($attempt);

$this->logger->warning('Step failed, retrying', [
'workflow_id' => $instance->getId(),
'step_id' => $step->getId(),
'attempt' => $attempt,
'max_attempts' => $maxAttempts,
'backoff_ms' => (int) ($backoffMicroseconds / 1000),
'error' => $e->getMessage(),
]);

Expand All @@ -327,51 +368,73 @@ private function executeActionWithRetry(WorkflowInstance $instance, Step $step):
$e
));

// Exponential backoff: 100ms, 200ms, 400ms... (keep short for a library)
$backoffMicroseconds = (int) (100000 * pow(2, $attempt - 1));
usleep($backoffMicroseconds);
}
}
}

/**
* Calculate exponential backoff delay between retry attempts.
*
* Doubles each attempt starting at 100ms, capped at MAX_BACKOFF_MICROSECONDS
* to prevent runaway worker blocking.
*
* @param int $attempt 1-based attempt number
* @return int Delay in microseconds
*/
private function calculateBackoff(int $attempt): int
{
$base = 100_000; // 100ms
$delay = (int) ($base * (2 ** ($attempt - 1)));

return min($delay, self::MAX_BACKOFF_MICROSECONDS);
}

/**
* Execute a callback with a timeout constraint.
*
* Uses pcntl_alarm when available, otherwise logs a warning and executes without timeout.
* Uses pcntl_alarm when the pcntl extension is loaded. pcntl is generally
* only available under the CLI SAPI, so for web/FPM contexts this method
* logs a warning and runs the callback unbounded — long-running workflow
* steps should be dispatched via a queue worker instead.
*
* @param callable $callback The callback to execute
* @param int $timeoutSeconds Maximum execution time in seconds
* @return mixed The callback's return value
*
* @throws StepExecutionException If the timeout is exceeded
* @throws \RuntimeException If the timeout is exceeded while running under pcntl
*/
private function executeWithTimeout(callable $callback, int $timeoutSeconds): mixed
{
if (! function_exists('pcntl_alarm') || ! function_exists('pcntl_signal')) {
if (! function_exists('pcntl_alarm') || ! function_exists('pcntl_signal') || ! function_exists('pcntl_async_signals')) {
$this->logger->warning('pcntl extension not available, timeout not enforced', [
'timeout_seconds' => $timeoutSeconds,
'hint' => 'Execute workflows via CLI or queue workers to enforce step timeouts.',
]);

return $callback();
}

pcntl_signal(SIGALRM, function () use ($timeoutSeconds) {
// Ensure the signal handler runs at the VM tick rather than waiting for
// an explicit pcntl_signal_dispatch() call. Without this, SIGALRM can
// be delivered unpredictably or not at all.
$previousAsync = pcntl_async_signals(true);
$previousHandler = pcntl_signal_get_handler(SIGALRM);

pcntl_signal(SIGALRM, function () use ($timeoutSeconds): never {
throw new \RuntimeException("Step execution timed out after {$timeoutSeconds} seconds");
});

pcntl_alarm($timeoutSeconds);

try {
$result = $callback();
pcntl_alarm(0);

return $result;
} catch (\Exception $e) {
pcntl_alarm(0);

throw $e;
return $callback();
} finally {
pcntl_signal(SIGALRM, SIG_DFL);
// Always clear the alarm and restore the previous signal handler,
// even if the callback threw or the alarm fired.
pcntl_alarm(0);
pcntl_signal(SIGALRM, $previousHandler ?: SIG_DFL);
pcntl_async_signals($previousAsync);
}
}

Expand Down
Loading