-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Fix select didn't work with pipes and the timeout argument
#25523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1bb9651
7539a75
9f40870
6d79715
8e87186
60ef195
2ed14b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,23 @@ addToLibrary({ | |
| // able to read from the read end after write end is closed. | ||
| refcnt : 2, | ||
| timestamp: new Date(), | ||
| #if PTHREADS | ||
| readableHandlers: [], | ||
| registerReadableHandler: (callback) => { | ||
| callback.registerCleanupFunc(() => { | ||
| const i = pipe.readableHandlers.indexOf(callback); | ||
| if (i !== -1) pipe.readableHandlers.splice(i, 1); | ||
| }); | ||
| pipe.readableHandlers.push(callback); | ||
| }, | ||
| notifyReadableHandlers: () => { | ||
| while (pipe.readableHandlers.length > 0) { | ||
| const cb = pipe.readableHandlers.shift(); | ||
| if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); | ||
| } | ||
| pipe.readableHandlers = []; | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can all the new code here be behind
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved them to behind |
||
| #endif | ||
| }; | ||
|
|
||
| pipe.buckets.push({ | ||
|
|
@@ -80,7 +97,7 @@ addToLibrary({ | |
| blocks: 0, | ||
| }; | ||
| }, | ||
| poll(stream) { | ||
| poll(stream, timeout, notifyCallback) { | ||
| var pipe = stream.node.pipe; | ||
|
|
||
| if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) { | ||
|
|
@@ -92,6 +109,9 @@ addToLibrary({ | |
| } | ||
| } | ||
|
|
||
| #if PTHREADS | ||
| if (notifyCallback) pipe.registerReadableHandler(notifyCallback); | ||
| #endif | ||
| return 0; | ||
| }, | ||
| dup(stream) { | ||
|
|
@@ -204,6 +224,9 @@ addToLibrary({ | |
| if (freeBytesInCurrBuffer >= dataLen) { | ||
| currBucket.buffer.set(data, currBucket.offset); | ||
| currBucket.offset += dataLen; | ||
| #if PTHREADS | ||
| pipe.notifyReadableHandlers(); | ||
| #endif | ||
| return dataLen; | ||
| } else if (freeBytesInCurrBuffer > 0) { | ||
| currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset); | ||
|
|
@@ -235,6 +258,9 @@ addToLibrary({ | |
| newBucket.buffer.set(data); | ||
| } | ||
|
|
||
| #if PTHREADS | ||
| pipe.notifyReadableHandlers(); | ||
| #endif | ||
| return dataLen; | ||
| }, | ||
| close(stream) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,6 +104,63 @@ var SyscallsLibrary = { | |
| }, | ||
| }, | ||
|
|
||
| $parseSelectFDSet__internal: true, | ||
| $parseSelectFDSet: (readfds, writefds, exceptfds) => { | ||
| var total = 0; | ||
|
|
||
| var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0), | ||
| srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0); | ||
| var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0), | ||
| srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0); | ||
| var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0), | ||
| srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0); | ||
|
|
||
| var dstReadLow = 0, | ||
| dstReadHigh = 0; | ||
| var dstWriteLow = 0, | ||
| dstWriteHigh = 0; | ||
| var dstExceptLow = 0, | ||
| dstExceptHigh = 0; | ||
|
|
||
| var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val); | ||
|
|
||
| return { | ||
| allLow: srcReadLow | srcWriteLow | srcExceptLow, | ||
| allHigh: srcReadHigh | srcWriteHigh | srcExceptHigh, | ||
| getTotal: () => total, | ||
| setFlags: (fd, flags) => { | ||
| var mask = 1 << (fd % 32); | ||
|
|
||
| if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) { | ||
| fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask); | ||
| total++; | ||
| } | ||
| if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) { | ||
| fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask); | ||
| total++; | ||
| } | ||
| if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) { | ||
| fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask); | ||
| total++; | ||
| } | ||
| }, | ||
| commit: () => { | ||
| if (readfds) { | ||
| {{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}}; | ||
| {{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}}; | ||
| } | ||
| if (writefds) { | ||
| {{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}}; | ||
| {{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}}; | ||
| } | ||
| if (exceptfds) { | ||
| {{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}}; | ||
| {{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}}; | ||
| } | ||
| } | ||
| }; | ||
| }, | ||
|
|
||
| $syscallGetVarargI__internal: true, | ||
| $syscallGetVarargI: () => { | ||
| #if ASSERTIONS | ||
|
|
@@ -543,37 +600,88 @@ var SyscallsLibrary = { | |
| return 0; | ||
| }, | ||
| __syscall__newselect__i53abi: true, | ||
| __syscall__newselect__proxy: 'none', | ||
| __syscall__newselect__deps: ['_newselect_js', | ||
| #if PTHREADS | ||
| '_emscripten_proxy_newselect', | ||
| #endif | ||
| ], | ||
| __syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => { | ||
| #if PTHREADS | ||
| if (ENVIRONMENT_IS_PTHREAD) { | ||
| return __emscripten_proxy_newselect(nfds, | ||
| {{{ to64('readfds') }}}, | ||
| {{{ to64('writefds') }}}, | ||
| {{{ to64('exceptfds') }}}, | ||
| {{{ splitI64('timeoutInMillis') }}}); | ||
| } | ||
| #endif | ||
| return __newselect_js({{{ to64('0') }}}, | ||
| {{{ to64('0') }}}, | ||
| nfds, | ||
| {{{ to64('readfds') }}}, | ||
| {{{ to64('writefds') }}}, | ||
| {{{ to64('exceptfds') }}}, | ||
| {{{ splitI64('timeoutInMillis') }}}); | ||
| }, | ||
| _newselect_js__i53abi: true, | ||
| _newselect_js__proxy: 'none', | ||
| _newselect_js__deps: ['$parseSelectFDSet', | ||
| #if PTHREADS | ||
| '_emscripten_proxy_newselect_finish', | ||
| #endif | ||
| ], | ||
| _newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => { | ||
| // readfds are supported, | ||
| // writefds checks socket open status | ||
| // exceptfds are supported, although on web, such exceptional conditions never arise in web sockets | ||
| // and so the exceptfds list will always return empty. | ||
| // timeout is supported, although on SOCKFS and PIPEFS these are ignored and always treated as 0 - fully async | ||
| // timeout is supported, although on SOCKFS these are ignored and always treated as 0 - fully async | ||
| // and PIPEFS supports timeout only when the select is called from a worker. | ||
| #if ASSERTIONS | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we asssert that we are not running on a thread here? i.e. |
||
| assert(nfds <= 64, 'nfds must be less than or equal to 64'); // fd sets have 64 bits // TODO: this could be 1024 based on current musl headers | ||
| #if PTHREADS | ||
| assert(!ENVIRONMENT_IS_PTHREAD, '_newselect_js must be called in the main thread'); | ||
| #endif | ||
| #endif | ||
|
|
||
| var total = 0; | ||
|
|
||
| var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0), | ||
| srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0); | ||
| var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0), | ||
| srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0); | ||
| var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0), | ||
| srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0); | ||
|
|
||
| var dstReadLow = 0, | ||
| dstReadHigh = 0; | ||
| var dstWriteLow = 0, | ||
| dstWriteHigh = 0; | ||
| var dstExceptLow = 0, | ||
| dstExceptHigh = 0; | ||
| var fdSet = parseSelectFDSet(readfds, writefds, exceptfds); | ||
|
|
||
| var allLow = srcReadLow | srcWriteLow | srcExceptLow; | ||
| var allHigh = srcReadHigh | srcWriteHigh | srcExceptHigh; | ||
| var allLow = fdSet.allLow; | ||
| var allHigh = fdSet.allHigh; | ||
|
|
||
| var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val); | ||
|
|
||
| #if PTHREADS | ||
| var makeNotifyCallback = null; | ||
| if (ctx) { | ||
| // Enable event handlers only when the select call is proxied from a worker. | ||
| var cleanupFuncs = []; | ||
| var notifyDone = false; | ||
| makeNotifyCallback = (fd) => { | ||
| var cb = (flags) => { | ||
| if (notifyDone) { | ||
| return; | ||
| } | ||
| if (fd >= 0) { | ||
| fdSet.setFlags(fd, flags); | ||
| } | ||
| notifyDone = true; | ||
| cleanupFuncs.forEach(cb => cb()); | ||
| fdSet.commit(); | ||
| __emscripten_proxy_newselect_finish({{{ to64('ctx') }}}, {{{ to64('arg') }}}, fdSet.getTotal()); | ||
| } | ||
| cb.registerCleanupFunc = (f) => { | ||
| if (f != null) cleanupFuncs.push(f); | ||
| } | ||
| return cb; | ||
| } | ||
| if (timeoutInMillis > 0) { | ||
| setTimeout(() => makeNotifyCallback(-1)(0), timeoutInMillis); | ||
| } | ||
| } | ||
| #endif | ||
sbc100 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| for (var fd = 0; fd < nfds; fd++) { | ||
| var mask = 1 << (fd % 32); | ||
| if (!(check(fd, allLow, allHigh, mask))) { | ||
|
|
@@ -585,41 +693,35 @@ var SyscallsLibrary = { | |
| var flags = SYSCALLS.DEFAULT_POLLMASK; | ||
|
|
||
| if (stream.stream_ops.poll) { | ||
| flags = stream.stream_ops.poll(stream, timeoutInMillis); | ||
| flags = (() => { | ||
| #if PTHREADS | ||
| if (makeNotifyCallback != null) { | ||
| return stream.stream_ops.poll(stream, timeoutInMillis, timeoutInMillis != 0 ? makeNotifyCallback(fd) : null); | ||
| } | ||
| #endif | ||
| return stream.stream_ops.poll(stream, timeoutInMillis); | ||
| })(); | ||
| } else { | ||
| #if ASSERTIONS | ||
| if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis) | ||
| #endif | ||
| } | ||
|
|
||
| if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) { | ||
| fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask); | ||
| total++; | ||
| } | ||
| if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) { | ||
| fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask); | ||
| total++; | ||
| } | ||
| if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) { | ||
| fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask); | ||
| total++; | ||
| } | ||
| fdSet.setFlags(fd, flags); | ||
| } | ||
|
|
||
| if (readfds) { | ||
| {{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}}; | ||
| {{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}}; | ||
| } | ||
| if (writefds) { | ||
| {{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}}; | ||
| {{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}}; | ||
| } | ||
| if (exceptfds) { | ||
| {{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}}; | ||
| {{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}}; | ||
| #if PTHREADS | ||
| if (makeNotifyCallback != null) { | ||
| if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) { | ||
| makeNotifyCallback(-1)(0); | ||
| } | ||
| return 0; | ||
| } | ||
| #endif | ||
|
|
||
| fdSet.commit(); | ||
|
|
||
| return total; | ||
| return fdSet.getTotal(); | ||
| }, | ||
| _msync_js__i53abi: true, | ||
| _msync_js: (addr, len, prot, flags, fd, offset) => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Copyright 2025 The Emscripten Authors. All rights reserved. | ||
| * Emscripten is available under two separate licenses, the MIT license and the | ||
| * University of Illinois/NCSA Open Source License. Both these licenses can be | ||
| * found in the LICENSE file. | ||
| */ | ||
|
|
||
| #include <assert.h> | ||
| #include <emscripten/proxying.h> | ||
| #include <emscripten/threading.h> | ||
|
|
||
| #include "emscripten_internal.h" | ||
|
|
||
| typedef struct proxied_select_t { | ||
| int n; | ||
| void *rfds; | ||
| void *wfds; | ||
| void *efds; | ||
| int64_t timeout; | ||
| int result; | ||
| } proxied_select_t; | ||
|
|
||
| static void call_newselect(em_proxying_ctx* ctx, void* arg) { | ||
| proxied_select_t* t = arg; | ||
| _newselect_js(ctx, arg, t->n, t->rfds, t->wfds, t->efds, t->timeout); | ||
| } | ||
|
|
||
| void _emscripten_proxy_newselect_finish(em_proxying_ctx* ctx, void* arg, int ret) { | ||
| proxied_select_t* t = arg; | ||
| t->result = ret; | ||
| emscripten_proxy_finish(ctx); | ||
| } | ||
|
|
||
| int _emscripten_proxy_newselect(int n, void *rfds, void *wfds, void *efds, int64_t timeout) { | ||
| em_proxying_queue* q = emscripten_proxy_get_system_queue(); | ||
| pthread_t target = emscripten_main_runtime_thread_id(); | ||
| proxied_select_t t = {.n = n, .rfds = rfds, .wfds = wfds, .efds = efds, .timeout = timeout}; | ||
| if (!emscripten_proxy_sync_with_ctx(q, target, call_newselect, &t)) { | ||
| assert(false && "emscripten_proxy_sync failed"); | ||
| return -1; | ||
| } | ||
| return t.result; | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it seems like the whole point of this custom proxying is to allow the main thread to delay the calling of I'm happy to see this change land but I wonder if we could do this without that need for custom proxying? I.e. I wonder if we could return a promise from |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1848,4 +1848,8 @@ int __syscall__newselect(int nfds, | |
| return count; | ||
| } | ||
|
|
||
| int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout) { | ||
| return -ENOSYS; | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this in addition to the dummy __syscall__newselect above?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Otherwise we get the following test failure in
|
||
|
|
||
| } // extern "C" | ||
Uh oh!
There was an error while loading. Please reload this page.