Skip to content

Commit 071fa18

Browse files
committed
child_process: watch pipe peer close event
Fixes: #25131
1 parent 052aec7 commit 071fa18

File tree

5 files changed

+147
-6
lines changed

5 files changed

+147
-6
lines changed

lib/internal/child_process.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,14 @@ function flushStdio(subprocess) {
333333

334334

335335
function createSocket(pipe, readable) {
336-
return net.Socket({ handle: pipe, readable });
336+
const sock = net.Socket({ handle: pipe, readable });
337+
if (!readable &&
338+
process.platform !== 'win32' &&
339+
typeof pipe?.watchPeerClose === 'function') {
340+
pipe.watchPeerClose(() => sock.destroy());
341+
sock.once('close', () => pipe.unwatchPeerClose?.());
342+
}
343+
return sock;
337344
}
338345

339346

src/pipe_wrap.cc

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "handle_wrap.h"
2929
#include "node.h"
3030
#include "node_buffer.h"
31+
#include "node_errors.h"
3132
#include "node_external_reference.h"
3233
#include "stream_base-inl.h"
3334
#include "stream_wrap.h"
@@ -80,6 +81,8 @@ void PipeWrap::Initialize(Local<Object> target,
8081
SetProtoMethod(isolate, t, "listen", Listen);
8182
SetProtoMethod(isolate, t, "connect", Connect);
8283
SetProtoMethod(isolate, t, "open", Open);
84+
SetProtoMethod(isolate, t, "watchPeerClose", WatchPeerClose);
85+
SetProtoMethod(isolate, t, "unwatchPeerClose", UnwatchPeerClose);
8386

8487
#ifdef _WIN32
8588
SetProtoMethod(isolate, t, "setPendingInstances", SetPendingInstances);
@@ -110,6 +113,8 @@ void PipeWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
110113
registry->Register(Listen);
111114
registry->Register(Connect);
112115
registry->Register(Open);
116+
registry->Register(WatchPeerClose);
117+
registry->Register(UnwatchPeerClose);
113118
#ifdef _WIN32
114119
registry->Register(SetPendingInstances);
115120
#endif
@@ -159,6 +164,11 @@ PipeWrap::PipeWrap(Environment* env,
159164
// Suggestion: uv_pipe_init() returns void.
160165
}
161166

167+
PipeWrap::~PipeWrap() {
168+
peer_close_watching_ = false;
169+
peer_close_cb_.Reset();
170+
}
171+
162172
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
163173
PipeWrap* wrap;
164174
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
@@ -213,6 +223,96 @@ void PipeWrap::Open(const FunctionCallbackInfo<Value>& args) {
213223
args.GetReturnValue().Set(err);
214224
}
215225

226+
void PipeWrap::WatchPeerClose(const FunctionCallbackInfo<Value>& args) {
227+
PipeWrap* wrap;
228+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
229+
230+
if (!wrap->IsAlive()) {
231+
return args.GetReturnValue().Set(UV_EBADF);
232+
}
233+
234+
if (wrap->peer_close_watching_) {
235+
return args.GetReturnValue().Set(0);
236+
}
237+
238+
CHECK_GT(args.Length(), 0);
239+
CHECK(args[0]->IsFunction());
240+
241+
Environment* env = wrap->env();
242+
Isolate* isolate = env->isolate();
243+
244+
// Store the JS callback securely so it isn't garbage collected.
245+
wrap->peer_close_cb_.Reset(isolate, args[0].As<Function>());
246+
wrap->peer_close_watching_ = true;
247+
248+
// Start reading to detect EOF/ECONNRESET from the peer.
249+
// We use our custom allocator and reader, ignoring actual data.
250+
int err = uv_read_start(wrap->stream(), PeerCloseAlloc, PeerCloseRead);
251+
if (err != 0) {
252+
wrap->peer_close_watching_ = false;
253+
wrap->peer_close_cb_.Reset();
254+
}
255+
args.GetReturnValue().Set(err);
256+
}
257+
258+
void PipeWrap::UnwatchPeerClose(const FunctionCallbackInfo<Value>& args) {
259+
PipeWrap* wrap;
260+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
261+
262+
if (!wrap->peer_close_watching_) {
263+
wrap->peer_close_cb_.Reset();
264+
return args.GetReturnValue().Set(0);
265+
}
266+
267+
// Stop listening and release the JS callback to prevent memory leaks.
268+
wrap->peer_close_watching_ = false;
269+
wrap->peer_close_cb_.Reset();
270+
args.GetReturnValue().Set(uv_read_stop(wrap->stream()));
271+
}
272+
273+
void PipeWrap::PeerCloseAlloc(uv_handle_t* handle,
274+
size_t suggested_size,
275+
uv_buf_t* buf) {
276+
// We only care about EOF, not the actual data.
277+
// Using a static 1-byte buffer avoids dynamic memory allocation overhead.
278+
static char scratch;
279+
*buf = uv_buf_init(&scratch, 1);
280+
}
281+
282+
void PipeWrap::PeerCloseRead(uv_stream_t* stream,
283+
ssize_t nread,
284+
const uv_buf_t* buf) {
285+
PipeWrap* wrap = static_cast<PipeWrap*>(stream->data);
286+
if (wrap == nullptr || !wrap->peer_close_watching_) return;
287+
288+
// Ignore actual data reads or EAGAIN (0). We only watch for disconnects.
289+
if (nread > 0 || nread == 0) return;
290+
291+
// Wait specifically for EOF or connection reset (peer closed).
292+
if (nread != UV_EOF && nread != UV_ECONNRESET) return;
293+
294+
// Peer has closed the connection. Stop reading immediately.
295+
wrap->peer_close_watching_ = false;
296+
uv_read_stop(stream);
297+
298+
if (wrap->peer_close_cb_.IsEmpty()) return;
299+
Environment* env = wrap->env();
300+
Isolate* isolate = env->isolate();
301+
302+
// Set up V8 context and handles to safely execute the JS callback.
303+
v8::HandleScope handle_scope(isolate);
304+
v8::Context::Scope context_scope(env->context());
305+
Local<Function> cb = wrap->peer_close_cb_.Get(isolate);
306+
// Reset before calling to prevent re-entrancy issues
307+
wrap->peer_close_cb_.Reset();
308+
309+
errors::TryCatchScope try_catch(env);
310+
try_catch.SetVerbose(true);
311+
312+
// MakeCallback properly tracks AsyncHooks context and flushes microtasks.
313+
wrap->MakeCallback(cb, 0, nullptr);
314+
}
315+
216316
void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
217317
Environment* env = Environment::GetCurrent(args);
218318

@@ -252,7 +352,6 @@ void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
252352

253353
args.GetReturnValue().Set(err);
254354
}
255-
256355
} // namespace node
257356

