Skip to content

Commit 0b741f4

Browse files
committed
child_process: watch child_process stdin pipe peer close event
Fixes: #25131
1 parent 052aec7 commit 0b741f4

File tree

6 files changed

+164
-9
lines changed

6 files changed

+164
-9
lines changed

doc/api/child_process.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,14 +1066,14 @@ pipes between the parent and child. The value is one of the following:
10661066
corresponds to the index in the `stdio` array. The stream must have an
10671067
underlying descriptor (file streams do not start until the `'open'` event has
10681068
occurred).
1069-
**NOTE:** While it is technically possible to pass `stdin` as a writable or
1070-
`stdout`/`stderr` as readable, it is not recommended.
1069+
**NOTE:** While it is technically possible to pass `stdin` as a readable or
1070+
`stdout`/`stderr` as writable, it is not recommended.
10711071
Readable and writable streams are designed with distinct behaviors, and using
10721072
them incorrectly (e.g., passing a readable stream where a writable stream is
10731073
expected) can lead to unexpected results or errors. This practice is discouraged
10741074
as it may result in undefined behavior or dropped callbacks if the stream
1075-
encounters errors. Always ensure that `stdin` is used as readable and
1076-
`stdout`/`stderr` as writable to maintain the intended flow of data between
1075+
encounters errors. Always ensure that `stdin` is used as writable and
1076+
`stdout`/`stderr` as readable to maintain the intended flow of data between
10771077
the parent and child processes.
10781078
7. Positive integer: The integer value is interpreted as a file descriptor
10791079
that is open in the parent process. It is shared with the child
@@ -2170,6 +2170,11 @@ until this stream has been closed via `end()`.
21702170
If the child process was spawned with `stdio[0]` set to anything other than `'pipe'`,
21712171
then this will be `null`.
21722172
2173+
On non-Windows platforms, when `stdio[0]` is `'pipe'`, Node.js watches for the
2174+
child process closing its end of the stdin pipe and destroys `subprocess.stdin`
2175+
when that happens. This helps surface pipe peer-close semantics consistently for
2176+
the writable side of the stream.
2177+
21732178
`subprocess.stdin` is an alias for `subprocess.stdio[0]`. Both properties will
21742179
refer to the same value.
21752180

lib/internal/child_process.js

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,15 @@ function flushStdio(subprocess) {
332332
}
333333

334334

