diff --git a/CHANGELOG.md b/CHANGELOG.md index c37960b..f278963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`Scope::allowZombies(): Scope`** — opt back into safe disposal on a scope (sets `DISPOSE_SAFELY`); returns `$this` for chaining. Inverse of the existing `asNotSafely()`. Use after `new Scope()` when the scope is expected to outlive coroutines parked in `delay()`/`recv()`/etc., turning them into zombies instead of cancelling them at dispose time. ### Fixed +- **#154 `ThreadPool` worker crash on `exit()`/`die()` in a task or bootloader** — a graceful `exit()`/`die()` inside a submitted task or the bootloader threw an unwind-exit token that the worker either passed to `reject()` or re-raised via `zend_bailout()` — both crash the worker fiber with `"Error transfer requires a throwable value"` (assert + ASAN stack-use-after-return at `zend_fibers.c:491`). Fixed in `thread_pool.c`: the sync task call and the bootloader call now run under a `zend_try`, and the worker checks `zend_is_unwind_exit()`/`zend_is_graceful_exit()` on `EG(exception)`. Behaviour: `exit()`/`die()` in a **task** is graceful "this task is done" — the worker's request survives it, so the task's future resolves to **`null`** and the worker keeps serving subsequent tasks (verified mixing `exit()` with throwing and normal tasks on a single worker). A real fatal error (e.g. OOM `zend_bailout`) or `exit()`/`die()` in the **bootloader** instead delivers an `Async\ThreadTransferException` to pending awaiters and tears the pool down — the worker can't safely continue. Regression tests `tests/thread_pool/061-bootloader_exit.phpt`, `062-task_exit_sync.phpt`, `064-task_exit_worker_survives.phpt`. +- **#154 `ThreadPool` swallowed the real error when a `$this`-bound bootloader could not load on the worker** — a bootloader bound to `$this` of a class not defined on the worker (or whose body threw) had its exception converted to an `E_WARNING` and discarded, while pending tasks were rejected with a generic `"task was cancelled before execution"` cancellation — hiding the cause. `thread_pool_drain_tasks` gained a `reject_with` parameter so the worker now propagates the actual error (e.g. `Cannot load transferred object: class "C" not found`, or the thrown exception) to every awaiter. Regression tests `tests/thread_pool/060-bootloader_this_transfer_error.phpt`, `063-bootloader_exception.phpt`. - **#146 Thread-pool task freed under a still-running worker (cross-thread UAF)** — `libuv_queue_task` took no reference on the `zend_async_task_t` it handed to `uv_queue_work`; the work wrapper held only a raw pointer. A coroutine awaiting the task that was **cancelled while the worker thread was still inside the task's `run()`** (e.g. blocked in a contended `flock()`) released its refs and freed the task — and its inline-tail data — out from under the detached worker. When the worker's syscall returned it wrote into freed memory (`heap-use-after-free in php_stdiop_flock_task_run`, `main/streams/plain_wrapper.c:1208`). Only reproducible under real multi-core scheduling (CI Linux x64 ASAN), not single-host WSL2. Fixed in `ext/async/libuv_reactor.c`: the in-flight work now owns a reference — `ZEND_ASYNC_EVENT_ADD_REF` after `uv_queue_work`, matched by `ZEND_ASYNC_EVENT_RELEASE` in `libuv_task_after_work_cb` (which libuv runs only after the worker returns). The task now outlives its worker regardless of coroutine cancellation — a general fix for every thread-pool task, not just `flock()`. This completes the earlier #146 work (inline-tail task data + pin-across-SUSPEND in php-src `plain_wrapper.c`), which was necessary but did not cover the cancel-vs-blocked-worker race. Regression backstop: the cancel-mid-flock scenarios in `fuzzy-tests/io/flock_chaos.feature`. - **#139 Late await() on coroutine that finished with an exception no longer double-throws** — when a coroutine body threw without an awaiter subscribed at that moment, `coroutine.c` rethrew the exception immediately into the parent frame *and* stored it on the handle. A subsequent `await($coro)` then delivered a second copy through future-replay, the catch ran, but the first copy surfaced as `Uncaught Fatal` once the catch unwound. Reworked to Future-style semantics: the exception is held on the coroutine and only surfaced (a) immediately when nobody can ever observe it — refcount ≤ 1 or `{main}` — or (b) at handle destruction via fire-and-forget safety net. Sticky observation tracked via the existing `EXC_CAUGHT` event flag, lifted from per-NOTIFY `EXCEPTION_HANDLED` after `CALLBACKS_NOTIFY` and set by `zend_async_resume_when` on every await subscription. Edge case verified: when a scope dies before its coroutine throws, child coroutines receive `AsyncCancellation` (handled by spec), so the refcount heuristic cannot misfire. Regression test `tests/coroutine/039-await_after_finished_with_exception.phpt`. - **#143 Async\iterate: heap-use-after-free with refcounted values/keys (e.g. generator yielding fresh strings)** — `iterate()`'s userland-callback path had two refcount bugs colliding in `ext/async/iterator.c`. (1) After `zval_ptr_dtor(&fci.params[0])` the slot was not reset to `IS_UNDEF`, so the post-loop cleanup at line 487 dtor'd the same slot a second time → UAF on the just-freed string. (2) `ZVAL_COPY_VALUE(&fci.params[1], &key)` aliased the key into params[1] without bumping refcount, but both `&key` (line 450) and `&fci.params[1]` (line 472) were dtor'd → double-free on string keys. Both bugs were silent for int values / interned-string keys (the existing 009-iterate_generator test used `'a'/'b'/'c' => 100/200/300`), so neither was caught until the chaos suite started yielding `FAST_CONCAT` strings. Surfaced as `heap-use-after-free in zend_gc_delref` from `zend_generator_free_storage` under ASAN. Fix: `ZVAL_UNDEF(&fci.params[0])` after dtor, and `ZVAL_COPY` (not `_VALUE`) for params[1] — symmetric with params[0]. Regression test `tests/iterate/015-iterate_generator_refcounted_values.phpt`. diff --git a/tests/thread_channel/038-this_bound_closure.phpt b/tests/thread_channel/038-this_bound_closure.phpt new file mode 100644 index 0000000..768df55 --- /dev/null +++ b/tests/thread_channel/038-this_bound_closure.phpt @@ -0,0 +1,44 @@ +--TEST-- +ThreadChannel: $this-bound closure transferred to a worker thread +--SKIPIF-- + +--FILE-- +n + 100; + }; + } +} + +$boot = function() { + eval('class C { public int $n = 21; }'); +}; + +spawn(function() use ($boot) { + $ch = new ThreadChannel(1); + + $t = spawn_thread(function() use ($ch) { + $cl = $ch->recv(); + echo "worker got: ", $cl(), "\n"; + }, bootloader: $boot); + + $c = new C(); + $ch->send($c->makeClosure()); + await($t); +}); +?> +--EXPECT-- +worker got: 121 diff --git a/tests/thread_channel/039-method_as_closure.phpt b/tests/thread_channel/039-method_as_closure.phpt new file mode 100644 index 0000000..7c3697a --- /dev/null +++ b/tests/thread_channel/039-method_as_closure.phpt @@ -0,0 +1,42 @@ +--TEST-- +ThreadChannel: first-class callable method ($obj->method(...)) transferred to a worker thread +--SKIPIF-- + +--FILE-- +n * 2; + } +} + +$boot = function() { + eval('class C { public int $n = 30; function work(): int { return $this->n * 2; } }'); +}; + +spawn(function() use ($boot) { + $ch = new ThreadChannel(1); + + $t = spawn_thread(function() use ($ch) { + $cl = $ch->recv(); + echo "worker got: ", $cl(), "\n"; + }, bootloader: $boot); + + $c = new C(); + $ch->send($c->work(...)); + await($t); +}); +?> +--EXPECT-- +worker got: 60 diff --git a/tests/thread_pool/042-submit_this_basic.phpt b/tests/thread_pool/042-submit_this_basic.phpt new file mode 100644 index 0000000..4af83ae --- /dev/null +++ b/tests/thread_pool/042-submit_this_basic.phpt @@ -0,0 +1,46 @@ +--TEST-- +ThreadPool: submit() $this-bound closure transfers $this as a deep copy +--SKIPIF-- + +--FILE-- +submit(function() { + $before = "n={$this->n} tag={$this->tag} class=" . get_class($this); + $this->n = 999; + return $before . " -> n={$this->n}"; + }); + return await($f); + } +} + +$boot = function() { + eval('class Runner { public int $n = 0; public string $tag = "hi"; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + $r = new Runner(); + $r->n = 42; + echo $r->go($pool), "\n"; + echo "parent n={$r->n}\n"; + $pool->close(); +}); +?> +--EXPECT-- +n=42 tag=hi class=Runner -> n=999 +parent n=42 diff --git a/tests/thread_pool/043-submit_this_private_protected.phpt b/tests/thread_pool/043-submit_this_private_protected.phpt new file mode 100644 index 0000000..3e57697 --- /dev/null +++ b/tests/thread_pool/043-submit_this_private_protected.phpt @@ -0,0 +1,38 @@ +--TEST-- +ThreadPool: submit() $this-bound closure can access protected/private properties +--SKIPIF-- + +--FILE-- +submit(function() { + return "secret={$this->secret} p={$this->p} => " . ($this->secret * 2); + }); + return await($f); + } +} + +$boot = function() { + eval('class C { private int $secret = 7; protected string $p = "prot"; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +secret=7 p=prot => 14 diff --git a/tests/thread_pool/044-submit_this_self_cycle.phpt b/tests/thread_pool/044-submit_this_self_cycle.phpt new file mode 100644 index 0000000..5ba889d --- /dev/null +++ b/tests/thread_pool/044-submit_this_self_cycle.phpt @@ -0,0 +1,39 @@ +--TEST-- +ThreadPool: submit() $this with a self-cycle ($this->self === $this) +--SKIPIF-- + +--FILE-- +self = $this; + $f = $pool->submit(function() { + return var_export($this->self === $this, true) . ":" . $this->n; + }); + return await($f); + } +} + +$boot = function() { + eval('class C { public $self; public int $n = 5; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +true:5 diff --git a/tests/thread_pool/045-submit_this_readonly.phpt b/tests/thread_pool/045-submit_this_readonly.phpt new file mode 100644 index 0000000..3cd06e7 --- /dev/null +++ b/tests/thread_pool/045-submit_this_readonly.phpt @@ -0,0 +1,37 @@ +--TEST-- +ThreadPool: submit() $this with a readonly property +--SKIPIF-- + +--FILE-- +submit(function() { + return $this->n; + }); + return await($f); + } +} + +$boot = function() { + eval('class C { public function __construct(public readonly int $n = 11) {} }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C(99))->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=99 diff --git a/tests/thread_pool/046-submit_this_enum_instance.phpt b/tests/thread_pool/046-submit_this_enum_instance.phpt new file mode 100644 index 0000000..a9abd5c --- /dev/null +++ b/tests/thread_pool/046-submit_this_enum_instance.phpt @@ -0,0 +1,38 @@ +--TEST-- +ThreadPool: submit() $this is a BackedEnum case; access through $this works +--SKIPIF-- + +--FILE-- +submit(function() { + return $this->value . ":" . $this->name; + }); + return await($f); + } +} + +$boot = function() { + eval('enum Suit: string { case H = "h"; case S = "s"; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", Suit::H->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=h:H diff --git a/tests/thread_pool/047-submit_this_enum_property.phpt b/tests/thread_pool/047-submit_this_enum_property.phpt new file mode 100644 index 0000000..3b31a3a --- /dev/null +++ b/tests/thread_pool/047-submit_this_enum_property.phpt @@ -0,0 +1,44 @@ +--TEST-- +ThreadPool: submit() $this has an enum-typed property; value preserved across transfer +--SKIPIF-- + +--FILE-- +submit(function() { + return $this->c->value; + }); + return await($f); + } +} + +$boot = function() { + eval('enum Color: string { case R = "r"; case G = "g"; } class C { public Color $c = Color::R; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + $c = new C(); + $c->c = Color::G; + echo "result=", $c->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=g diff --git a/tests/thread_pool/048-submit_this_weakref.phpt b/tests/thread_pool/048-submit_this_weakref.phpt new file mode 100644 index 0000000..3c505ed --- /dev/null +++ b/tests/thread_pool/048-submit_this_weakref.phpt @@ -0,0 +1,47 @@ +--TEST-- +ThreadPool: submit() $this with a WeakReference property; identity preserved when target reachable +--SKIPIF-- + +--FILE-- +submit(function() { + $t = $this->w->get(); + return $t === null ? "dead" : $t->v; + }); + return await($f); + } +} + +$boot = function() { + eval('class T { public int $v = 99; } class C { public \WeakReference $w; public T $strong; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + $c = new C(); + $t = new T(); + $c->strong = $t; + $c->w = \WeakReference::create($t); + echo "result=", $c->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=99 diff --git a/tests/thread_pool/049-submit_this_typed_props.phpt b/tests/thread_pool/049-submit_this_typed_props.phpt new file mode 100644 index 0000000..a44bf12 --- /dev/null +++ b/tests/thread_pool/049-submit_this_typed_props.phpt @@ -0,0 +1,41 @@ +--TEST-- +ThreadPool: submit() $this with strict-typed properties; types preserved in worker copy +--SKIPIF-- + +--FILE-- +submit(function() { + return gettype($this->f) . ":" . var_export($this->s, true); + }); + return await($f); + } +} + +$boot = function() { + eval('class C { public float $f = 1.5; public ?string $s = null; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + $c = new C(); + $c->f = 2.5; + $c->s = "set"; + echo "result=", $c->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=double:'set' diff --git a/tests/thread_pool/050-submit_this_method_dispatch.phpt b/tests/thread_pool/050-submit_this_method_dispatch.phpt new file mode 100644 index 0000000..fb39e00 --- /dev/null +++ b/tests/thread_pool/050-submit_this_method_dispatch.phpt @@ -0,0 +1,41 @@ +--TEST-- +ThreadPool: submit() $this->method() dispatch inside the transferred closure +--SKIPIF-- + +--FILE-- +n * 10; + } + + public function go(ThreadPool $pool): int { + $f = $pool->submit(function() { + return $this->helper(); + }); + return await($f); + } +} + +$boot = function() { + eval('class C { public int $n = 3; function helper(): int { return $this->n * 10; } }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=30 diff --git a/tests/thread_pool/051-submit_this_first_class_callable.phpt b/tests/thread_pool/051-submit_this_first_class_callable.phpt new file mode 100644 index 0000000..cdd0c82 --- /dev/null +++ b/tests/thread_pool/051-submit_this_first_class_callable.phpt @@ -0,0 +1,38 @@ +--TEST-- +ThreadPool: submit() first-class callable from an instance method ($this->method(...)) +--SKIPIF-- + +--FILE-- +n} => " . ($this->n + 100); + } + + public function go(ThreadPool $pool): string { + return await($pool->submit($this->work(...))); + } +} + +$boot = function() { + eval('class C { public int $n = 4; function work(): string { return "n={$this->n} => " . ($this->n + 100); } }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=n=4 => 104 diff --git a/tests/thread_pool/052-submit_this_first_class_external.phpt b/tests/thread_pool/052-submit_this_first_class_external.phpt new file mode 100644 index 0000000..129a9e1 --- /dev/null +++ b/tests/thread_pool/052-submit_this_first_class_external.phpt @@ -0,0 +1,35 @@ +--TEST-- +ThreadPool: submit() first-class callable from an external object ($obj->method(...)) +--SKIPIF-- + +--FILE-- +n + 1; + } +} + +$boot = function() { + eval('class C { public int $n = 6; function work(): int { return $this->n + 1; } }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + $c = new C(); + echo "result=", await($pool->submit($c->work(...))), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=7 diff --git a/tests/thread_pool/053-submit_this_from_callable.phpt b/tests/thread_pool/053-submit_this_from_callable.phpt new file mode 100644 index 0000000..a73af5e --- /dev/null +++ b/tests/thread_pool/053-submit_this_from_callable.phpt @@ -0,0 +1,39 @@ +--TEST-- +ThreadPool: submit() Closure::fromCallable([$this, 'method']) +--SKIPIF-- + +--FILE-- +n * 3; + } + + public function go(ThreadPool $pool): int { + $cl = Closure::fromCallable([$this, 'work']); + return await($pool->submit($cl)); + } +} + +$boot = function() { + eval('class C { public int $n = 8; function work(): int { return $this->n * 3; } }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=24 diff --git a/tests/thread_pool/054-submit_this_closure_bind.phpt b/tests/thread_pool/054-submit_this_closure_bind.phpt new file mode 100644 index 0000000..0da4808 --- /dev/null +++ b/tests/thread_pool/054-submit_this_closure_bind.phpt @@ -0,0 +1,34 @@ +--TEST-- +ThreadPool: submit() Closure::bind to an object before submit +--SKIPIF-- + +--FILE-- +n + 5; + }, $c, C::class); + echo "result=", await($pool->submit($cl)), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=17 diff --git a/tests/thread_pool/055-submit_this_arrow_fn.phpt b/tests/thread_pool/055-submit_this_arrow_fn.phpt new file mode 100644 index 0000000..c01bce9 --- /dev/null +++ b/tests/thread_pool/055-submit_this_arrow_fn.phpt @@ -0,0 +1,34 @@ +--TEST-- +ThreadPool: submit() arrow function fn() => $this->x +--SKIPIF-- + +--FILE-- +submit(fn() => $this->n + 7)); + } +} + +$boot = function() { + eval('class C { public int $n = 13; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=20 diff --git a/tests/thread_pool/056-submit_this_nested_closure.phpt b/tests/thread_pool/056-submit_this_nested_closure.phpt new file mode 100644 index 0000000..75b98f4 --- /dev/null +++ b/tests/thread_pool/056-submit_this_nested_closure.phpt @@ -0,0 +1,40 @@ +--TEST-- +ThreadPool: submit() nested closure created at runtime inherits $this from outer +--SKIPIF-- + +--FILE-- +n * 2; + }; + return $inner(); + }; + return await($pool->submit(Closure::bind($outer, $this, C::class))); + } +} + +$boot = function() { + eval('class C { public int $n = 14; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=28 diff --git a/tests/thread_pool/057-submit_static_first_class.phpt b/tests/thread_pool/057-submit_static_first_class.phpt new file mode 100644 index 0000000..4cbe53d --- /dev/null +++ b/tests/thread_pool/057-submit_static_first_class.phpt @@ -0,0 +1,32 @@ +--TEST-- +ThreadPool: submit() first-class callable from a static method (no $this) +--SKIPIF-- + +--FILE-- +submit(C::work(...))), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=42 diff --git a/tests/thread_pool/058-submit_this_class_missing.phpt b/tests/thread_pool/058-submit_this_class_missing.phpt new file mode 100644 index 0000000..46c45d7 --- /dev/null +++ b/tests/thread_pool/058-submit_this_class_missing.phpt @@ -0,0 +1,36 @@ +--TEST-- +ThreadPool: submit() $this with no bootloader → clear error, no SEGV +--SKIPIF-- + +--FILE-- +submit(function() { + return $this->n; + })); + } catch (\Throwable $e) { + return get_class($e) . ': ' . $e->getMessage(); + } + } +} + +spawn(function() { + $pool = new ThreadPool(2); + echo (new C)->go($pool), "\n"; + $pool->close(); +}); +?> +--EXPECTF-- +%Aclass "C" not found%A diff --git a/tests/thread_pool/059-submit_this_with_use_vars.phpt b/tests/thread_pool/059-submit_this_with_use_vars.phpt new file mode 100644 index 0000000..223b4b2 --- /dev/null +++ b/tests/thread_pool/059-submit_this_with_use_vars.phpt @@ -0,0 +1,37 @@ +--TEST-- +ThreadPool: submit() closure that is both $this-bound and captures use() vars +--SKIPIF-- + +--FILE-- +submit(function() use ($extra) { + return $this->n + $extra; + }); + return await($f); + } +} + +$boot = function() { + eval('class C { public int $n = 10; }'); +}; + +spawn(function() use ($boot) { + $pool = new ThreadPool(2, 0, $boot); + echo "result=", (new C)->go($pool, 5), "\n"; + $pool->close(); +}); +?> +--EXPECT-- +result=15 diff --git a/tests/thread_pool/060-bootloader_this_transfer_error.phpt b/tests/thread_pool/060-bootloader_this_transfer_error.phpt new file mode 100644 index 0000000..f0f0207 --- /dev/null +++ b/tests/thread_pool/060-bootloader_this_transfer_error.phpt @@ -0,0 +1,41 @@ +--TEST-- +ThreadPool: $this-bound bootloader that can't load on the worker rejects the task with the real error +--SKIPIF-- + +--FILE-- +boot(...)); + try { + await($pool->submit(fn() => 1)); + echo "no exception\n"; + } catch (\Throwable $e) { + echo get_class($e), ": ", $e->getMessage(), "\n"; + } + $pool->close(); +}); +?> +--EXPECTF-- +%ACannot load transferred object: class "C" not found%A diff --git a/tests/thread_pool/061-bootloader_exit.phpt b/tests/thread_pool/061-bootloader_exit.phpt new file mode 100644 index 0000000..e74dd28 --- /dev/null +++ b/tests/thread_pool/061-bootloader_exit.phpt @@ -0,0 +1,30 @@ +--TEST-- +ThreadPool: exit() in the bootloader terminates the worker cleanly and rejects tasks (no crash) +--SKIPIF-- + +--FILE-- +submit(fn() => 1)); + echo "no exception\n"; + } catch (\Async\ThreadTransferException $e) { + echo "caught ThreadTransferException\n"; + echo "message not empty: ", (strlen($e->getMessage()) > 0 ? "yes" : "no"), "\n"; + } + $pool->close(); +}); +?> +--EXPECT-- +caught ThreadTransferException +message not empty: yes diff --git a/tests/thread_pool/062-task_exit_sync.phpt b/tests/thread_pool/062-task_exit_sync.phpt new file mode 100644 index 0000000..54623ab --- /dev/null +++ b/tests/thread_pool/062-task_exit_sync.phpt @@ -0,0 +1,28 @@ +--TEST-- +ThreadPool: exit()/die() in a task resolves its future to null and the worker keeps serving (no crash) +--SKIPIF-- + +--FILE-- +submit(function() { exit(5); }); + $b = $pool->submit(function() { return 42; }); + var_dump(await($a)); + var_dump(await($b)); + $pool->close(); +}); +?> +--EXPECT-- +NULL +int(42) diff --git a/tests/thread_pool/063-bootloader_exception.phpt b/tests/thread_pool/063-bootloader_exception.phpt new file mode 100644 index 0000000..5d05906 --- /dev/null +++ b/tests/thread_pool/063-bootloader_exception.phpt @@ -0,0 +1,28 @@ +--TEST-- +ThreadPool: an exception thrown in the bootloader body propagates to awaiting tasks +--SKIPIF-- + +--FILE-- +submit(fn() => 1)); + echo "no exception\n"; + } catch (\Throwable $e) { + echo get_class($e), ": ", $e->getMessage(), "\n"; + } + $pool->close(); +}); +?> +--EXPECT-- +RuntimeException: boot failed! diff --git a/tests/thread_pool/064-task_exit_worker_survives.phpt b/tests/thread_pool/064-task_exit_worker_survives.phpt new file mode 100644 index 0000000..a464890 --- /dev/null +++ b/tests/thread_pool/064-task_exit_worker_survives.phpt @@ -0,0 +1,42 @@ +--TEST-- +ThreadPool: a worker survives exit()/throw mixed with normal tasks (single worker, ordered) +--SKIPIF-- + +--FILE-- +submit(function() { exit(1); }); + $t2 = $pool->submit(function() { return "normal-1"; }); + $t3 = $pool->submit(function() { throw new RuntimeException("boom"); }); + $t4 = $pool->submit(function() { exit(2); }); + $t5 = $pool->submit(function() { return 7 * 7; }); + + var_dump(await($t1)); + var_dump(await($t2)); + try { + await($t3); + } catch (\RuntimeException $e) { + echo "t3: ", $e->getMessage(), "\n"; + } + var_dump(await($t4)); + var_dump(await($t5)); + + $pool->close(); +}); +?> +--EXPECT-- +NULL +string(8) "normal-1" +t3: boom +NULL +int(49) diff --git a/thread_pool.c b/thread_pool.c index e984882..935acf7 100644 --- a/thread_pool.c +++ b/thread_pool.c @@ -42,7 +42,7 @@ static zend_object_handlers thread_pool_handlers; static void thread_pool_destroy(async_thread_pool_t *pool); static void thread_pool_close(async_thread_pool_t *pool); -static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject); +static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject, zend_object *reject_with); static bool thread_pool_spawn_task_coroutine( async_thread_pool_t *pool, zend_async_scope_t *pool_scope, zval *callable, @@ -79,6 +79,33 @@ static bool thread_pool_spawn_task_coroutine( static zend_function worker_root_function = { ZEND_INTERNAL_FUNCTION }; +/* Build a ThreadTransferException carrying the current bailout's message. + * Used when the worker observed a graceful exit()/die() (unwind-exit token) or a + * fatal-error bailout: the pool delivers this to awaiters instead of re-raising + * zend_bailout() or passing the token to reject() — either crashes the worker + * fiber, which can't transfer a non-throwable exit token. */ +static zend_object *thread_pool_bailout_exception(void) +{ + const zend_string *msg = PG(last_error_message); + return async_new_exception(async_ce_thread_transfer_exception, "%s", + msg != NULL ? ZSTR_VAL(msg) + : "ThreadPool worker terminated via exit() or a fatal error"); +} + +/* Build a clean ThreadTransferException carrying another exception's message. + * Used for errors thrown deep in the cross-thread transfer machinery (e.g. + * "Cannot load transferred object"): that Error's full object graph — its + * backtrace reaches into worker-local load state — can crash the awaiter when + * deep-copied to the parent thread, so we re-ship only its message text. The + * copy happens inside async_new_exception while `src` is still alive. */ +static zend_object *thread_pool_wrap_transfer_error(zend_object *src) +{ + zval rv; + const zval *msg = zend_read_property_ex(src->ce, src, ZSTR_KNOWN(ZEND_STR_MESSAGE), 1, &rv); + return async_new_exception(async_ce_thread_transfer_exception, "%s", + (msg != NULL && Z_TYPE_P(msg) == IS_STRING) ? Z_STRVAL_P(msg) : "thread transfer failed"); +} + /* event is always NULL for pool workers (started via ZEND_ASYNC_START_THREAD) */ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *ctx) { @@ -126,29 +153,61 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c async_thread_create_closure(&pool->bootloader_snapshot->entry, &boot_callable); if (UNEXPECTED(EG(exception))) { - zend_exception_error(EG(exception), E_WARNING); + /* Bootloader transfer failed (e.g. a $this-bound bootloader whose + * class isn't defined on the worker). Re-ship the error's message + * as a clean transfer exception (the raw Error's backtrace reaches + * into worker-local load state and crashes the awaiter if copied). */ + zend_object *boot_ex = thread_pool_wrap_transfer_error(EG(exception)); zend_clear_exception(); zval_ptr_dtor(&boot_callable); thread_pool_close(pool); - thread_pool_drain_tasks(pool, true); + thread_pool_drain_tasks(pool, true, boot_ex); + OBJ_RELEASE(boot_ex); goto done; } zend_fcall_info boot_fci; zend_fcall_info_cache boot_fcc; + volatile bool boot_bailed = false; if (zend_fcall_info_init(&boot_callable, 0, &boot_fci, &boot_fcc, NULL, NULL) == SUCCESS) { boot_fci.retval = &boot_retval; - zend_call_function(&boot_fci, &boot_fcc); + zend_try { + zend_call_function(&boot_fci, &boot_fcc); + } zend_catch { + boot_bailed = true; + } zend_end_try(); } zval_ptr_dtor(&boot_retval); zval_ptr_dtor(&boot_callable); + if (boot_bailed + || (EG(exception) != NULL + && (zend_is_unwind_exit(EG(exception)) + || zend_is_graceful_exit(EG(exception))))) { + /* Bootloader called exit()/die() (unwind-exit token) or hit a fatal + * error (bailout). Convert into a transfer exception for every + * pending task instead of leaking the token through reject or + * re-raising zend_bailout(), either of which crashes the worker fiber. */ + if (EG(exception) != NULL) { + zend_clear_exception(); + } + zend_object *boot_ex = thread_pool_bailout_exception(); + thread_pool_close(pool); + thread_pool_drain_tasks(pool, true, boot_ex); + OBJ_RELEASE(boot_ex); + goto done; + } + if (UNEXPECTED(EG(exception))) { - zend_exception_error(EG(exception), E_WARNING); + /* Bootloader body threw — propagate the real exception to every + * pending task's awaiter instead of a generic cancellation. */ + zend_object *boot_ex = EG(exception); + GC_ADDREF(boot_ex); zend_clear_exception(); thread_pool_close(pool); - thread_pool_drain_tasks(pool, true); + thread_pool_drain_tasks(pool, true, boot_ex); + OBJ_RELEASE(boot_ex); goto done; } } @@ -330,21 +389,49 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c goto task_cleanup; } + /* A real fatal error longjmps (zend_bailout); exit()/die() instead + * throws an unwind-exit token into EG(exception). Neither may be + * re-raised or passed to reject() — the token can't cross the fiber + * boundary and would crash the worker fiber. */ + volatile bool task_bailed = false; + zend_try { + zend_call_function(&fci, &fcc); + } zend_catch { + task_bailed = true; + } zend_end_try(); + /* Decrement running and bump completed BEFORE notifying the awaiter * via complete/reject — otherwise a coroutine waking from await() * would observe stale running_count and a missing completed bump. */ - if (zend_call_function(&fci, &fcc) == SUCCESS && !EG(exception)) { - zend_atomic_int_dec(&pool->base.running_count); - zend_atomic_int_inc(&pool->base.completed_count); - async_future_shared_state_complete(state, &retval); - } else if (EG(exception)) { - zend_atomic_int_dec(&pool->base.running_count); - zend_atomic_int_inc(&pool->base.completed_count); + zend_atomic_int_dec(&pool->base.running_count); + zend_atomic_int_inc(&pool->base.completed_count); + + const bool task_exited = (EG(exception) != NULL + && (zend_is_unwind_exit(EG(exception)) + || zend_is_graceful_exit(EG(exception)))); + + if (task_exited) { + /* exit()/die() is graceful "this task is done" — the worker's + * request survives it, so resolve the future with null (no return + * value) and keep serving the next task. Never pass the unwind + * token to reject(): it can't cross the fiber to the awaiter. */ + zend_clear_exception(); + zval null_result; + ZVAL_NULL(&null_result); + async_future_shared_state_complete(state, &null_result); + } else if (task_bailed) { + /* A real fatal (e.g. OOM) leaves the worker's request unusable — + * deliver a transfer exception and tear the pool down. */ + zend_object *bex = thread_pool_bailout_exception(); + async_future_shared_state_reject(state, bex); + thread_pool_close(pool); + thread_pool_drain_tasks(pool, true, bex); + OBJ_RELEASE(bex); + } else if (EG(exception) != NULL) { async_future_shared_state_reject(state, EG(exception)); zend_clear_exception(); } else { - zend_atomic_int_dec(&pool->base.running_count); - zend_atomic_int_inc(&pool->base.completed_count); + async_future_shared_state_complete(state, &retval); } task_cleanup: @@ -399,12 +486,19 @@ static void thread_pool_worker_handler(zend_async_thread_event_t *event, void *c /* Restore execute_data */ EG(current_execute_data) = fake_frame.prev_execute_data; - /* Release worker's ref on pool */ - ZEND_THREAD_POOL_DELREF(&pool->base); - if (bailout) { - zend_bailout(); + /* A bailout escaped the per-task/bootloader guards (e.g. during the + * scheduler drain). Re-raising zend_bailout() inside the worker fiber + * crashes it, so reject any still-pending tasks and exit cleanly. Done + * before DELREF so the pool is still alive for the drain. */ + zend_object *bex = thread_pool_bailout_exception(); + thread_pool_close(pool); + thread_pool_drain_tasks(pool, true, bex); + OBJ_RELEASE(bex); } + + /* Release worker's ref on pool */ + ZEND_THREAD_POOL_DELREF(&pool->base); } /////////////////////////////////////////////////////////// @@ -698,10 +792,13 @@ static void thread_pool_close_base(zend_async_thread_pool_t *base) /** * Drain remaining tasks from channel buffer. - * @param reject If true, reject each task's future with a cancellation exception. - * If false, just release the shared_state ref (for destroy path). + * @param reject If true, reject each task's future with an exception. + * If false, just release the shared_state ref (for destroy path). + * @param reject_with Exception to reject with (borrowed). If NULL, a generic + * "cancelled before execution" exception is synthesized per task. + * Used to propagate the real bootloader-transfer error. */ -static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject) +static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject, zend_object *reject_with) { async_thread_channel_t *ch = pool->task_channel; if (ch == NULL) { @@ -754,11 +851,15 @@ static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject) (zend_future_shared_state_t *)(uintptr_t) Z_LVAL_P(state_zv); if (reject) { - zend_object *exception = async_new_exception( - async_ce_cancellation_exception, - "ThreadPool task was cancelled before execution"); - async_future_shared_state_reject(state, exception); - OBJ_RELEASE(exception); + if (reject_with != NULL) { + async_future_shared_state_reject(state, reject_with); + } else { + zend_object *exception = async_new_exception( + async_ce_cancellation_exception, + "ThreadPool task was cancelled before execution"); + async_future_shared_state_reject(state, exception); + OBJ_RELEASE(exception); + } } async_future_shared_state_delref(state); @@ -775,7 +876,7 @@ static void thread_pool_drain_tasks(async_thread_pool_t *pool, bool reject) */ static void thread_pool_destroy(async_thread_pool_t *pool) { - thread_pool_drain_tasks(pool, false); + thread_pool_drain_tasks(pool, false, NULL); if (pool->task_channel != NULL) { pool->task_channel->channel.event.dispose(&pool->task_channel->channel.event); @@ -1169,7 +1270,7 @@ METHOD(cancel) thread_pool_close(pool); /* Reject queued (not-yet-picked-up) tasks. In-flight: sync runs to * completion (not preemptible), coroutine dies via scope cascade. */ - thread_pool_drain_tasks(pool, /*reject*/ true); + thread_pool_drain_tasks(pool, /*reject*/ true, NULL); } METHOD(isClosed)