Skip to content

Commit 2d731ae

Browse files
committed
pipefs: Enable pipes to notify readiness
The poll method of PIPEFS receives a notification callback from the caller. PIPEFS notifies the caller when the fd becomes readable using that callback. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent ffd1bbe commit 2d731ae

File tree

1 file changed

+19
-1
lines changed

1 file changed

+19
-1
lines changed

src/lib/libpipefs.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@ addToLibrary({
2121
// able to read from the read end after write end is closed.
2222
refcnt : 2,
2323
timestamp: new Date(),
24+
readableHandlers: [],
25+
registerReadableHanlder: (callback) => {
26+
callback.registerCleanupFunc(() => {
27+
const i = pipe.readableHandlers.indexOf(callback);
28+
if (i !== -1) pipe.readableHandlers.splice(i, 1);
29+
});
30+
pipe.readableHandlers.push(callback);
31+
},
32+
notifyReadableHanders: () => {
33+
while (pipe.readableHandlers.length > 0) {
34+
const cb = pipe.readableHandlers.shift();
35+
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
36+
}
37+
pipe.readableHandlers = [];
38+
}
2439
};
2540

2641
pipe.buckets.push({
@@ -80,7 +95,7 @@ addToLibrary({
8095
blocks: 0,
8196
};
8297
},
83-
poll(stream) {
98+
poll(stream, timeout, notifyCallback) {
8499
var pipe = stream.node.pipe;
85100

86101
if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
@@ -92,6 +107,7 @@ addToLibrary({
92107
}
93108
}
94109

110+
if (notifyCallback) pipe.registerReadableHanlder(notifyCallback);
95111
return 0;
96112
},
97113
dup(stream) {
@@ -204,6 +220,7 @@ addToLibrary({
204220
if (freeBytesInCurrBuffer >= dataLen) {
205221
currBucket.buffer.set(data, currBucket.offset);
206222
currBucket.offset += dataLen;
223+
pipe.notifyReadableHanders();
207224
return dataLen;
208225
} else if (freeBytesInCurrBuffer > 0) {
209226
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
@@ -235,6 +252,7 @@ addToLibrary({
235252
newBucket.buffer.set(data);
236253
}
237254

255+
pipe.notifyReadableHanders();
238256
return dataLen;
239257
},
240258
close(stream) {

0 commit comments

Comments
 (0)