335-
function createSocket(pipe, readable) {
336-
return net.Socket({ handle: pipe, readable });
335+
function createSocket(pipe, readable, watchPeerClose) {
336+
const sock = net.Socket({ handle: pipe, readable });
337+
if (watchPeerClose &&
338+
process.platform !== 'win32' &&
339+
typeof pipe?.watchPeerClose === 'function') {
340+
pipe.watchPeerClose(() => sock.destroy());
341+
sock.once('close', () => pipe.unwatchPeerClose?.());
342+
}
343+
return sock;
337344
}
338345

339346

@@ -472,7 +479,7 @@ ChildProcess.prototype.spawn = function spawn(options) {
472479

473480
if (stream.handle) {
474481
stream.socket = createSocket(this.pid !== 0 ?
475-
stream.handle : null, i > 0);
482+
stream.handle : null, i > 0, i === 0);
476483

477484
if (i > 0 && this.pid !== 0) {
478485
this._closesNeeded++;

src/pipe_wrap.cc

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "handle_wrap.h"
2929
#include "node.h"
3030
#include "node_buffer.h"
31+
#include "node_errors.h"
3132
#include "node_external_reference.h"
3233
#include "stream_base-inl.h"
3334
#include "stream_wrap.h"
@@ -80,6 +81,8 @@ void PipeWrap::Initialize(Local<Object> target,
8081
SetProtoMethod(isolate, t, "listen", Listen);
8182
SetProtoMethod(isolate, t, "connect", Connect);
8283
SetProtoMethod(isolate, t, "open", Open);
84+
SetProtoMethod(isolate, t, "watchPeerClose", WatchPeerClose);
85+
SetProtoMethod(isolate, t, "unwatchPeerClose", UnwatchPeerClose);
8386

8487
#ifdef _WIN32
8588
SetProtoMethod(isolate, t, "setPendingInstances", SetPendingInstances);
@@ -110,6 +113,8 @@ void PipeWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
110113
registry->Register(Listen);
111114
registry->Register(Connect);
112115
registry->Register(Open);
116+
registry->Register(WatchPeerClose);
117+
registry->Register(UnwatchPeerClose);
113118
#ifdef _WIN32
114119
registry->Register(SetPendingInstances);
115120
#endif
@@ -159,6 +164,11 @@ PipeWrap::PipeWrap(Environment* env,
159164
// Suggestion: uv_pipe_init() returns void.
160165
}
161166

167+
PipeWrap::~PipeWrap() {
168+
peer_close_watching_ = false;
169+
peer_close_cb_.Reset();
170+
}
171+
162172
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
163173
PipeWrap* wrap;
164174
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
@@ -213,6 +223,96 @@ void PipeWrap::Open(const FunctionCallbackInfo<Value>& args) {
213223
args.GetReturnValue().Set(err);
214224
}
215225

226+
void PipeWrap::WatchPeerClose(const FunctionCallbackInfo<Value>& args) {
227+
PipeWrap* wrap;
228+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
229+
230+
if (!wrap->IsAlive()) {
231+
return args.GetReturnValue().Set(UV_EBADF);
232+
}
233+
234+
if (wrap->peer_close_watching_) {
235+
return args.GetReturnValue().Set(0);
236+
}
237+
238+
CHECK_GT(args.Length(), 0);
239+
CHECK(args[0]->IsFunction());
240+
241+
Environment* env = wrap->env();
242+
Isolate* isolate = env->isolate();
243+
244+
// Store the JS callback securely so it isn't garbage collected.
245+
wrap->peer_close_cb_.Reset(isolate, args[0].As<Function>());
246+
wrap->peer_close_watching_ = true;
247+
248+
// Start reading to detect EOF/ECONNRESET from the peer.
249+
// We use our custom allocator and reader, ignoring actual data.
250+
int err = uv_read_start(wrap->stream(), PeerCloseAlloc, PeerCloseRead);
251+
if (err != 0) {
252+
wrap->peer_close_watching_ = false;
253+
wrap->peer_close_cb_.Reset();
254+
}
255+
args.GetReturnValue().Set(err);
256+
}
257+
258+
void PipeWrap::UnwatchPeerClose(const FunctionCallbackInfo<Value>& args) {
259+
PipeWrap* wrap;
260+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
261+
262+
if (!wrap->peer_close_watching_) {
263+
wrap->peer_close_cb_.Reset();
264+
return args.GetReturnValue().Set(0);
265+
}
266+
267+
// Stop listening and release the JS callback to prevent memory leaks.
268+
wrap->peer_close_watching_ = false;
269+
wrap->peer_close_cb_.Reset();
270+
args.GetReturnValue().Set(uv_read_stop(wrap->stream()));
271+
}
272+
273+
void PipeWrap::PeerCloseAlloc(uv_handle_t* handle,
274+
size_t suggested_size,
275+
uv_buf_t* buf) {
276+
// We only care about EOF, not the actual data.
277+
// Using a static 1-byte buffer avoids dynamic memory allocation overhead.
278+
static char scratch;
279+
*buf = uv_buf_init(&scratch, 1);
280+
}
281+
282+
void PipeWrap::PeerCloseRead(uv_stream_t* stream,
283+
ssize_t nread,
284+
const uv_buf_t* buf) {
285+
PipeWrap* wrap = static_cast<PipeWrap*>(stream->data);
286+
if (wrap == nullptr || !wrap->peer_close_watching_) return;
287+
288+
// Ignore actual data reads or EAGAIN (0). We only watch for disconnects.
289+
if (nread > 0 || nread == 0) return;
290+
291+
// Wait specifically for EOF or connection reset (peer closed).
292+
if (nread != UV_EOF && nread != UV_ECONNRESET) return;
293+
294+
// Peer has closed the connection. Stop reading immediately.
295+
wrap->peer_close_watching_ = false;
296+
uv_read_stop(stream);
297+
298+
if (wrap->peer_close_cb_.IsEmpty()) return;
299+
Environment* env = wrap->env();
300+
Isolate* isolate = env->isolate();
301+
302+
// Set up V8 context and handles to safely execute the JS callback.
303+
v8::HandleScope handle_scope(isolate);
304+
v8::Context::Scope context_scope(env->context());
305+
Local<Function> cb = wrap->peer_close_cb_.Get(isolate);
306+
// Reset before calling to prevent re-entrancy issues
307+
wrap->peer_close_cb_.Reset();
308+
309+
errors::TryCatchScope try_catch(env);
310+
try_catch.SetVerbose(true);
311+
312+
// MakeCallback properly tracks AsyncHooks context and flushes microtasks.
313+
wrap->MakeCallback(cb, 0, nullptr);
314+
}
315+
216316
void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
217317
Environment* env = Environment::GetCurrent(args);
218318

@@ -252,7 +352,6 @@ void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
252352

253353
args.GetReturnValue().Set(err);
254354
}
255-
256355
} // namespace node
257356

258357
NODE_BINDING_CONTEXT_AWARE_INTERNAL(pipe_wrap, node::PipeWrap::Initialize)

src/pipe_wrap.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5454
SET_SELF_SIZE(PipeWrap)
5555

5656
private:
57+
~PipeWrap() override;
5758
PipeWrap(Environment* env,
5859
v8::Local<v8::Object> object,
5960
ProviderType provider,
@@ -64,12 +65,23 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
6465
static void Listen(const v8::FunctionCallbackInfo<v8::Value>& args);
6566
static void Connect(const v8::FunctionCallbackInfo<v8::Value>& args);
6667
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
68+
static void WatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
69+
static void UnwatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
70+
static void PeerCloseAlloc(uv_handle_t* handle,
71+
size_t suggested_size,
72+
uv_buf_t* buf);
73+
static void PeerCloseRead(uv_stream_t* stream,
74+
ssize_t nread,
75+
const uv_buf_t* buf);
6776

6877
#ifdef _WIN32
6978
static void SetPendingInstances(
7079
const v8::FunctionCallbackInfo<v8::Value>& args);
7180
#endif
7281
static void Fchmod(const v8::FunctionCallbackInfo<v8::Value>& args);
82+
83+
bool peer_close_watching_ = false;
84+
v8::Global<v8::Function> peer_close_cb_;
7385
};
7486

7587

test/async-hooks/test-pipewrap.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const processwrap = processes[0];
3535
const pipe1 = pipes[0];
3636
const pipe2 = pipes[1];
3737
const pipe3 = pipes[2];
38+
const pipe1ExpectedInvocations = process.platform === 'win32' ? 1 : 2;
3839

3940
assert.strictEqual(processwrap.type, 'PROCESSWRAP');
4041
assert.strictEqual(processwrap.triggerAsyncId, 1);
@@ -83,7 +84,11 @@ function onexit() {
8384
// Usually it is just one event, but it can be more.
8485
assert.ok(ioEvents >= 3, `at least 3 stdout io events, got ${ioEvents}`);
8586

86-
checkInvocations(pipe1, { init: 1, before: 1, after: 1 },
87+
checkInvocations(pipe1, {
88+
init: 1,
89+
before: pipe1ExpectedInvocations,
90+
after: pipe1ExpectedInvocations,
91+
},
8792
'pipe wrap when sleep.spawn was called');
8893
checkInvocations(pipe2, { init: 1, before: ioEvents, after: ioEvents },
8994
'pipe wrap when sleep.spawn was called');
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { spawn } = require('child_process');
6+
7+
if (common.isWindows) {
8+
common.skip('Not applicable on Windows');
9+
}
10+
11+
const child = spawn(process.execPath, [
12+
'-e',
13+
'require("fs").closeSync(0); setTimeout(() => {}, 2000)',
14+
], { stdio: ['pipe', 'ignore', 'ignore'] });
15+
16+
const timeout = setTimeout(() => {
17+
assert.fail('stdin close event was not emitted');
18+
}, 1000);
19+
20+
child.stdin.on('close', common.mustCall(() => {
21+
clearTimeout(timeout);
22+
child.kill();
23+
}));
24+
25+
child.on('exit', common.mustCall(() => {
26+
clearTimeout(timeout);
27+
}));

0 commit comments

Comments
 (0)