Skip to content

Commit 8694832

Browse files
authored
Fix select didn't work with pipes and the timeout argument (#25523)
This PR fixes an issue that the timeout argument for the select syscall wasn't applied to pipes and it returned immedeately instead of waiting for events. Although the select syscall passes the timeout value to the `poll` method of the stream implementations, this can't handle multiple fds because a fd can't notify readiness while another is blocking in `poll`. As a result, using the select syscall with a combination of pipes and other streams (e.g. PTY) can be problematic. To address this, this PR implements a callback-based event notification. Each stream implementation's `poll` invocation receives a callback, allowing it asynchronously notify the select syscall when it becomes ready. The select syscall blocks until one of these callbacks is triggered or the timeout expires. This behviour is enabled only when PROXY_TO_PTHREAD is enabled to avoid blocking the main worker. To maintain compatibility with non-pipefs streams, the select syscall allows stream implementations to ignore the callback and synchronously return the event status instead. In such cases, the select syscall still updates the flags accordingly.
1 parent 1cd1c2e commit 8694832

File tree

15 files changed

+445
-49
lines changed

15 files changed

+445
-49
lines changed

src/lib/libpipefs.js

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,23 @@ addToLibrary({
2121
// able to read from the read end after write end is closed.
2222
refcnt : 2,
2323
timestamp: new Date(),
24+
#if PTHREADS
25+
readableHandlers: [],
26+
registerReadableHandler: (callback) => {
27+
callback.registerCleanupFunc(() => {
28+
const i = pipe.readableHandlers.indexOf(callback);
29+
if (i !== -1) pipe.readableHandlers.splice(i, 1);
30+
});
31+
pipe.readableHandlers.push(callback);
32+
},
33+
notifyReadableHandlers: () => {
34+
while (pipe.readableHandlers.length > 0) {
35+
const cb = pipe.readableHandlers.shift();
36+
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
37+
}
38+
pipe.readableHandlers = [];
39+
}
40+
#endif
2441
};
2542

2643
pipe.buckets.push({
@@ -80,7 +97,7 @@ addToLibrary({
8097
blocks: 0,
8198
};
8299
},
83-
poll(stream) {
100+
poll(stream, timeout, notifyCallback) {
84101
var pipe = stream.node.pipe;
85102

86103
if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
@@ -92,6 +109,9 @@ addToLibrary({
92109
}
93110
}
94111

112+
#if PTHREADS
113+
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
114+
#endif
95115
return 0;
96116
},
97117
dup(stream) {
@@ -204,6 +224,9 @@ addToLibrary({
204224
if (freeBytesInCurrBuffer >= dataLen) {
205225
currBucket.buffer.set(data, currBucket.offset);
206226
currBucket.offset += dataLen;
227+
#if PTHREADS
228+
pipe.notifyReadableHandlers();
229+
#endif
207230
return dataLen;
208231
} else if (freeBytesInCurrBuffer > 0) {
209232
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
@@ -235,6 +258,9 @@ addToLibrary({
235258
newBucket.buffer.set(data);
236259
}
237260

261+
#if PTHREADS
262+
pipe.notifyReadableHandlers();
263+
#endif
238264
return dataLen;
239265
},
240266
close(stream) {

src/lib/libsigs.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ sigs = {
381381
_mmap_js__sig: 'ipiiijpp',
382382
_msync_js__sig: 'ippiiij',
383383
_munmap_js__sig: 'ippiiij',
384+
_newselect_js__sig: 'ippipppj',
384385
_setitimer_js__sig: 'iid',
385386
_timegm_js__sig: 'jp',
386387
_tzset_js__sig: 'vpppp',

src/lib/libsyscall.js

Lines changed: 145 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,63 @@ var SyscallsLibrary = {
104104
},
105105
},
106106

107+
$parseSelectFDSet__internal: true,
108+
$parseSelectFDSet: (readfds, writefds, exceptfds) => {
109+
var total = 0;
110+
111+
var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
112+
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
113+
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
114+
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
115+
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
116+
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);
117+
118+
var dstReadLow = 0,
119+
dstReadHigh = 0;
120+
var dstWriteLow = 0,
121+
dstWriteHigh = 0;
122+
var dstExceptLow = 0,
123+
dstExceptHigh = 0;
124+
125+
var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);
126+
127+
return {
128+
allLow: srcReadLow | srcWriteLow | srcExceptLow,
129+
allHigh: srcReadHigh | srcWriteHigh | srcExceptHigh,
130+
getTotal: () => total,
131+
setFlags: (fd, flags) => {
132+
var mask = 1 << (fd % 32);
133+
134+
if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
135+
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
136+
total++;
137+
}
138+
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
139+
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
140+
total++;
141+
}
142+
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
143+
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
144+
total++;
145+
}
146+
},
147+
commit: () => {
148+
if (readfds) {
149+
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
150+
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
151+
}
152+
if (writefds) {
153+
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
154+
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
155+
}
156+
if (exceptfds) {
157+
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
158+
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};
159+
}
160+
}
161+
};
162+
},
163+
107164
$syscallGetVarargI__internal: true,
108165
$syscallGetVarargI: () => {
109166
#if ASSERTIONS
@@ -543,37 +600,88 @@ var SyscallsLibrary = {
543600
return 0;
544601
},
545602
__syscall__newselect__i53abi: true,
603+
__syscall__newselect__proxy: 'none',
604+
__syscall__newselect__deps: ['_newselect_js',
605+
#if PTHREADS
606+
'_emscripten_proxy_newselect',
607+
#endif
608+
],
546609
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
610+
#if PTHREADS
611+
if (ENVIRONMENT_IS_PTHREAD) {
612+
return __emscripten_proxy_newselect(nfds,
613+
{{{ to64('readfds') }}},
614+
{{{ to64('writefds') }}},
615+
{{{ to64('exceptfds') }}},
616+
{{{ splitI64('timeoutInMillis') }}});
617+
}
618+
#endif
619+
return __newselect_js({{{ to64('0') }}},
620+
{{{ to64('0') }}},
621+
nfds,
622+
{{{ to64('readfds') }}},
623+
{{{ to64('writefds') }}},
624+
{{{ to64('exceptfds') }}},
625+
{{{ splitI64('timeoutInMillis') }}});
626+
},
627+
_newselect_js__i53abi: true,
628+
_newselect_js__proxy: 'none',
629+
_newselect_js__deps: ['$parseSelectFDSet',
630+
#if PTHREADS
631+
'_emscripten_proxy_newselect_finish',
632+
#endif
633+
],
634+
_newselect_js: (ctx, arg, nfds, readfds, writefds, exceptfds, timeoutInMillis) => {
547635
// readfds are supported,
548636
// writefds checks socket open status
549637
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
550638
// and so the exceptfds list will always return empty.
551-
// timeout is supported, although on SOCKFS and PIPEFS these are ignored and always treated as 0 - fully async
639+
// timeout is supported, although on SOCKFS these are ignored and always treated as 0 - fully async
640+
// and PIPEFS supports timeout only when the select is called from a worker.
552641
#if ASSERTIONS
553642
assert(nfds <= 64, 'nfds must be less than or equal to 64'); // fd sets have 64 bits // TODO: this could be 1024 based on current musl headers
643+
#if PTHREADS
644+
assert(!ENVIRONMENT_IS_PTHREAD, '_newselect_js must be called in the main thread');
645+
#endif
554646
#endif
555647

556-
var total = 0;
557-
558-
var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0),
559-
srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0);
560-
var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0),
561-
srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0);
562-
var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0),
563-
srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0);
564-
565-
var dstReadLow = 0,
566-
dstReadHigh = 0;
567-
var dstWriteLow = 0,
568-
dstWriteHigh = 0;
569-
var dstExceptLow = 0,
570-
dstExceptHigh = 0;
648+
var fdSet = parseSelectFDSet(readfds, writefds, exceptfds);
571649

