From ad2131716bdd45d4279a9780f50e11e1aa4302ef Mon Sep 17 00:00:00 2001 From: Zack Newsham Date: Thu, 23 Nov 2023 23:03:16 -0500 Subject: [PATCH] Fixes #50880 - enable passthrough IPC in watch mode --- lib/internal/main/watch_mode.js | 17 ++-- lib/internal/watch_mode/files_watcher.js | 24 +++++ test/sequential/test-watch-mode.mjs | 122 ++++++++++++++++++----- 3 files changed, 133 insertions(+), 30 deletions(-) diff --git a/lib/internal/main/watch_mode.js b/lib/internal/main/watch_mode.js index 09453e8902a2b9..88d14188739c1e 100644 --- a/lib/internal/main/watch_mode.js +++ b/lib/internal/main/watch_mode.js @@ -59,6 +59,7 @@ function start() { process.stdout.write(`${red}Failed running ${kCommandStr}${white}\n`); } }); + return child; } async function killAndWait(signal = kKillSignal, force = false) { @@ -91,29 +92,31 @@ function reportGracefulTermination() { }; } -async function stop() { +async function stop(child) { + // without this line, the child process is still able to receive IPC, but is unable to send additional messages + watcher.destroyIPC(child); watcher.clearFileFilters(); const clearGraceReport = reportGracefulTermination(); await killAndWait(); clearGraceReport(); } -async function restart() { +async function restart(child) { if (!kPreserveOutput) process.stdout.write(clear); process.stdout.write(`${green}Restarting ${kCommandStr}${white}\n`); - await stop(); - start(); + await stop(child); + return start(); } (async () => { emitExperimentalWarning('Watch mode'); - + let child; try { - start(); + child = start(); // eslint-disable-next-line no-unused-vars for await (const _ of on(watcher, 'changed')) { - await restart(); + child = await restart(child); } } catch (error) { triggerUncaughtException(error, true /* fromPromise */); diff --git a/lib/internal/watch_mode/files_watcher.js b/lib/internal/watch_mode/files_watcher.js index 3c756c4b5d77c9..3c7952678821ea 100644 --- a/lib/internal/watch_mode/files_watcher.js +++ b/lib/internal/watch_mode/files_watcher.js @@ -31,6 +31,8 @@ class FilesWatcher extends EventEmitter { #throttle; #mode; + #wantsPassthroughIPC = false; + constructor({ throttle = 500, mode = 'filter' } = kEmptyObject) { super(); @@ -38,6 +40,7 @@ class FilesWatcher extends EventEmitter { validateOneOf(mode, 'options.mode', ['filter', 'all']); this.#throttle = throttle; this.#mode = mode; + this.#wantsPassthroughIPC = !!process.send; } #isPathWatched(path) { @@ -117,7 +120,28 @@ class FilesWatcher extends EventEmitter { this.#ownerDependencies.set(owner, dependencies); } } + + + #setupIPC(child) { + child._ipcMessages = { + parentToChild: message => child.send(message), + childToParent: message => process.send(message) + }; + process.on("message", child._ipcMessages.parentToChild); + child.on("message", child._ipcMessages.childToParent); + } + + destroyIPC(child) { + if (this.#wantsPassthroughIPC) { + process.off("message", child._ipcMessages.parentToChild); + child.off("message", child._ipcMessages.childToParent); + } + } + watchChildProcessModules(child, key = null) { + if (this.#wantsPassthroughIPC) { + this.#setupIPC(child); + } if (this.#mode !== 'filter') { return; } diff --git a/test/sequential/test-watch-mode.mjs b/test/sequential/test-watch-mode.mjs index 117d8d9681f2bf..04ba986df09762 100644 --- a/test/sequential/test-watch-mode.mjs +++ b/test/sequential/test-watch-mode.mjs @@ -1,3 +1,4 @@ +import fs from 'fs/promises'; import * as common from '../common/index.mjs'; import * as fixtures from '../common/fixtures.mjs'; import tmpdir from '../common/tmpdir.js'; @@ -34,6 +35,8 @@ async function spawnWithRestarts({ watchedFile = file, restarts = 1, isReady, + spawnOptions, + returnChild = false }) { args ??= [file]; const printedArgs = inspect(args.slice(args.indexOf(file)).join(' ')); @@ -44,30 +47,36 @@ async function spawnWithRestarts({ let cancelRestarts; disableRestart = true; - const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8' }); - child.stderr.on('data', (data) => { - stderr += data; - }); - child.stdout.on('data', async (data) => { - if (data.toString().includes('Restarting')) { - disableRestart = true; - } - stdout += data; - const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0; - if (restarts === 0 || !isReady(data.toString())) { - return; - } - if (restartsCount >= restarts) { - cancelRestarts?.(); - child.kill(); - return; - } - cancelRestarts ??= restart(watchedFile); - if (isReady(data.toString())) { - disableRestart = false; - } - }); + const child = spawn(execPath, ['--watch', '--no-warnings', ...args], { encoding: 'utf8', ...spawnOptions }); + if (!returnChild) { + child.stderr.on('data', (data) => { + stderr += data; + }); + child.stdout.on('data', async (data) => { + if (data.toString().includes('Restarting')) { + disableRestart = true; + } + stdout += data; + const restartsCount = stdout.match(new RegExp(`Restarting ${printedArgs.replace(/\\/g, '\\\\')}`, 'g'))?.length ?? 0; + if (restarts === 0 || !isReady(data.toString())) { + return; + } + if (restartsCount >= restarts) { + cancelRestarts?.(); + child.kill(); + return; + } + cancelRestarts ??= restart(watchedFile); + if (isReady(data.toString())) { + disableRestart = false; + } + }); + } + else { + // this test is doing it's own thing + return { child }; + } await once(child, 'exit'); cancelRestarts?.(); return { stderr, stdout }; @@ -248,6 +257,7 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => { }); }); + // TODO: Remove skip after https://github.com/nodejs/node/pull/45271 lands it('should not watch when running an missing file', { skip: !supportsRecursive @@ -307,4 +317,70 @@ describe('watch mode', { concurrency: false, timeout: 60_000 }, () => { `Completed running ${inspect(file)}`, ]); }); + + it('should pass IPC messages from a spawning parent to the child and back', async () => { + const file = createTmpFile('console.log("running");\nprocess.on("message", (message) => {\n if (message === "exit") {\n process.exit(0);\n } else {\n console.log("Received:", message);\n process.send(message);\n }\n})'); + const { child } = await spawnWithRestarts({ + file, + args: [file], + spawnOptions: { + stdio: ['pipe', 'pipe', 'pipe', 'ipc'], + }, + returnChild: true, + restarts: 2 + }); + + let stderr = ''; + let stdout = ''; + + child.stdout.on("data", data => stdout += data); + child.stderr.on("data", data => stderr += data); + async function waitForEcho(msg) { + const receivedPromise = new Promise((resolve) => { + const fn = (message) => { + if (message === msg) { + child.off("message", fn); + resolve(); + } + }; + child.on("message", fn); + }); + child.send(msg); + await receivedPromise; + } + async function waitForText(text) { + const seenPromise = new Promise((resolve) => { + const fn = (data) => { + if (data.toString().includes(text)) { + resolve(); + child.stdout.off("data", fn); + } + } + child.stdout.on("data", fn); + }); + await seenPromise; + } + + await waitForEcho("first message"); + const stopRestarts = restart(file); + await waitForText("running"); + stopRestarts(); + await waitForEcho("second message"); + const exitedPromise = once(child, 'exit'); + child.send("exit"); + await waitForText("Completed"); + child.disconnect(); + child.kill(); + await exitedPromise; + assert.strictEqual(stderr, ''); + const lines = stdout.split(/\r?\n/).filter(Boolean); + assert.deepStrictEqual(lines, [ + 'running', + 'Received: first message', + `Restarting '${file}'`, + 'running', + 'Received: second message', + `Completed running ${inspect(file)}`, + ]); + }); });