Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/lib/libpipefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ addToLibrary({
// able to read from the read end after write end is closed.
refcnt : 2,
timestamp: new Date(),
#if PTHREADS
readableHandlers: [],
registerReadableHandler: (callback) => {
callback.registerCleanupFunc(() => {
const i = pipe.readableHandlers.indexOf(callback);
if (i !== -1) pipe.readableHandlers.splice(i, 1);
});
pipe.readableHandlers.push(callback);
},
notifyReadableHandlers: () => {
while (pipe.readableHandlers.length > 0) {
const cb = pipe.readableHandlers.shift();
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
}
pipe.readableHandlers = [];
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all the new code here be behind if PTHREADS ? i.e. in single threaded builds non of this is possible, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved them to behind if PTHREADS.

#endif
};

pipe.buckets.push({
Expand Down Expand Up @@ -80,7 +97,7 @@ addToLibrary({
blocks: 0,
};
},
poll(stream) {
poll(stream, timeout, notifyCallback) {
var pipe = stream.node.pipe;

if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
Expand All @@ -92,6 +109,9 @@ addToLibrary({
}
}

#if PTHREADS
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
#endif
return 0;
},
dup(stream) {
Expand Down Expand Up @@ -204,6 +224,9 @@ addToLibrary({
if (freeBytesInCurrBuffer >= dataLen) {
currBucket.buffer.set(data, currBucket.offset);
currBucket.offset += dataLen;
#if PTHREADS
pipe.notifyReadableHandlers();
#endif
return dataLen;
} else if (freeBytesInCurrBuffer > 0) {
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
Expand Down Expand Up @@ -235,6 +258,9 @@ addToLibrary({
newBucket.buffer.set(data);
}

#if PTHREADS
pipe.notifyReadableHandlers();
#endif
return dataLen;
},
close(stream) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/libsigs.js
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ sigs = {
_mmap_js__sig: 'ipiiijpp',
_msync_js__sig: 'ippiiij',
_munmap_js__sig: 'ippiiij',
_newselect_js__sig: 'ippipppj',
_setitimer_js__sig: 'iid',
_timegm_js__sig: 'jp',
_tzset_js__sig: 'vpppp',
Expand Down
188 changes: 145 additions & 43 deletions src/lib/libsyscall.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,63 @@ var SyscallsLibrary = {
},
},

$parseSelectFDSet__internal: true,
$parseSelectFDSet: (readfds, writefds, exceptfds) => {
var total = 0;

var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);

var dstReadLow = 0,
dstReadHigh = 0;
var dstWriteLow = 0,
dstWriteHigh = 0;
var dstExceptLow = 0,
dstExceptHigh = 0;

var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);

return {
allLow: srcReadLow | srcWriteLow | srcExceptLow,
allHigh: srcReadHigh | srcWriteHigh | srcExceptHigh,
getTotal: () => total,
setFlags: (fd, flags) => {
var mask = 1 << (fd % 32);

if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
total++;
}
},
commit: () => {
if (readfds) {
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
}
if (writefds) {
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
}
if (exceptfds) {
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};
}
}
};
},

$syscallGetVarargI__internal: true,
$syscallGetVarargI: () => {
#if ASSERTIONS
Expand Down Expand Up @@ -543,37 +600,88 @@ var SyscallsLibrary = {
return 0;
},
__syscall__newselect__i53abi: true,
__syscall__newselect__proxy: 'none',
__syscall__newselect__deps: ['_newselect_js',
#if PTHREADS
'_emscripten_proxy_newselect',
#endif
],
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
#if PTHREADS
if (ENVIRONMENT_IS_PTHREAD) {
return __emscripten_proxy_newselect(nfds,
{{{ to64('readfds') }}},
{{{ to64('writefds') }}},
{{{ to64('exceptfds') }}},
{{{ splitI64('timeoutInMillis') }}});
}
#endif
return __newselect_js({{{ to64('0') }}},
{{{ to64('0') }}},
nfds,
{{{ to64('readfds') }}},
{{{ to64('writefds') }}},
{{{ to64('exceptfds') }}},
{{{ splitI64('timeoutInMillis') }}});
},
_newselect_js__i53abi: true,
_newselect_js__proxy: 'none',
_newselect_js__deps: ['$parseSelectFDSet',
#if PTHREADS
'_emscripten_proxy_newselect_finish',
#endif
],
_newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
// readfds are supported,
// writefds checks socket open status
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
// and so the exceptfds list will always return empty.
// timeout is supported, although on SOCKFS and PIPEFS these are ignored and always treated as 0 - fully async
// timeout is supported, although on SOCKFS these are ignored and always treated as 0 - fully async
// and PIPEFS supports timeout only when the select is called from a worker.
#if ASSERTIONS
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we asssert that we are not running on a thread here? i.e. doNewselect must be called on the main thread only right?

assert(nfds <= 64, 'nfds must be less than or equal to 64'); // fd sets have 64 bits // TODO: this could be 1024 based on current musl headers
#if PTHREADS
assert(!ENVIRONMENT_IS_PTHREAD, '_newselect_js must be called in the main thread');
#endif
#endif

var total = 0;

var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);

var dstReadLow = 0,
dstReadHigh = 0;
var dstWriteLow = 0,
dstWriteHigh = 0;
var dstExceptLow = 0,
dstExceptHigh = 0;
var fdSet = parseSelectFDSet(readfds, writefds, exceptfds);

var allLow = srcReadLow | srcWriteLow | srcExceptLow;
var allHigh = srcReadHigh | srcWriteHigh | srcExceptHigh;
var allLow = fdSet.allLow;
var allHigh = fdSet.allHigh;

var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);

#if PTHREADS
var makeNotifyCallback = null;
if (ctx) {
// Enable event handlers only when the select call is proxied from a worker.
var cleanupFuncs = [];
var notifyDone = false;
makeNotifyCallback = (fd) => {
var cb = (flags) => {
if (notifyDone) {
return;
}
if (fd >= 0) {
fdSet.setFlags(fd, flags);
}
notifyDone = true;
cleanupFuncs.forEach(cb => cb());
fdSet.commit();
__emscripten_proxy_newselect_finish({{{ to64('ctx') }}}, {{{ to64('arg') }}}, fdSet.getTotal());
}
cb.registerCleanupFunc = (f) => {
if (f != null) cleanupFuncs.push(f);
}
return cb;
}
if (timeoutInMillis > 0) {
setTimeout(() => makeNotifyCallback(-1)(0), timeoutInMillis);
}
}
#endif

for (var fd = 0; fd < nfds; fd++) {
var mask = 1 << (fd % 32);
if (!(check(fd, allLow, allHigh, mask))) {
Expand All @@ -585,41 +693,35 @@ var SyscallsLibrary = {
var flags = SYSCALLS.DEFAULT_POLLMASK;

if (stream.stream_ops.poll) {
flags = stream.stream_ops.poll(stream, timeoutInMillis);
flags = (() => {
#if PTHREADS
if (makeNotifyCallback != null) {
return stream.stream_ops.poll(stream, timeoutInMillis, timeoutInMillis != 0 ? makeNotifyCallback(fd) : null);
}
#endif
return stream.stream_ops.poll(stream, timeoutInMillis);
})();
} else {
#if ASSERTIONS
if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis)
#endif
}

if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
total++;
}
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
total++;
}
fdSet.setFlags(fd, flags);
}

if (readfds) {
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
}
if (writefds) {
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
}
if (exceptfds) {
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};
#if PTHREADS
if (makeNotifyCallback != null) {
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
makeNotifyCallback(-1)(0);
}
return 0;
}
#endif

fdSet.commit();

return total;
return fdSet.getTotal();
},
_msync_js__i53abi: true,
_msync_js: (addr, len, prot, flags, fd, offset) => {
Expand Down
2 changes: 2 additions & 0 deletions system/lib/libc/emscripten_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ EmscriptenDeviceOrientationEvent* _emscripten_get_last_deviceorientation_event()
EmscriptenDeviceMotionEvent* _emscripten_get_last_devicemotion_event();
EmscriptenMouseEvent* _emscripten_get_last_mouse_event();

int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout);