572-
var allLow = srcReadLow | srcWriteLow | srcExceptLow;
573-
var allHigh = srcReadHigh | srcWriteHigh | srcExceptHigh;
650+
var allLow = fdSet.allLow;
651+
var allHigh = fdSet.allHigh;
574652

575653
var check = (fd, low, high, val) => fd < 32 ? (low & val) : (high & val);
576654

655+
#if PTHREADS
656+
var makeNotifyCallback = null;
657+
if (ctx) {
658+
// Enable event handlers only when the select call is proxied from a worker.
659+
var cleanupFuncs = [];
660+
var notifyDone = false;
661+
makeNotifyCallback = (fd) => {
662+
var cb = (flags) => {
663+
if (notifyDone) {
664+
return;
665+
}
666+
if (fd >= 0) {
667+
fdSet.setFlags(fd, flags);
668+
}
669+
notifyDone = true;
670+
cleanupFuncs.forEach(cb => cb());
671+
fdSet.commit();
672+
__emscripten_proxy_newselect_finish({{{ to64('ctx') }}}, {{{ to64('arg') }}}, fdSet.getTotal());
673+
}
674+
cb.registerCleanupFunc = (f) => {
675+
if (f != null) cleanupFuncs.push(f);
676+
}
677+
return cb;
678+
}
679+
if (timeoutInMillis > 0) {
680+
setTimeout(() => makeNotifyCallback(-1)(0), timeoutInMillis);
681+
}
682+
}
683+
#endif
684+
577685
for (var fd = 0; fd < nfds; fd++) {
578686
var mask = 1 << (fd % 32);
579687
if (!(check(fd, allLow, allHigh, mask))) {
@@ -585,41 +693,35 @@ var SyscallsLibrary = {
585693
var flags = SYSCALLS.DEFAULT_POLLMASK;
586694

587695
if (stream.stream_ops.poll) {
588-
flags = stream.stream_ops.poll(stream, timeoutInMillis);
696+
flags = (() => {
697+
#if PTHREADS
698+
if (makeNotifyCallback != null) {
699+
return stream.stream_ops.poll(stream, timeoutInMillis, timeoutInMillis != 0 ? makeNotifyCallback(fd) : null);
700+
}
701+
#endif
702+
return stream.stream_ops.poll(stream, timeoutInMillis);
703+
})();
589704
} else {
590705
#if ASSERTIONS
591706
if (timeoutInMillis != 0) warnOnce('non-zero select() timeout not supported: ' + timeoutInMillis)
592707
#endif
593708
}
594709

595-
if ((flags & {{{ cDefs.POLLIN }}}) && check(fd, srcReadLow, srcReadHigh, mask)) {
596-
fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask);
597-
total++;
598-
}
599-
if ((flags & {{{ cDefs.POLLOUT }}}) && check(fd, srcWriteLow, srcWriteHigh, mask)) {
600-
fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask);
601-
total++;
602-
}
603-
if ((flags & {{{ cDefs.POLLPRI }}}) && check(fd, srcExceptLow, srcExceptHigh, mask)) {
604-
fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask);
605-
total++;
606-
}
710+
fdSet.setFlags(fd, flags);
607711
}
608712