258357
NODE_BINDING_CONTEXT_AWARE_INTERNAL(pipe_wrap, node::PipeWrap::Initialize)

src/pipe_wrap.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5454
SET_SELF_SIZE(PipeWrap)
5555

5656
private:
57+
~PipeWrap() override;
5758
PipeWrap(Environment* env,
5859
v8::Local<v8::Object> object,
5960
ProviderType provider,
@@ -64,12 +65,23 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
6465
static void Listen(const v8::FunctionCallbackInfo<v8::Value>& args);
6566
static void Connect(const v8::FunctionCallbackInfo<v8::Value>& args);
6667
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
68+
static void WatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
69+
static void UnwatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
70+
static void PeerCloseAlloc(uv_handle_t* handle,
71+
size_t suggested_size,
72+
uv_buf_t* buf);
73+
static void PeerCloseRead(uv_stream_t* stream,
74+
ssize_t nread,
75+
const uv_buf_t* buf);
6776

6877
#ifdef _WIN32
6978
static void SetPendingInstances(
7079
const v8::FunctionCallbackInfo<v8::Value>& args);
7180
#endif
7281
static void Fchmod(const v8::FunctionCallbackInfo<v8::Value>& args);
82+
83+
bool peer_close_watching_ = false;
84+
v8::Global<v8::Function> peer_close_cb_;
7385
};
7486

7587

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { spawn } = require('child_process');
6+
7+
if (common.isWindows) {
8+
common.skip('Not applicable on Windows');
9+
}
10+
11+
const child = spawn(process.execPath, [
12+
'-e',
13+
'require("fs").closeSync(0); setTimeout(() => {}, 2000)',
14+
], { stdio: ['pipe', 'ignore', 'ignore'] });
15+
16+
const timeout = setTimeout(() => {
17+
assert.fail('stdin close event was not emitted');
18+
}, 1000);
19+
20+
child.stdin.on('close', common.mustCall(() => {
21+
clearTimeout(timeout);
22+
child.kill();
23+
}));
24+
25+
child.on('exit', common.mustCall(() => {
26+
clearTimeout(timeout);
27+
}));

tools/eslint/package-lock.json

Lines changed: 0 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)