#ifdef __cplusplus
}
#endif
43 changes: 43 additions & 0 deletions system/lib/libc/proxying_select.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2025 The Emscripten Authors. All rights reserved.
* Emscripten is available under two separate licenses, the MIT license and the
* University of Illinois/NCSA Open Source License. Both these licenses can be
* found in the LICENSE file.
*/

#include <assert.h>
#include <emscripten/proxying.h>
#include <emscripten/threading.h>

#include "emscripten_internal.h"

typedef struct proxied_select_t {
int n;
void *rfds;
void *wfds;
void *efds;
int64_t timeout;
int result;
} proxied_select_t;

static void call_newselect(em_proxying_ctx* ctx, void* arg) {
proxied_select_t* t = arg;
_newselect_js(ctx, arg, t->n, t->rfds, t->wfds, t->efds, t->timeout);
}

void _emscripten_proxy_newselect_finish(em_proxying_ctx* ctx, void* arg, int ret) {
proxied_select_t* t = arg;
t->result = ret;
emscripten_proxy_finish(ctx);
}

int _emscripten_proxy_newselect(int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
em_proxying_queue* q = emscripten_proxy_get_system_queue();
pthread_t target = emscripten_main_runtime_thread_id();
proxied_select_t t = {.n = n, .rfds = rfds, .wfds = wfds, .efds = efds, .timeout = timeout};
if (!emscripten_proxy_sync_with_ctx(q, target, call_newselect, &t)) {
assert(false && "emscripten_proxy_sync failed");
return -1;
}
return t.result;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it seems like the whole point of this custom proxying is to allow the main thread to delay the calling of _emscripten_proxy_newselect_finish until later? i.e. after doNewselect returns.

I'm happy to see this change land but I wonder if we could do this without that need for custom proxying? I.e. I wonder if we could return a promise from doNewselect and have the automatic proxying system just work in that case? Allowing all async proxyied functions to work.

4 changes: 4 additions & 0 deletions system/lib/standalone/standalone.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ weak int _munmap_js(
return -ENOSYS;
}

weak int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
return -ENOSYS;
}

// open(), etc. - we just support the standard streams, with no
// corner case error checking; everything else is not permitted.
// TODO: full file support for WASI, or an option for it
Expand Down
4 changes: 4 additions & 0 deletions system/lib/wasmfs/syscalls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1848,4 +1848,8 @@ int __syscall__newselect(int nfds,
return count;
}

int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
return -ENOSYS;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this in addition to the dummy __syscall__newselect above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Otherwise we get the following test failure in other.test_closure_full_js_library_wasmfs:

error: undefined symbol: _newselect_js (referenced by root reference (e.g. compiled C/C++ code))


} // extern "C"
Loading