Skip to content

Commit 399f169

Browse files
committed
pipefs: Enable pipes to notify readiness
The poll method of PIPEFS receives a notification callback from the caller. PIPEFS notifies the caller when the fd becomes readable using that callback. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent a511ca1 commit 399f169

File tree

1 file changed

+27
-1
lines changed

1 file changed

+27
-1
lines changed

src/lib/libpipefs.js

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,23 @@ addToLibrary({
2121
// able to read from the read end after write end is closed.
2222
refcnt : 2,
2323
timestamp: new Date(),
24+
#if PTHREADS
25+
readableHandlers: [],
26+
registerReadableHandler: (callback) => {
27+
callback.registerCleanupFunc(() => {
28+
const i = pipe.readableHandlers.indexOf(callback);
29+
if (i !== -1) pipe.readableHandlers.splice(i, 1);
30+
});
31+
pipe.readableHandlers.push(callback);
32+
},
33+
notifyReadableHandlers: () => {
34+
while (pipe.readableHandlers.length > 0) {
35+
const cb = pipe.readableHandlers.shift();
36+
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
37+
}
38+
pipe.readableHandlers = [];
39+
}
40+
#endif
2441
};
2542

2643
pipe.buckets.push({
@@ -80,7 +97,7 @@ addToLibrary({
8097
blocks: 0,
8198
};
8299
},
83-
poll(stream) {
100+
poll(stream, timeout, notifyCallback) {
84101
var pipe = stream.node.pipe;
85102

86103
if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
@@ -92,6 +109,9 @@ addToLibrary({
92109
}
93110
}
94111

112+
#if PTHREADS
113+
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
114+
#endif
95115
return 0;
96116
},
97117
dup(stream) {
@@ -204,6 +224,9 @@ addToLibrary({
204224
if (freeBytesInCurrBuffer >= dataLen) {
205225
currBucket.buffer.set(data, currBucket.offset);
206226
currBucket.offset += dataLen;
227+
#if PTHREADS
228+
pipe.notifyReadableHandlers();
229+
#endif
207230
return dataLen;
208231
} else if (freeBytesInCurrBuffer > 0) {
209232
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
@@ -235,6 +258,9 @@ addToLibrary({
235258
newBucket.buffer.set(data);
236259
}
237260

261+
#if PTHREADS
262+
pipe.notifyReadableHandlers();
263+
#endif
238264
return dataLen;
239265
},
240266
close(stream) {

0 commit comments

Comments
 (0)