609-
if (readfds) {
610-
{{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}};
611-
{{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}};
612-
}
613-
if (writefds) {
614-
{{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}};
615-
{{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}};
616-
}
617-
if (exceptfds) {
618-
{{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}};
619-
{{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}};
713+
#if PTHREADS
714+
if (makeNotifyCallback != null) {
715+
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
716+
makeNotifyCallback(-1)(0);
717+
}
718+
return 0;
620719
}
720+
#endif
721+
722+
fdSet.commit();
621723

622-
return total;
724+
return fdSet.getTotal();
623725
},
624726
_msync_js__i53abi: true,
625727
_msync_js: (addr, len, prot, flags, fd, offset) => {

system/lib/libc/emscripten_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ EmscriptenDeviceOrientationEvent* _emscripten_get_last_deviceorientation_event()
151151
EmscriptenDeviceMotionEvent* _emscripten_get_last_devicemotion_event();
152152
EmscriptenMouseEvent* _emscripten_get_last_mouse_event();
153153

154+
int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout);
155+
154156
#ifdef __cplusplus
155157
}
156158
#endif

system/lib/libc/proxying_select.c

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2025 The Emscripten Authors. All rights reserved.
3+
* Emscripten is available under two separate licenses, the MIT license and the
4+
* University of Illinois/NCSA Open Source License. Both these licenses can be
5+
* found in the LICENSE file.
6+
*/
7+
8+
#include <assert.h>
9+
#include <emscripten/proxying.h>
10+
#include <emscripten/threading.h>
11+
12+
#include "emscripten_internal.h"
13+
14+
typedef struct proxied_select_t {
15+
int n;
16+
void *rfds;
17+
void *wfds;
18+
void *efds;
19+
int64_t timeout;
20+
int result;
21+
} proxied_select_t;
22+
23+
static void call_newselect(em_proxying_ctx* ctx, void* arg) {
24+
proxied_select_t* t = arg;
25+
_newselect_js(ctx, arg, t->n, t->rfds, t->wfds, t->efds, t->timeout);
26+
}
27+
28+
void _emscripten_proxy_newselect_finish(em_proxying_ctx* ctx, void* arg, int ret) {
29+
proxied_select_t* t = arg;
30+
t->result = ret;
31+
emscripten_proxy_finish(ctx);
32+
}
33+
34+
int _emscripten_proxy_newselect(int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
35+
em_proxying_queue* q = emscripten_proxy_get_system_queue();
36+
pthread_t target = emscripten_main_runtime_thread_id();
37+
proxied_select_t t = {.n = n, .rfds = rfds, .wfds = wfds, .efds = efds, .timeout = timeout};
38+
if (!emscripten_proxy_sync_with_ctx(q, target, call_newselect, &t)) {
39+
assert(false && "emscripten_proxy_sync failed");
40+
return -1;
41+
}
42+
return t.result;
43+
}

system/lib/standalone/standalone.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ weak int _munmap_js(
6666
return -ENOSYS;
6767
}
6868

69+
weak int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
70+
return -ENOSYS;
71+
}
72+
6973
// open(), etc. - we just support the standard streams, with no
7074
// corner case error checking; everything else is not permitted.
7175
// TODO: full file support for WASI, or an option for it

system/lib/wasmfs/syscalls.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,4 +1848,8 @@ int __syscall__newselect(int nfds,
18481848
return count;
18491849
}
18501850

1851+
int _newselect_js(void* ctx, void* arg, int n, void *rfds, void *wfds, void *efds, int64_t timeout) {
1852+
return -ENOSYS;
1853+
}
1854+
18511855
} // extern "C"

0 commit comments

Comments
 (0)