Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 81 additions & 66 deletions src/lib/libpthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,74 +259,74 @@ var LibraryPThread = {
PThread.tlsInitFunctions.forEach((f) => f());
},
// Loads the WebAssembly module into the given Worker.
// onFinishedLoading: A callback function that will be called once all of
// the workers have been initialized and are
// ready to host pthreads.
loadWasmModuleToWorker: (worker) => new Promise((onFinishedLoading) => {
worker.onmessage = (e) => {
var d = e['data'];
var cmd = d.cmd;
#if PTHREADS_DEBUG
dbg(`main thread: received message '${cmd}' from worker. ${d}`);
#endif

// If this message is intended to a recipient that is not the main
// thread, forward it to the target thread.
if (d.targetThread && d.targetThread != _pthread_self()) {
var targetWorker = PThread.pthreads[d.targetThread];
if (targetWorker) {
targetWorker.postMessage(d, d.transferList);
} else {
err(`worker sent message (${cmd}) to pthread (${d.targetThread}) that no longer exists`);
// @returns: A promise the resolves once the worker has loaded the wasm module
// and is ready to run a pthread.
loadWasmModuleToWorker: (worker) => {
worker.loaded = new Promise((onFinishedLoading) => {
worker.onmessage = (e) => {
var d = e['data'];
var cmd = d.cmd;
#if PTHREADS_DEBUG
dbg(`main thread: received message '${cmd}' from worker. ${d}`);
#endif

// If this message is intended to a recipient that is not the main
// thread, forward it to the target thread.
if (d.targetThread && d.targetThread != _pthread_self()) {
var targetWorker = PThread.pthreads[d.targetThread];
if (targetWorker) {
targetWorker.postMessage(d, d.transferList);
} else {
err(`worker sent message (${cmd}) to pthread (${d.targetThread}) that no longer exists`);
}
return;
}
return;
}

if (cmd === 'checkMailbox') {
checkMailbox();
} else if (cmd === 'spawnThread') {
spawnThread(d);
} else if (cmd === 'cleanupThread') {
// cleanupThread needs to be run via callUserCallback since it calls
// back into user code to free thread data. Without this it's possible
// the unwind or ExitStatus exception could escape here.
callUserCallback(() => cleanupThread(d.thread));
#if MAIN_MODULE
} else if (cmd === 'markAsFinished') {
markAsFinished(d.thread);
#endif
} else if (cmd === 'loaded') {
worker.loaded = true;
#if ENVIRONMENT_MAY_BE_NODE && PTHREAD_POOL_SIZE
// Check that this worker doesn't have an associated pthread.
if (ENVIRONMENT_IS_NODE && !worker.pthread_ptr) {
// Once worker is loaded & idle, mark it as weakly referenced,
// so that mere existence of a Worker in the pool does not prevent
// Node.js from exiting the app.
worker.unref();
if (cmd === 'checkMailbox') {
checkMailbox();
} else if (cmd === 'spawnThread') {
spawnThread(d);
} else if (cmd === 'cleanupThread') {
// cleanupThread needs to be run via callUserCallback since it calls
// back into user code to free thread data. Without this it's possible
// the unwind or ExitStatus exception could escape here.
callUserCallback(() => cleanupThread(d.thread));
#if MAIN_MODULE
} else if (cmd === 'markAsFinished') {
markAsFinished(d.thread);
#endif
} else if (cmd === 'loaded') {
#if ENVIRONMENT_MAY_BE_NODE && PTHREAD_POOL_SIZE
// Check that this worker doesn't have an associated pthread.
if (ENVIRONMENT_IS_NODE && !worker.pthread_ptr) {
// Once worker is loaded & idle, mark it as weakly referenced,
// so that mere existence of a Worker in the pool does not prevent
// Node.js from exiting the app.
worker.unref();
}
#endif
onFinishedLoading();
} else if (d.target === 'setimmediate') {
// Worker wants to postMessage() to itself to implement setImmediate()
// emulation.
worker.postMessage(d);
#if ENVIRONMENT_MAY_BE_NODE
} else if (cmd === 'uncaughtException') {
// Message handler for Node.js specific out-of-order behavior:
// https://github.com/nodejs/node/issues/59617
// A pthread sent an uncaught exception event. Re-raise it on the main thread.
worker.onerror(d.error);
#endif
} else if (cmd === 'callHandler') {
Module[d.handler](...d.args);
} else if (cmd) {
// The received message looks like something that should be handled by this message
// handler, (since there is a e.data.cmd field present), but is not one of the
// recognized commands:
err(`worker sent an unknown command ${cmd}`);
}
#endif
onFinishedLoading(worker);
} else if (d.target === 'setimmediate') {
// Worker wants to postMessage() to itself to implement setImmediate()
// emulation.
worker.postMessage(d);
#if ENVIRONMENT_MAY_BE_NODE
} else if (cmd === 'uncaughtException') {
// Message handler for Node.js specific out-of-order behavior:
// https://github.com/nodejs/node/issues/59617
// A pthread sent an uncaught exception event. Re-raise it on the main thread.
worker.onerror(d.error);
#endif
} else if (cmd === 'callHandler') {
Module[d.handler](...d.args);
} else if (cmd) {
// The received message looks like something that should be handled by this message
// handler, (since there is a e.data.cmd field present), but is not one of the
// recognized commands:
err(`worker sent an unknown command ${cmd}`);
}
};
};
});

worker.onerror = (e) => {
var message = 'worker sent an error!';
Expand Down Expand Up @@ -423,7 +423,9 @@ var LibraryPThread = {
'workerID': worker.workerID,
#endif
});
}),

return worker.loaded;
},

#if PTHREAD_POOL_SIZE
async loadWasmModuleToAllWorkers() {
Expand Down Expand Up @@ -709,12 +711,20 @@ var LibraryPThread = {
// so that its existence does not prevent Node.js from exiting. This
// has no effect if the worker is already weakly referenced (e.g. if
// this worker was previously idle/unused).
#if ASYNCIFY
worker.loaded.then(() => worker.unref());
#else
worker.unref();
#endif
}
#endif
// Ask the worker to start executing its pthread entry point function.
worker.postMessage(msg, threadParams.transferList);
#if ASYNCIFY
return worker.loaded;
#else
return 0;
#endif
},

_emscripten_init_main_thread_js: (tb) => {
Expand Down Expand Up @@ -759,6 +769,11 @@ var LibraryPThread = {
// allocations from __pthread_create_js we could also remove this.
__pthread_create_js__noleakcheck: true,
#endif
// Pthread creation is async when possible. This allows us to return to the
// event loop and wait for the Worker to be created.
// This is needed in browsers where syncronous worker creation is still not
// possible: <BUG_LINK>
Comment on lines +774 to +775
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This is needed in browsers where syncronous worker creation is still not
// possible: <BUG_LINK>
// This is needed in browsers where synchronous worker creation is still not
// possible: <BUG_LINK>

Do you have a bug to link to?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hoping @juj might have a link to this?

__pthread_create_js__async: 'auto',
__pthread_create_js__deps: ['$spawnThread', '$pthreadCreateProxied',
'emscripten_has_threading_support',
#if OFFSCREENCANVAS_SUPPORT
Expand Down
2 changes: 1 addition & 1 deletion src/postamble.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var mainArgs = undefined;
var ret = entryFunction(argc, {{{ to64('argv') }}});
#endif // STANDALONE_WASM

#if ASYNCIFY == 2 && !PROXY_TO_PTHREAD
#if ASYNCIFY == 2
Comment thread
sbc100 marked this conversation as resolved.
// The current spec of JSPI returns a promise only if the function suspends
// and a plain value otherwise. This will likely change:
// https://github.com/WebAssembly/js-promise-integration/issues/11
Expand Down
8 changes: 4 additions & 4 deletions test/codesize/test_codesize_minimal_pthreads.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"a.out.js": 7367,
"a.out.js.gz": 3587,
"a.out.js": 7363,
"a.out.js.gz": 3582,
"a.out.nodebug.wasm": 19037,
"a.out.nodebug.wasm.gz": 8787,
"total": 26404,
"total_gz": 12374,
"total": 26400,
"total_gz": 12369,
"sent": [
"a (memory)",
"b (exit)",
Expand Down
8 changes: 4 additions & 4 deletions test/codesize/test_codesize_minimal_pthreads_memgrowth.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"a.out.js": 7776,
"a.out.js.gz": 3791,
"a.out.js": 7772,
"a.out.js.gz": 3785,
"a.out.nodebug.wasm": 19038,
"a.out.nodebug.wasm.gz": 8788,
"total": 26814,
"total_gz": 12579,
"total": 26810,
"total_gz": 12573,
"sent": [
"a (memory)",
"b (exit)",
Expand Down
41 changes: 41 additions & 0 deletions test/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,47 @@ def metafunc(self, mode, *args, **kwargs):
return metafunc


def with_asyncify_and_jspi(func):
assert callable(func)

@wraps(func)
def metafunc(self, jspi, *args, **kwargs):
if self.get_setting('WASM_ESM_INTEGRATION'):
self.skipTest('WASM_ESM_INTEGRATION is not compatible with ASYNCIFY')
if jspi:
self.set_setting('JSPI')
self.require_jspi()
else:
self.set_setting('ASYNCIFY')
return func(self, *args, **kwargs)

parameterize(metafunc, {'': (False,),
'jspi': (True,)})
return metafunc


def also_with_asyncify_and_jspi(func):
assert callable(func)

@wraps(func)
def metafunc(self, asyncify, *args, **kwargs):
if asyncify and self.get_setting('WASM_ESM_INTEGRATION'):
self.skipTest('WASM_ESM_INTEGRATION is not compatible with ASYNCIFY')
if asyncify == 2:
self.set_setting('JSPI')
self.require_jspi()
elif asyncify == 1:
self.set_setting('ASYNCIFY')
else:
assert asyncify == 0
return func(self, *args, **kwargs)

parameterize(metafunc, {'': (0,),
'asyncify': (1,),
'jspi': (2,)})
return metafunc


def parameterize(func, parameters):
"""Add additional parameterization to a test function.

Expand Down
6 changes: 3 additions & 3 deletions test/pthread/test_pthread_64bit_atomics.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ void RunTest(int test) {
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 4*1024);

emscripten_outf("Main thread has thread ID %p\n", pthread_self());
assert(pthread_self() != 0);

switch(test) {
case 2: memset(sharedData, 0xFF, sizeof(sharedData)); break;
case 5: memset(sharedData, 0x10, sizeof(sharedData)); break;
Expand Down Expand Up @@ -124,6 +121,9 @@ void RunTest(int test) {
}

int main() {
emscripten_outf("Main thread has thread ID %p\n", pthread_self());
assert(pthread_self() != 0);

globalDouble = 5.0;
globalU64 = 4;

Expand Down
2 changes: 2 additions & 0 deletions test/pthread/test_pthread_printf.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ void *ThreadMain(void *arg) {

int main() {
pthread_t thread;
printf("in main\n");
int rc = pthread_create(&thread, NULL, ThreadMain, 0);
printf("pthread_create done\n");
assert(rc == 0);

rc = pthread_join(thread, NULL);
Expand Down
11 changes: 10 additions & 1 deletion test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from decorators import (
also_with_asan,
also_with_asyncify_and_jspi,
also_with_fetch_streaming,
also_with_minimal_runtime,
also_with_pthreads,
Expand All @@ -65,6 +66,7 @@
skip_if,
skip_if_simple,
with_all_sjlj,
with_asyncify_and_jspi,
)

from tools import ports, shared, utils
Expand Down Expand Up @@ -3663,6 +3665,10 @@ def test_pthread_pool_size_strict(self):
expected='abort:Assertion failed: thrd_create(&t4, thread_main, NULL) == thrd_success',
cflags=['-g2', '-pthread', '-sPTHREAD_POOL_SIZE=3', '-sPTHREAD_POOL_SIZE_STRICT=2'])

@with_asyncify_and_jspi
def test_pthread_asyncify(self):
self.btest_exit('pthread/test_pthread_printf.c', cflags=['-pthread'])

def test_pthread_in_pthread_pool_size_strict(self):
# Check that it fails when there's a pthread creating another pthread.
self.btest_exit('pthread/test_pthread_create_pthread.c', cflags=['-g2', '-pthread', '-sPTHREAD_POOL_SIZE=2', '-sPTHREAD_POOL_SIZE_STRICT=2'])
Expand All @@ -3678,8 +3684,11 @@ def test_pthread_atomics(self, args):
self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-sPTHREAD_POOL_SIZE=8', '-g1'] + args)

# Test 64-bit atomics.
@also_with_asyncify_and_jspi
def test_pthread_64bit_atomics(self):
self.btest_exit('pthread/test_pthread_64bit_atomics.c', cflags=['-O3', '-pthread', '-sPTHREAD_POOL_SIZE=8'])
if not self.get_setting('JSPI') and not self.get_setting('ASYNCIFY'):
self.set_setting('PTHREAD_POOL_SIZE', 8)
self.btest_exit('pthread/test_pthread_64bit_atomics.c', cflags=['-O3', '-pthread'])

# Test 64-bit C++11 atomics.
@also_with_pthreads
Expand Down
Loading
Loading