diff --git a/Cargo.lock b/Cargo.lock index 6dc53a67fa519f..9156cc36ac5d3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,6 +328,18 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.4" @@ -2226,6 +2238,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -4329,6 +4351,7 @@ dependencies = [ "turbo-tasks", "turbo-tasks-backend", "turbo-tasks-malloc", + "turbopack-node", "turbopack-trace-utils", ] @@ -4470,6 +4493,7 @@ dependencies = [ "turbopack-core", "turbopack-ecmascript-hmr-protocol", "turbopack-ecmascript-plugins", + "turbopack-node", "turbopack-trace-server", "turbopack-trace-utils", "url", @@ -9933,15 +9957,19 @@ name = "turbopack-node" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "async-stream", "async-trait", "base64 0.21.4", "bincode 2.0.1", "const_format", + "dashmap 6.1.0", "either", "futures", "futures-retry", "indoc", + "napi", + "napi-derive", "once_cell", "owo-colors", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 857020e0b32b6d..e34dceeb7fb21f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -328,7 +328,7 @@ turbopack-env = { path = "turbopack/crates/turbopack-env" } turbopack-image = { path = "turbopack/crates/turbopack-image" } turbopack-json = { path = "turbopack/crates/turbopack-json" } turbopack-mdx = { path = "turbopack/crates/turbopack-mdx" } 
-turbopack-node = { path = "turbopack/crates/turbopack-node" } +turbopack-node = { path = "turbopack/crates/turbopack-node", default-features = false } turbopack-resolve = { path = "turbopack/crates/turbopack-resolve" } turbopack-static = { path = "turbopack/crates/turbopack-static" } turbopack-swc-utils = { path = "turbopack/crates/turbopack-swc-utils" } @@ -422,6 +422,8 @@ napi = { version = "2", default-features = false, features = [ "napi5", "compat-mode", ] } +napi-derive = "2" +napi-build = "2" notify = "8.1.0" once_cell = "1.17.1" owo-colors = "4.2.2" diff --git a/crates/napi/Cargo.toml b/crates/napi/Cargo.toml index f754b9ab80b847..f04edd6cf777fd 100644 --- a/crates/napi/Cargo.toml +++ b/crates/napi/Cargo.toml @@ -46,12 +46,12 @@ workspace = true [package.metadata.cargo-shear] ignored = [ - # we need to set features on these packages when building for WASM, but we don't directly use them - "getrandom", - "iana-time-zone", - # the plugins feature needs to set a feature on this transitively depended-on package, we never - # directly import it - "turbopack-ecmascript-plugins", + # we need to set features on these packages when building for WASM, but we don't directly use them + "getrandom", + "iana-time-zone", + # the plugins feature needs to set a feature on this transitively depended-on package, we never + # directly import it + "turbopack-ecmascript-plugins", ] [dependencies] @@ -64,7 +64,7 @@ flate2 = { workspace = true } futures-util = { workspace = true } owo-colors = { workspace = true } napi = { workspace = true } -napi-derive = "2" +napi-derive = { workspace = true } next-custom-transforms = { workspace = true } next-taskless = { workspace = true } rand = { workspace = true } @@ -116,7 +116,7 @@ next-core = { workspace = true } mdxjs = { workspace = true, features = ["serializable"] } turbo-tasks-malloc = { workspace = true, default-features = false, features = [ - "custom_allocator" + "custom_allocator", ] } turbopack-core = { workspace = true } @@ 
-124,6 +124,7 @@ turbopack-ecmascript-hmr-protocol = { workspace = true } turbopack-trace-utils = { workspace = true } turbopack-trace-server = { workspace = true } turbopack-ecmascript-plugins = { workspace = true, optional = true } +turbopack-node = { workspace = true, default-features = false, features = ["worker_pool"] } [target.'cfg(windows)'.dependencies] windows-sys = "0.60" @@ -144,8 +145,7 @@ tokio = { workspace = true, features = ["full"] } [build-dependencies] anyhow = { workspace = true } -napi-build = "2" +napi-build = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } vergen-gitcl = { workspace = true } - diff --git a/crates/next-api/Cargo.toml b/crates/next-api/Cargo.toml index c5f733031a0bf4..6068bd198d2aac 100644 --- a/crates/next-api/Cargo.toml +++ b/crates/next-api/Cargo.toml @@ -40,7 +40,7 @@ turbopack-browser = { workspace = true } turbopack-core = { workspace = true } turbopack-css = { workspace = true } turbopack-ecmascript = { workspace = true } -turbopack-node = { workspace = true } +turbopack-node = { workspace = true, default-features = false } turbopack-nodejs = { workspace = true } turbopack-resolve = { workspace = true } turbopack-wasm = { workspace = true } diff --git a/crates/next-build-test/Cargo.toml b/crates/next-build-test/Cargo.toml index b2fc71e70f6f6a..6c9a51cf34f3e8 100644 --- a/crates/next-build-test/Cargo.toml +++ b/crates/next-build-test/Cargo.toml @@ -27,4 +27,4 @@ turbo-tasks = { workspace = true } turbo-tasks-backend = { workspace = true } turbo-tasks-malloc = { workspace = true } turbopack-trace-utils = { workspace = true } - +turbopack-node = { workspace = true, default-features = false, features = ["process_pool"]} diff --git a/crates/next-core/Cargo.toml b/crates/next-core/Cargo.toml index a02c6bdae134ee..d11935d4873008 100644 --- a/crates/next-core/Cargo.toml +++ b/crates/next-core/Cargo.toml @@ -79,7 +79,7 @@ turbopack-ecmascript-plugins = { workspace = true, features = [ ] } 
turbopack-ecmascript-runtime = { workspace = true } turbopack-image = { workspace = true } -turbopack-node = { workspace = true } +turbopack-node = { workspace = true, default-features = false } turbopack-nodejs = { workspace = true } turbopack-resolve = { workspace = true } turbopack-static = { workspace = true } diff --git a/packages/next/src/build/swc/generated-native.d.ts b/packages/next/src/build/swc/generated-native.d.ts index 83f2f1d87e2878..2dfc9a6a904a63 100644 --- a/packages/next/src/build/swc/generated-native.d.ts +++ b/packages/next/src/build/swc/generated-native.d.ts @@ -34,6 +34,28 @@ export declare class ExternalObject { [K: symbol]: T } } +export interface NapiPoolOptions { + filename: RcStr + concurrency: number + env: Record + cwd: RcStr +} +export interface WorkerTermination { + filename: RcStr + workerId: number +} +export declare function recvPoolRequest(): Promise +export declare function recvWorkerTermination(): Promise +export declare function recvWorkerRequest(filename: RcStr): Promise +export declare function recvMessageInWorker(workerId: number): Promise +export declare function notifyWorkerAck( + taskId: number, + workerId: number +): Promise +export declare function sendTaskMessage( + taskId: number, + message: string +): Promise export declare function lockfileTryAcquireSync( path: string ): { __napiType: 'Lockfile' } | null diff --git a/packages/next/src/build/swc/index.ts b/packages/next/src/build/swc/index.ts index 18b8ef1d252037..4b6727ee058e07 100644 --- a/packages/next/src/build/swc/index.ts +++ b/packages/next/src/build/swc/index.ts @@ -42,6 +42,7 @@ import type { WrittenEndpoint, } from './types' import { throwTurbopackInternalError } from '../../shared/lib/turbopack/internal-error' +import { runLoaderWorkerPool } from './loaderWorkerPool' type RawBindings = typeof import('./generated-native') type RawWasmBindings = typeof import('./generated-wasm') & { @@ -494,6 +495,7 @@ const normalizePathOnWindows = (p: string) => // 
TODO(sokra) Support wasm option. function bindingToApi( binding: RawBindings, + bindingPath: string, _wasm: boolean ): Binding['turbo']['createProject'] { type NativeFunction = ( @@ -674,6 +676,13 @@ function bindingToApi( constructor(nativeProject: { __napiType: 'Project' }) { this._nativeProject = nativeProject + + if ( + typeof binding.recvPoolRequest === 'function' && + typeof binding.recvWorkerTermination === 'function' + ) { + runLoaderWorkerPool(binding, bindingPath) + } } async update(options: PartialProjectOptions) { @@ -1361,20 +1370,24 @@ function loadNative(importPath?: string) { throw new Error('cannot run loadNative when `NEXT_TEST_WASM` is set') } + const customBindingsPath = !!__INTERNAL_CUSTOM_TURBOPACK_BINDINGS + ? require.resolve(__INTERNAL_CUSTOM_TURBOPACK_BINDINGS) + : null const customBindings: RawBindings = !!__INTERNAL_CUSTOM_TURBOPACK_BINDINGS ? require(__INTERNAL_CUSTOM_TURBOPACK_BINDINGS) : null let bindings: RawBindings = customBindings + let bindingsPath = customBindingsPath let attempts: any[] = [] const NEXT_TEST_NATIVE_DIR = process.env.NEXT_TEST_NATIVE_DIR for (const triple of triples) { if (NEXT_TEST_NATIVE_DIR) { try { + const bindingForTest = `${NEXT_TEST_NATIVE_DIR}/next-swc.${triple.platformArchABI}.node` // Use the binary directly to skip `pnpm pack` for testing as it's slow because of the large native binary. 
- bindings = require( - `${NEXT_TEST_NATIVE_DIR}/next-swc.${triple.platformArchABI}.node` - ) + bindings = require(bindingForTest) + bindingsPath = require.resolve(bindingForTest) infoLog( 'next-swc build: local built @next/swc from NEXT_TEST_NATIVE_DIR' ) @@ -1382,9 +1395,9 @@ function loadNative(importPath?: string) { } catch (e) {} } else { try { - bindings = require( - `@next/swc/native/next-swc.${triple.platformArchABI}.node` - ) + const normalBinding = `@next/swc/native/next-swc.${triple.platformArchABI}.node` + bindings = require(normalBinding) + bindingsPath = require.resolve(normalBinding) infoLog('next-swc build: local built @next/swc') break } catch (e) {} @@ -1402,6 +1415,7 @@ function loadNative(importPath?: string) { : `@next/swc-${triple.platformArchABI}` try { bindings = require(pkg) + bindingsPath = require.resolve(pkg) if (!importPath) { checkVersionMismatch(require(`${pkg}/package.json`)) } @@ -1480,7 +1494,11 @@ function loadNative(importPath?: string) { initCustomTraceSubscriber: bindings.initCustomTraceSubscriber, teardownTraceSubscriber: bindings.teardownTraceSubscriber, turbo: { - createProject: bindingToApi(customBindings ?? bindings, false), + createProject: bindingToApi( + customBindings ?? bindings, + customBindingsPath ?? bindingsPath!, + false + ), startTurbopackTraceServer(traceFilePath, port) { Log.warn( `Turbopack trace server started. View trace at https://trace.nextjs.org${port != null ? 
`?port=${port}` : ''}` diff --git a/packages/next/src/build/swc/loaderWorkerPool.ts b/packages/next/src/build/swc/loaderWorkerPool.ts new file mode 100644 index 00000000000000..255e83fe252f62 --- /dev/null +++ b/packages/next/src/build/swc/loaderWorkerPool.ts @@ -0,0 +1,96 @@ +import { Worker } from 'worker_threads' + +const loaderWorkers: Record> = {} + +const KillMsg = '__kill__' + +export async function runLoaderWorkerPool( + binding: typeof import('./generated-native'), + bindingPath: string +) { + await Promise.all([ + runPoolScaler(binding, bindingPath), + runWorkerTerminator(binding), + ]) +} + +async function runPoolScaler( + binding: typeof import('./generated-native'), + bindingPath: string +) { + while (true) { + try { + console.log('[loaderWorkerPool] waiting for pool request') + let poolOptions = await binding.recvPoolRequest() + console.log('[loaderWorkerPool] received pool request', { + ...poolOptions, + env: 'omitted', + }) + const { filename, concurrency, env, cwd } = poolOptions + // A filename of "*" is a wildcard: scale all pools, regardless of their poolId + const workers = + filename === '*' + ? 
Object.values(loaderWorkers).flat() + : loaderWorkers[filename] || (loaderWorkers[filename] = []) + if (workers.length < concurrency) { + for (let i = workers.length; i < concurrency; i++) { + const worker = new Worker(filename, { + workerData: { + poolId: filename, + bindingPath, + cwd, + }, + env, + }) + workers.push(worker) + } + } else if (workers.length > concurrency) { + const workersToKill = workers.splice(0, workers.length - concurrency) + workersToKill.forEach(terminateWorker) + } + } catch (_) { + // rust channel closed, do nothing + return + } + } +} + +async function runWorkerTerminator( + binding: typeof import('./generated-native') +) { + while (true) { + try { + console.log('[loaderWorkerPool] waiting for worker termination') + const { filename, workerId } = await binding.recvWorkerTermination() + console.log('[loaderWorkerPool] received worker termination', { + filename, + workerId, + }) + const workers = loaderWorkers[filename] + const workerIdx = workers.findIndex( + (worker) => worker.threadId === workerId + ) + if (workerIdx > -1) { + const workersToKill = workers.splice(workerIdx, 1) + workersToKill.forEach(terminateWorker) + } + } catch (_) { + // rust channel closed, do nothing + return + } + } +} + +async function terminateWorker(worker: Worker) { + await new Promise((resolve) => { + const onMessage = (msg: any) => { + if (msg === KillMsg) { + worker.off('message', onMessage) + resolve() + } + } + worker.on('message', onMessage) + worker.postMessage(KillMsg) + }) + await worker.terminate() +} diff --git a/turbopack/crates/turbo-rcstr/src/lib.rs b/turbopack/crates/turbo-rcstr/src/lib.rs index e6c32e5639e18b..f5330c041cc412 100644 --- a/turbopack/crates/turbo-rcstr/src/lib.rs +++ b/turbopack/crates/turbo-rcstr/src/lib.rs @@ -240,6 +240,12 @@ impl AsRef<[u8]> for RcStr { } } +impl AsRef for RcStr { + fn as_ref(&self) -> &str { + self.as_str() + } +} + impl From for BytesStr { fn from(value: RcStr) -> Self { Self::from_str_slice(value.as_str()) 
diff --git a/turbopack/crates/turbopack-cli/Cargo.toml b/turbopack/crates/turbopack-cli/Cargo.toml index bc37b050ec5b5a..74ea226f18200a 100644 --- a/turbopack/crates/turbopack-cli/Cargo.toml +++ b/turbopack/crates/turbopack-cli/Cargo.toml @@ -68,7 +68,7 @@ turbopack-ecmascript-plugins = { workspace = true, features = [ ] } turbopack-ecmascript-runtime = { workspace = true } turbopack-env = { workspace = true } -turbopack-node = { workspace = true } +turbopack-node = { workspace = true, default-features = false, features = ["process_pool"]} turbopack-nodejs = { workspace = true } turbopack-resolve = { workspace = true } turbopack-trace-utils = { workspace = true } diff --git a/turbopack/crates/turbopack-node/Cargo.toml b/turbopack/crates/turbopack-node/Cargo.toml index ad8ec886484cb0..7226ec583fe75a 100644 --- a/turbopack/crates/turbopack-node/Cargo.toml +++ b/turbopack/crates/turbopack-node/Cargo.toml @@ -10,8 +10,11 @@ autobenches = false bench = false [features] +default = ["process_pool"] # enable "HMR" for embedded assets dynamic_embed_contents = ["turbo-tasks-fs/dynamic_embed_contents"] +process_pool = ["tokio/full"] +worker_pool = ["async-channel", "napi", "napi-derive", "turbo-rcstr/napi"] [lints] workspace = true @@ -20,6 +23,7 @@ workspace = true anyhow = { workspace = true } async-stream = "0.3.4" async-trait = { workspace = true } +dashmap = { workspace = true } base64 = "0.21.0" bincode = { workspace = true } const_format = { workspace = true } @@ -35,7 +39,7 @@ rustc-hash = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } -tokio = { workspace = true, features = ["full"] } +tokio = { workspace = true, optional = true } tracing = { workspace = true } turbo-bincode = { workspace = true } turbo-rcstr = { workspace = true } @@ -47,4 +51,6 @@ turbopack-cli-utils = { workspace = true } turbopack-core = { workspace = true } turbopack-ecmascript = { workspace = true } 
turbopack-resolve = { workspace = true } - +napi = { workspace = true, features = ["error_anyhow"], optional = true } +napi-derive = { workspace = true, optional = true } +async-channel = { version = "2.5.0", optional = true } diff --git a/turbopack/crates/turbopack-node/js/src/ipc/evaluate.ts b/turbopack/crates/turbopack-node/js/src/child_process/evaluate.ts similarity index 95% rename from turbopack/crates/turbopack-node/js/src/ipc/evaluate.ts rename to turbopack/crates/turbopack-node/js/src/child_process/evaluate.ts index 018a3505ddf18c..e7ed556f6c5382 100644 --- a/turbopack/crates/turbopack-node/js/src/ipc/evaluate.ts +++ b/turbopack/crates/turbopack-node/js/src/child_process/evaluate.ts @@ -1,6 +1,8 @@ import { IPC } from './index' import type { Ipc as GenericIpc } from './index' +import type { Channel as Ipc } from '../types' + type IpcIncomingMessage = | { type: 'evaluate' @@ -29,11 +31,6 @@ type IpcOutgoingMessage = data: any } -export type Ipc = { - sendInfo(message: IM): Promise - sendRequest(message: RM): Promise - sendError(error: Error): Promise -} const ipc = IPC as GenericIpc const queue: string[][] = [] diff --git a/turbopack/crates/turbopack-node/js/src/globals.ts b/turbopack/crates/turbopack-node/js/src/child_process/globals.ts similarity index 100% rename from turbopack/crates/turbopack-node/js/src/globals.ts rename to turbopack/crates/turbopack-node/js/src/child_process/globals.ts diff --git a/turbopack/crates/turbopack-node/js/src/ipc/index.ts b/turbopack/crates/turbopack-node/js/src/child_process/index.ts similarity index 91% rename from turbopack/crates/turbopack-node/js/src/ipc/index.ts rename to turbopack/crates/turbopack-node/js/src/child_process/index.ts index d01d80cd708f39..d1b4773f78230c 100644 --- a/turbopack/crates/turbopack-node/js/src/ipc/index.ts +++ b/turbopack/crates/turbopack-node/js/src/child_process/index.ts @@ -1,26 +1,6 @@ import { createConnection } from 'node:net' import { Writable } from 'node:stream' -import type { 
StackFrame } from '../compiled/stacktrace-parser' -import { parse as parseStackTrace } from '../compiled/stacktrace-parser' -import { getProperError } from './error' - -export type StructuredError = { - name: string - message: string - stack: StackFrame[] - cause: StructuredError | undefined -} - -export function structuredError(e: unknown): StructuredError { - e = getProperError(e) - - return { - name: e.name, - message: e.message, - stack: typeof e.stack === 'string' ? parseStackTrace(e.stack) : [], - cause: e.cause ? structuredError(getProperError(e.cause)) : undefined, - } -} +import { structuredError } from '../error' type State = | { diff --git a/turbopack/crates/turbopack-node/js/src/ipc/error.ts b/turbopack/crates/turbopack-node/js/src/error.ts similarity index 74% rename from turbopack/crates/turbopack-node/js/src/ipc/error.ts rename to turbopack/crates/turbopack-node/js/src/error.ts index 62f02e86a50525..f02ff477700a0e 100644 --- a/turbopack/crates/turbopack-node/js/src/ipc/error.ts +++ b/turbopack/crates/turbopack-node/js/src/error.ts @@ -1,3 +1,6 @@ +import type { StackFrame } from './compiled/stacktrace-parser' +import { parse as parseStackTrace } from './compiled/stacktrace-parser' + // merged from next.js // https://github.com/vercel/next.js/blob/e657741b9908cf0044aaef959c0c4defb19ed6d8/packages/next/src/lib/is-error.ts // https://github.com/vercel/next.js/blob/e657741b9908cf0044aaef959c0c4defb19ed6d8/packages/next/src/shared/lib/is-plain-object.ts @@ -50,3 +53,21 @@ function isPlainObject(value: any): boolean { */ return prototype === null || prototype.hasOwnProperty('isPrototypeOf') } + +export type StructuredError = { + name: string + message: string + stack: StackFrame[] + cause: StructuredError | undefined +} + +export function structuredError(e: Error | string): StructuredError { + e = getProperError(e) + + return { + name: e.name, + message: e.message, + stack: typeof e.stack === 'string' ? parseStackTrace(e.stack) : [], + cause: e.cause ? 
structuredError(getProperError(e.cause)) : undefined, + } +} diff --git a/turbopack/crates/turbopack-node/js/src/transforms/transforms.ts b/turbopack/crates/turbopack-node/js/src/transforms/transforms.ts index bfd925a7f234f9..3c8f8bc8bbdf0c 100644 --- a/turbopack/crates/turbopack-node/js/src/transforms/transforms.ts +++ b/turbopack/crates/turbopack-node/js/src/transforms/transforms.ts @@ -2,10 +2,11 @@ * Shared utilities for our 2 transform implementations. */ -import type { Ipc } from '../ipc/evaluate' +import type { Channel as Ipc } from '../types' import { relative, isAbsolute, join, sep } from 'path' -import { type StructuredError } from '../ipc' +import { type StructuredError } from '../error' import { type StackFrame } from '../compiled/stacktrace-parser' +import { isMainThread, workerData } from 'worker_threads' export type IpcInfoMessage = | { @@ -44,7 +45,8 @@ export type IpcRequestMessage = export type TransformIpc = Ipc -const contextDir = process.cwd() +const contextDir = + !isMainThread && workerData.cwd ? 
workerData.cwd : process.cwd() export const toPath = (file: string) => { const relPath = relative(contextDir, file) if (isAbsolute(relPath)) { diff --git a/turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts b/turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts index e72e3065d2be03..dbf5c1297d4263 100644 --- a/turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts +++ b/turbopack/crates/turbopack-node/js/src/transforms/webpack-loaders.ts @@ -1,14 +1,14 @@ declare const __turbopack_external_require__: { - resolve: (name: string, opt: { paths: string[] }) => string + resolve: (name: string, opt?: { paths: string[] }) => string } & ((id: string, thunk: () => any, esm?: boolean) => any) -import type { Ipc } from '../ipc/evaluate' +import type { Channel as Ipc } from '../types' import { dirname, resolve as pathResolve, relative } from 'path' import { StackFrame, parse as parseStackTrace, } from '../compiled/stacktrace-parser' -import { structuredError, type StructuredError } from '../ipc' +import { structuredError, type StructuredError } from '../error' import { fromPath, getReadEnvVariables, @@ -17,6 +17,7 @@ import { } from './transforms' import fs from 'fs' import path from 'path' +import { isMainThread, workerData } from 'worker_threads' export type IpcInfoMessage = | { @@ -59,7 +60,8 @@ const { runLoaders, }: typeof import('loader-runner') = require('@vercel/turbopack/loader-runner') -const contextDir = process.cwd() +const contextDir = + !isMainThread && workerData.cwd ? 
workerData.cwd : process.cwd() const LogType = Object.freeze({ error: 'error', diff --git a/turbopack/crates/turbopack-node/js/src/types.ts b/turbopack/crates/turbopack-node/js/src/types.ts new file mode 100644 index 00000000000000..f37d49d86cec27 --- /dev/null +++ b/turbopack/crates/turbopack-node/js/src/types.ts @@ -0,0 +1,5 @@ +export type Channel = { + sendInfo(message: IM): Promise + sendRequest(message: RM): Promise + sendError(error: Error): Promise +} diff --git a/turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts b/turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts new file mode 100644 index 00000000000000..4ddaa05efdc54a --- /dev/null +++ b/turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts @@ -0,0 +1,145 @@ +import { threadId as workerId, workerData, parentPort } from 'worker_threads' +import { structuredError } from '../error' +import type { Channel } from '../types' +import { Binding, TaskChannel } from './taskChannel' + +const binding: Binding = require( + /* turbopackIgnore: true */ workerData.bindingPath +) + +const KillMsg = '__kill__' + +let willKill = false + +parentPort!.on('message', (msg) => { + if (msg === KillMsg) { + willKill = true + } +}) + +export const run = async ( + moduleFactory: () => Promise<{ + init?: () => Promise + default: (channel: Channel, ...deserializedArgs: any[]) => any + }> +) => { + let getValue: (channel: Channel, ...deserializedArgs: any[]) => any + + let isRunning = false + let runningTask: Promise | undefined + + const run = async (taskId: number, args: string[]) => { + try { + if (typeof getValue !== 'function') { + const module = await moduleFactory() + if (typeof module.init === 'function') { + await module.init() + } + getValue = module.default + } + const value = await getValue(new TaskChannel(binding, taskId), ...args) + console.log(`[worker ${workerId}] sending end task message for ${taskId}`) + await binding.sendTaskMessage( + taskId, + JSON.stringify({ + type: 
'end', + data: value === undefined ? undefined : JSON.stringify(value), + duration: 0, + }) + ) + console.log(`[worker ${workerId}] sent end task message for ${taskId}`) + } catch (err) { + console.log( + `[worker ${workerId}] sending error task message for ${taskId}` + ) + await binding.sendTaskMessage( + taskId, + JSON.stringify({ + type: 'error', + ...structuredError(err as Error), + }) + ) + console.log(`[worker ${workerId}] sent error task message for ${taskId}`) + } + isRunning = false + runningTask = undefined + } + + const loop = async () => { + console.log(`[worker ${workerId}] waiting for worker request`) + let taskId: number | undefined + let msg_str: string + console.log(`[worker ${workerId}] received worker request ${taskId}`) + + console.log(`[worker ${workerId}] notifying worker ack ${taskId}`) + if (isRunning) { + msg_str = await binding.recvMessageInWorker(workerId) + } else { + taskId = await binding.recvWorkerRequest(workerData.poolId) + await binding.notifyWorkerAck(taskId, workerId) + console.log(`[worker ${workerId}] notified worker ack ${taskId}`) + console.log(`[worker ${workerId}] waiting for message in worker`) + msg_str = await binding.recvMessageInWorker(workerId) + console.log(`[worker ${workerId}] received message in worker`) + } + + const msg = JSON.parse(msg_str) as + | { + type: 'evaluate' + args: string[] + } + | { + type: 'result' + id: number + error?: string + data?: any + } + + switch (msg.type) { + case 'evaluate': { + if (!isRunning && taskId !== undefined) { + isRunning = true + runningTask = run(taskId, msg.args) + } + break + } + case 'result': { + const request = TaskChannel.requests.get(msg.id) + if (request) { + TaskChannel.requests.delete(msg.id) + if (msg.error) { + // Reject on the next macrotask, because some rejection callbacks are not yet registered at this point; + // rejecting immediately would cause the error to be propagated to the scheduling thread and then cause a panic.
+ // This situation typically occurs when using sass-loader, which tries to resolve many possible dependencies, + // some of which may fail with an error. + setTimeout(() => request.reject(new Error(msg.error)), 0) + } else { + request.resolve(msg.data) + } + } + break + } + default: { + console.error('unexpected message type', (msg as any).type) + } + } + } + + while (true) { + if (willKill) { + if (runningTask) { + await runningTask + } + parentPort!.postMessage(KillMsg) + return + } + + const loopTask = loop() + + if (!isRunning) { + runningTask = loopTask + } + + await loopTask + } +} diff --git a/turbopack/crates/turbopack-node/js/src/worker_threads/taskChannel.ts b/turbopack/crates/turbopack-node/js/src/worker_threads/taskChannel.ts new file mode 100644 index 00000000000000..ec4c53e1074150 --- /dev/null +++ b/turbopack/crates/turbopack-node/js/src/worker_threads/taskChannel.ts @@ -0,0 +1,61 @@ +import { structuredError } from '../error' + +export interface Binding { + recvWorkerRequest(poolId: string): Promise + recvMessageInWorker(workerId: number): Promise + notifyWorkerAck(taskId: number, workerId: number): Promise + sendTaskMessage(taskId: number, message: string): Promise +} + +// Export this, maybe in the future, we can add an implementation via web worker on browser +export class TaskChannel { + static nextId = 1 + static requests = new Map() + + constructor( + private binding: Binding, + private taskId: number + ) {} + + async sendInfo(message: any) { + return await this.binding.sendTaskMessage( + this.taskId, + JSON.stringify({ + type: 'info', + data: message, + }) + ) + } + + async sendRequest(message: any) { + const id = TaskChannel.nextId++ + let resolve, reject + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + TaskChannel.requests.set(id, { resolve, reject }) + return await this.binding + .sendTaskMessage( + this.taskId, + JSON.stringify({ type: 'request', id, data: message }) + ) + .then(() => promise) + } + + async 
sendError(error: Error) { + try { + await this.binding.sendTaskMessage( + this.taskId, + JSON.stringify({ + type: 'error', + ...structuredError(error), + }) + ) + } catch (err) { + // There's nothing we can do about errors that happen after this point, we can't tell anyone + // about them. + console.error('failed to send error back to rust:', err) + } + } +} diff --git a/turbopack/crates/turbopack-node/src/evaluate.rs b/turbopack/crates/turbopack-node/src/evaluate.rs index e7141b73239b8f..6e0a24c215d3b3 100644 --- a/turbopack/crates/turbopack-node/src/evaluate.rs +++ b/turbopack/crates/turbopack-node/src/evaluate.rs @@ -1,4 +1,7 @@ -use std::{borrow::Cow, iter, sync::Arc, thread::available_parallelism, time::Duration}; +use std::{ + borrow::Cow, iter, process::ExitStatus, sync::Arc, thread::available_parallelism, + time::Duration, +}; use anyhow::{Result, bail}; use futures_retry::{FutureRetry, RetryPolicy}; @@ -10,7 +13,9 @@ use turbo_tasks::{ TryJoinIterExt, Vc, duration_span, fxindexmap, get_effects, trace::TraceRawVcs, }; use turbo_tasks_env::{EnvMap, ProcessEnv}; -use turbo_tasks_fs::{File, FileContent, FileSystemPath, to_sys_path}; +use turbo_tasks_fs::{ + File, FileContent, FileSystemPath, json::parse_json_with_source_context, to_sys_path, +}; use turbopack_core::{ asset::AssetContent, changed::content_changed, @@ -31,12 +36,13 @@ use turbopack_core::{ virtual_source::VirtualSource, }; +#[cfg(feature = "process_pool")] +use crate::process_pool::ChildProcessPool; +#[cfg(feature = "worker_pool")] +use crate::worker_pool::WorkerThreadPool; use crate::{ - AssetsForSourceMapping, - embed_js::embed_file_path, - emit, emit_package_json, internal_assets_for_source_mapping, - pool::{FormattingMode, NodeJsOperation, NodeJsPool}, - source_map::StructuredError, + AssetsForSourceMapping, embed_js::embed_file_path, emit, emit_package_json, + format::FormattingMode, internal_assets_for_source_mapping, source_map::StructuredError, }; #[derive(Serialize)] @@ -60,6 +66,56 @@ 
enum EvalJavaScriptIncomingMessage { Error(StructuredError), } +#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)] +pub struct EvaluatePool { + pub id: RcStr, + #[turbo_tasks(trace_ignore, debug_ignore)] + pool: Box, + pub assets_for_source_mapping: ResolvedVc, + pub assets_root: FileSystemPath, + pub project_dir: FileSystemPath, +} + +impl EvaluatePool { + pub async fn operation(&self) -> Result> { + self.pool.operation().await + } +} + +impl EvaluatePool { + pub(crate) fn new( + id: RcStr, + pool: Box, + assets_for_source_mapping: ResolvedVc, + assets_root: FileSystemPath, + project_dir: FileSystemPath, + ) -> Self { + Self { + id, + pool, + assets_for_source_mapping, + assets_root, + project_dir, + } + } +} + +#[async_trait::async_trait] +pub trait EvaluateOperation: Send + Sync { + async fn operation(&self) -> Result>; +} + +#[async_trait::async_trait] +pub trait Operation: Send { + async fn recv(&mut self) -> Result; + + async fn send(&mut self, data: String) -> Result<()>; + + async fn wait_or_kill(&mut self) -> Result; + + fn disallow_reuse(&mut self) -> (); +} + #[turbo_tasks::value] struct EmittedEvaluatePoolAssets { bootstrap: ResolvedVc>, @@ -161,7 +217,7 @@ pub async fn get_evaluate_pool( additional_invalidation: ResolvedVc, debug: bool, env_var_tracking: EnvVarTracking, -) -> Result> { +) -> Result> { let operation = emit_evaluate_pool_assets_with_effects_operation(entries, chunking_context, module_graph); let EmittedEvaluatePoolAssetsWithEffects { assets, effects } = @@ -199,7 +255,21 @@ pub async fn get_evaluate_pool( env.read_all().untracked().await? 
} }; - let pool = NodeJsPool::new( + + #[cfg(feature = "process_pool")] + #[allow(unused_variables)] + let pool = ChildProcessPool::create( + cwd.clone(), + entrypoint.clone(), + env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(), + assets_for_source_mapping, + output_root.clone(), + chunking_context.root_path().owned().await?, + available_parallelism().map_or(1, |v| v.get()), + debug, + ); + #[cfg(feature = "worker_pool")] + let pool = WorkerThreadPool::create( cwd, entrypoint, env.iter().map(|(k, v)| (k.clone(), v.clone())).collect(), @@ -256,7 +326,7 @@ pub trait EvaluateContext { type ResponseMessage: Serialize; type State: Default; - fn pool(&self) -> OperationVc; + fn pool(&self) -> OperationVc; fn keep_alive(&self) -> bool { false } @@ -265,24 +335,24 @@ pub trait EvaluateContext { fn emit_error( &self, error: StructuredError, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn info( &self, state: &mut Self::State, data: Self::InfoMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn request( &self, state: &mut Self::State, data: Self::RequestMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; fn finish( &self, state: Self::State, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> impl Future> + Send; } @@ -308,9 +378,11 @@ pub async fn custom_evaluate(evaluate_context: impl EvaluateContext) -> Result>, runtime_entries: Option>, ) -> Result> { + #[cfg(feature = "process_pool")] + #[allow(unused_variables)] + let runtime_module_path = rcstr!("child_process/evaluate.ts"); + #[cfg(feature = "worker_pool")] + let runtime_module_path = rcstr!("worker_threads/evaluate.ts"); + let runtime_asset = asset_context .process( Vc::upcast(FileSource::new( - embed_file_path(rcstr!("ipc/evaluate.ts")).owned().await?, + embed_file_path(runtime_module_path).owned().await?, )), ReferenceType::Internal(InnerAssets::empty().to_resolved().await?), ) @@ -392,22 +470,29 @@ pub async fn 
get_evaluate_entries( .await?; let runtime_entries = { - let globals_module = asset_context - .process( - Vc::upcast(FileSource::new( - embed_file_path(rcstr!("globals.ts")).owned().await?, - )), - ReferenceType::Internal(InnerAssets::empty().to_resolved().await?), - ) - .module(); - - let Some(globals_module) = - Vc::try_resolve_sidecast::>(globals_module).await? - else { - bail!("Internal module is not evaluatable"); - }; - - let mut entries = vec![globals_module.to_resolved().await?]; + let mut entries = vec![]; + + #[cfg(feature = "process_pool")] + { + let globals_module = asset_context + .process( + Vc::upcast(FileSource::new( + embed_file_path(rcstr!("child_process/globals.ts")) + .owned() + .await?, + )), + ReferenceType::Internal(InnerAssets::empty().to_resolved().await?), + ) + .module(); + + let Some(globals_module) = + Vc::try_resolve_sidecast::>(globals_module).await? + else { + bail!("Internal module is not evaluatable"); + }; + + entries.push(globals_module.to_resolved().await?); + } if let Some(runtime_entries) = runtime_entries { for &entry in &*runtime_entries.await? { entries.push(entry) @@ -455,18 +540,20 @@ pub async fn evaluate( .await } -/// Repeatedly pulls from the NodeJsOperation until we receive a +/// Repeatedly pulls from the `Operation` until we receive a /// value/error/end. async fn pull_operation( - operation: &mut NodeJsOperation, - pool: &NodeJsPool, + operation: &mut Box, + pool: &EvaluatePool, evaluate_context: &T, state: &mut T::State, ) -> Result> { let _guard = duration_span!("Node.js evaluation"); loop { - match operation.recv().await?
{ + let message = parse_json_with_source_context(&operation.recv().await?)?; + + match message { EvalJavaScriptIncomingMessage::Error(error) => { evaluate_context.emit_error(error, pool).await?; // Do not reuse the process in case of error @@ -487,20 +574,24 @@ async fn pull_operation( { Ok(response) => { operation - .send(EvalJavaScriptOutgoingMessage::Result { - id, - error: None, - data: Some(serde_json::to_value(response)?), - }) + .send(serde_json::to_string( + &EvalJavaScriptOutgoingMessage::Result { + id, + error: None, + data: Some(serde_json::to_value(response)?), + }, + )?) .await?; } Err(e) => { operation - .send(EvalJavaScriptOutgoingMessage::Result { - id, - error: Some(PrettyPrintError(&e).to_string()), - data: None, - }) + .send(serde_json::to_string( + &EvalJavaScriptOutgoingMessage::Result { + id, + error: Some(PrettyPrintError(&e).to_string()), + data: None, + }, + )?) .await?; } } @@ -528,7 +619,7 @@ impl EvaluateContext for BasicEvaluateContext { type ResponseMessage = (); type State = (); - fn pool(&self) -> OperationVc { + fn pool(&self) -> OperationVc { get_evaluate_pool( self.entries, self.cwd.clone(), @@ -553,7 +644,7 @@ impl EvaluateContext for BasicEvaluateContext { !self.args.is_empty() } - async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> { + async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> { EvaluationIssue { error, source: IssueSource::from_source_only(self.context_source_for_issue), @@ -570,7 +661,7 @@ impl EvaluateContext for BasicEvaluateContext { &self, _state: &mut Self::State, _data: Self::InfoMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result<()> { bail!("BasicEvaluateContext does not support info messages") } @@ -579,24 +670,16 @@ impl EvaluateContext for BasicEvaluateContext { &self, _state: &mut Self::State, _data: Self::RequestMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result { bail!("BasicEvaluateContext does not support 
request messages") } - async fn finish(&self, _state: Self::State, _pool: &NodeJsPool) -> Result<()> { + async fn finish(&self, _state: Self::State, _pool: &EvaluatePool) -> Result<()> { Ok(()) } } -pub fn scale_zero() { - NodeJsPool::scale_zero(); -} - -pub fn scale_down() { - NodeJsPool::scale_down(); -} - /// An issue that occurred while evaluating node code. #[turbo_tasks::value(shared)] pub struct EvaluationIssue { @@ -647,3 +730,25 @@ impl Issue for EvaluationIssue { Vc::cell(Some(self.source)) } } + +pub fn scale_down() { + #[cfg(feature = "process_pool")] + { + ChildProcessPool::scale_down(); + } + #[cfg(feature = "worker_pool")] + { + WorkerThreadPool::scale_down(); + } +} + +pub fn scale_zero() { + #[cfg(feature = "process_pool")] + { + ChildProcessPool::scale_zero(); + } + #[cfg(feature = "worker_pool")] + { + WorkerThreadPool::scale_zero(); + } +} diff --git a/turbopack/crates/turbopack-node/src/format.rs b/turbopack/crates/turbopack-node/src/format.rs new file mode 100644 index 00000000000000..27dd550c28119a --- /dev/null +++ b/turbopack/crates/turbopack-node/src/format.rs @@ -0,0 +1,36 @@ +use std::fmt::Display; + +use owo_colors::{OwoColorize, Style}; + +#[derive(Clone, Copy)] +pub enum FormattingMode { + /// No formatting, just print the output + Plain, + /// Use ansi colors to format the output + AnsiColors, +} + +impl FormattingMode { + pub fn magic_identifier<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => format!("{{{content}}}"), + FormattingMode::AnsiColors => format!("{{{content}}}").italic().to_string(), + } + } + + pub fn lowlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => Style::new(), + FormattingMode::AnsiColors => Style::new().dimmed(), + } + .style(content) + } + + pub fn highlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { + match self { + FormattingMode::Plain => Style::new(), + 
FormattingMode::AnsiColors => Style::new().bold().underline(), + } + .style(content) + } +} diff --git a/turbopack/crates/turbopack-node/src/lib.rs b/turbopack/crates/turbopack-node/src/lib.rs index c9c043ee7cd1cc..07d44655a4d698 100644 --- a/turbopack/crates/turbopack-node/src/lib.rs +++ b/turbopack/crates/turbopack-node/src/lib.rs @@ -17,10 +17,13 @@ pub mod debug; pub mod embed_js; pub mod evaluate; pub mod execution_context; -mod heap_queue; -mod pool; +mod format; +#[cfg(feature = "process_pool")] +mod process_pool; pub mod source_map; pub mod transforms; +#[cfg(feature = "worker_pool")] +pub mod worker_pool; #[turbo_tasks::function] async fn emit( diff --git a/turbopack/crates/turbopack-node/src/heap_queue.rs b/turbopack/crates/turbopack-node/src/process_pool/heap_queue.rs similarity index 100% rename from turbopack/crates/turbopack-node/src/heap_queue.rs rename to turbopack/crates/turbopack-node/src/process_pool/heap_queue.rs diff --git a/turbopack/crates/turbopack-node/src/pool.rs b/turbopack/crates/turbopack-node/src/process_pool/mod.rs similarity index 91% rename from turbopack/crates/turbopack-node/src/pool.rs rename to turbopack/crates/turbopack-node/src/process_pool/mod.rs index 1d4532868d66e4..3d229d2287012e 100644 --- a/turbopack/crates/turbopack-node/src/pool.rs +++ b/turbopack/crates/turbopack-node/src/process_pool/mod.rs @@ -1,6 +1,6 @@ use std::{ cmp::max, - fmt::{Debug, Display}, + fmt::Debug, future::Future, mem::take, path::{Path, PathBuf}, @@ -12,10 +12,9 @@ use std::{ use anyhow::{Context, Result, bail}; use futures::join; use once_cell::sync::Lazy; -use owo_colors::{OwoColorize, Style}; +use owo_colors::OwoColorize; use parking_lot::Mutex; use rustc_hash::FxHashMap; -use serde::{Serialize, de::DeserializeOwned}; use tokio::{ io::{ AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stderr, @@ -29,43 +28,18 @@ use tokio::{ }; use turbo_rcstr::RcStr; use turbo_tasks::{FxIndexSet, ResolvedVc, Vc, duration_span}; 
-use turbo_tasks_fs::{FileSystemPath, json::parse_json_with_source_context}; +use turbo_tasks_fs::FileSystemPath; use turbopack_ecmascript::magic_identifier::unmangle_identifiers; -use crate::{AssetsForSourceMapping, heap_queue::HeapQueue, source_map::apply_source_mapping}; - -#[derive(Clone, Copy)] -pub enum FormattingMode { - /// No formatting, just print the output - Plain, - /// Use ansi colors to format the output - AnsiColors, -} - -impl FormattingMode { - pub fn magic_identifier<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => format!("{{{content}}}"), - FormattingMode::AnsiColors => format!("{{{content}}}").italic().to_string(), - } - } - - pub fn lowlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => Style::new(), - FormattingMode::AnsiColors => Style::new().dimmed(), - } - .style(content) - } +use crate::{ + AssetsForSourceMapping, + evaluate::{EvaluateOperation, EvaluatePool, Operation}, + format::FormattingMode, + source_map::apply_source_mapping, +}; - pub fn highlight<'a>(&self, content: impl Display + 'a) -> impl Display + 'a { - match self { - FormattingMode::Plain => Style::new(), - FormattingMode::AnsiColors => Style::new().bold().underline(), - } - .style(content) - } -} +mod heap_queue; +use heap_queue::HeapQueue; struct NodeJsPoolProcess { child: Option, @@ -686,7 +660,7 @@ static ACTIVE_POOLS: Lazy = Lazy::new(Default::default); /// The worker will *not* use the env of the parent process by default. All env /// vars need to be provided to make the execution as pure as possible. 
#[turbo_tasks::value(cell = "new", serialization = "none", eq = "manual", shared)] -pub struct NodeJsPool { +pub struct ChildProcessPool { cwd: PathBuf, entrypoint: PathBuf, env: FxHashMap, @@ -711,10 +685,10 @@ pub struct NodeJsPool { stats: Arc>, } -impl NodeJsPool { +impl ChildProcessPool { /// * debug: Whether to automatically enable Node's `--inspect-brk` when spawning it. Note: /// automatically overrides concurrency to 1. - pub(super) fn new( + pub fn create( cwd: PathBuf, entrypoint: PathBuf, env: FxHashMap, @@ -723,24 +697,58 @@ impl NodeJsPool { project_dir: FileSystemPath, concurrency: usize, debug: bool, - ) -> Self { - Self { - cwd, - entrypoint, - env, + ) -> EvaluatePool { + EvaluatePool::new( + entrypoint.to_string_lossy().to_string().into(), + Box::new(Self { + cwd, + entrypoint, + env, + assets_for_source_mapping, + assets_root: assets_root.clone(), + project_dir: project_dir.clone(), + concurrency_semaphore: Arc::new(Semaphore::new(if debug { + 1 + } else { + concurrency + })), + bootup_semaphore: Arc::new(Semaphore::new(1)), + idle_processes: Arc::new(HeapQueue::new()), + shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())), + shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())), + debug, + stats: Default::default(), + }), assets_for_source_mapping, assets_root, project_dir, - concurrency_semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })), - bootup_semaphore: Arc::new(Semaphore::new(1)), - idle_processes: Arc::new(HeapQueue::new()), - shared_stdout: Arc::new(Mutex::new(FxIndexSet::default())), - shared_stderr: Arc::new(Mutex::new(FxIndexSet::default())), - debug, - stats: Default::default(), - } + ) + } +} + +#[async_trait::async_trait] +impl EvaluateOperation for ChildProcessPool { + async fn operation(&self) -> Result> { + // Acquire a running process (handles concurrency limits, boots up the process) + + let operation = { + let _guard = duration_span!("Node.js operation"); + let (process, permits) = 
self.acquire_process().await?; + ChildProcessOperation { + process: Some(process), + permits, + idle_processes: self.idle_processes.clone(), + start: Instant::now(), + stats: self.stats.clone(), + allow_process_reuse: true, + } + }; + + Ok(Box::new(operation)) } +} +impl ChildProcessPool { async fn acquire_process(&self) -> Result<(NodeJsPoolProcess, AcquiredPermits)> { { self.stats.lock().add_queued_task(); @@ -797,20 +805,6 @@ impl NodeJsPool { Ok((process, start.elapsed())) } - pub async fn operation(&self) -> Result { - // Acquire a running process (handles concurrency limits, boots up the process) - let (process, permits) = self.acquire_process().await?; - - Ok(NodeJsOperation { - process: Some(process), - permits, - idle_processes: self.idle_processes.clone(), - start: Instant::now(), - stats: self.stats.clone(), - allow_process_reuse: true, - }) - } - pub fn scale_down() { let pools = ACTIVE_POOLS.lock().clone(); for pool in pools { @@ -826,7 +820,7 @@ impl NodeJsPool { } } -pub struct NodeJsOperation { +pub struct ChildProcessOperation { process: Option, // This is used for drop #[allow(dead_code)] @@ -837,48 +831,20 @@ pub struct NodeJsOperation { allow_process_reuse: bool, } -impl NodeJsOperation { - async fn with_process<'a, F: Future> + Send + 'a, T>( - &'a mut self, - f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F, - ) -> Result { - let process = self - .process - .as_mut() - .context("Node.js operation already finished")?; - - if !self.allow_process_reuse { - bail!("Node.js process is no longer usable"); - } - - let result = f(process).await; - if result.is_err() && self.allow_process_reuse { - self.stats.lock().remove_worker(); - self.allow_process_reuse = false; - } - result - } - - pub async fn recv(&mut self) -> Result - where - M: DeserializeOwned, - { - let message = self +#[async_trait::async_trait] +impl Operation for ChildProcessOperation { + async fn recv(&mut self) -> Result { + let vec = self .with_process(|process| async move { 
process.recv().await.context("failed to receive message") }) .await?; - let message = std::str::from_utf8(&message).context("message is not valid UTF-8")?; - parse_json_with_source_context(message).context("failed to deserialize message") + Ok(String::from_utf8(vec)?) } - pub async fn send(&mut self, message: M) -> Result<()> - where - M: Serialize, - { - let message = serde_json::to_vec(&message).context("failed to serialize message")?; + async fn send(&mut self, message: String) -> Result<()> { self.with_process(|process| async move { - timeout(Duration::from_secs(30), process.send(message)) + timeout(Duration::from_secs(30), process.send(message.into_bytes())) .await .context("timeout while sending message")? .context("failed to send message")?; @@ -887,7 +853,7 @@ impl NodeJsOperation { .await } - pub async fn wait_or_kill(mut self) -> Result { + async fn wait_or_kill(&mut self) -> Result { let mut process = self .process .take() @@ -912,7 +878,7 @@ impl NodeJsOperation { Ok(status) } - pub fn disallow_reuse(&mut self) { + fn disallow_reuse(&mut self) { if self.allow_process_reuse { self.stats.lock().remove_worker(); self.allow_process_reuse = false; @@ -920,7 +886,30 @@ impl NodeJsOperation { } } -impl Drop for NodeJsOperation { +impl ChildProcessOperation { + async fn with_process<'a, F: Future> + Send + 'a, T>( + &'a mut self, + f: impl FnOnce(&'a mut NodeJsPoolProcess) -> F, + ) -> Result { + let process = self + .process + .as_mut() + .context("Node.js operation already finished")?; + + if !self.allow_process_reuse { + bail!("Node.js process is no longer usable"); + } + + let result = f(process).await; + if result.is_err() && self.allow_process_reuse { + self.stats.lock().remove_worker(); + self.allow_process_reuse = false; + } + result + } +} + +impl Drop for ChildProcessOperation { fn drop(&mut self) { if let Some(mut process) = self.process.take() { let elapsed = self.start.elapsed(); diff --git a/turbopack/crates/turbopack-node/src/source_map/mod.rs 
b/turbopack/crates/turbopack-node/src/source_map/mod.rs index 78d67cb8f59920..1689b58ef7ba91 100644 --- a/turbopack/crates/turbopack-node/src/source_map/mod.rs +++ b/turbopack/crates/turbopack-node/src/source_map/mod.rs @@ -20,7 +20,7 @@ use turbopack_core::{ use turbopack_ecmascript::magic_identifier::unmangle_identifiers; pub use crate::source_map::trace::{StackFrame, TraceResult, trace_source_map}; -use crate::{AssetsForSourceMapping, pool::FormattingMode}; +use crate::{AssetsForSourceMapping, format::FormattingMode}; pub mod trace; diff --git a/turbopack/crates/turbopack-node/src/transforms/webpack.rs b/turbopack/crates/turbopack-node/src/transforms/webpack.rs index d1576e0fd8160c..5248e93324c826 100644 --- a/turbopack/crates/turbopack-node/src/transforms/webpack.rs +++ b/turbopack/crates/turbopack-node/src/transforms/webpack.rs @@ -53,11 +53,11 @@ use crate::{ debug::should_debug, embed_js::embed_file_path, evaluate::{ - EnvVarTracking, EvaluateContext, EvaluateEntries, EvaluationIssue, custom_evaluate, - get_evaluate_entries, get_evaluate_pool, + EnvVarTracking, EvaluateContext, EvaluateEntries, EvaluatePool, EvaluationIssue, + custom_evaluate, get_evaluate_entries, get_evaluate_pool, }, execution_context::ExecutionContext, - pool::{FormattingMode, NodeJsPool}, + format::FormattingMode, source_map::{StackFrame, StructuredError}, transforms::util::{EmittedAsset, emitted_assets_to_virtual_sources}, }; @@ -449,7 +449,7 @@ impl EvaluateContext for WebpackLoaderContext { type ResponseMessage = ResponseMessage; type State = Vec; - fn pool(&self) -> OperationVc { + fn pool(&self) -> OperationVc { get_evaluate_pool( self.entries, self.cwd.clone(), @@ -477,7 +477,7 @@ impl EvaluateContext for WebpackLoaderContext { true } - async fn emit_error(&self, error: StructuredError, pool: &NodeJsPool) -> Result<()> { + async fn emit_error(&self, error: StructuredError, pool: &EvaluatePool) -> Result<()> { EvaluationIssue { error, source: 
IssueSource::from_source_only(self.context_source_for_issue), @@ -494,7 +494,7 @@ impl EvaluateContext for WebpackLoaderContext { &self, state: &mut Self::State, data: Self::InfoMessage, - pool: &NodeJsPool, + pool: &EvaluatePool, ) -> Result<()> { match data { InfoMessage::Dependencies { @@ -570,7 +570,7 @@ impl EvaluateContext for WebpackLoaderContext { &self, _state: &mut Self::State, data: Self::RequestMessage, - _pool: &NodeJsPool, + _pool: &EvaluatePool, ) -> Result { match data { RequestMessage::Resolve { @@ -624,7 +624,7 @@ impl EvaluateContext for WebpackLoaderContext { } } - async fn finish(&self, state: Self::State, pool: &NodeJsPool) -> Result<()> { + async fn finish(&self, state: Self::State, pool: &EvaluatePool) -> Result<()> { let has_errors = state.iter().any(|log| log.log_type == LogType::Error); let has_warnings = state.iter().any(|log| log.log_type == LogType::Warn); if has_errors || has_warnings { diff --git a/turbopack/crates/turbopack-node/src/worker_pool/mod.rs b/turbopack/crates/turbopack-node/src/worker_pool/mod.rs new file mode 100644 index 00000000000000..d953aaa98c9385 --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/mod.rs @@ -0,0 +1,127 @@ +use std::{ + path::PathBuf, + sync::{ + Arc, + atomic::{AtomicU32, Ordering}, + }, +}; + +use anyhow::Result; +use rustc_hash::FxHashMap; +use turbo_rcstr::{RcStr, rcstr}; +use turbo_tasks::{ResolvedVc, duration_span}; +use turbo_tasks_fs::FileSystemPath; + +use crate::{ + AssetsForSourceMapping, + evaluate::{EvaluateOperation, EvaluatePool, Operation}, + worker_pool::operation::{ + PoolOptions, WorkerOperation, connect_to_worker, create_or_scale_pool, + kill_schedule_channels, + }, +}; + +mod operation; +mod worker_thread; + +static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1); + +#[turbo_tasks::value] +pub(crate) struct WorkerThreadPool { + cwd: PathBuf, + entrypoint: PathBuf, + env: Arc>, + concurrency: usize, + pub(crate) assets_for_source_mapping: ResolvedVc, + pub(crate) 
assets_root: FileSystemPath, + pub(crate) project_dir: FileSystemPath, +} + +impl WorkerThreadPool { + pub(crate) fn create( + cwd: PathBuf, + entrypoint: PathBuf, + env: FxHashMap, + assets_for_source_mapping: ResolvedVc, + assets_root: FileSystemPath, + project_dir: FileSystemPath, + concurrency: usize, + debug: bool, + ) -> EvaluatePool { + EvaluatePool::new( + entrypoint.to_string_lossy().to_string().into(), + Box::new(Self { + cwd, + entrypoint, + env: Arc::new(env), + concurrency: (if debug { 1 } else { concurrency }), + assets_for_source_mapping, + assets_root: assets_root.clone(), + project_dir: project_dir.clone(), + }), + assets_for_source_mapping, + assets_root, + project_dir, + ) + } +} + +impl WorkerThreadPool { + pub fn scale_down() { + napi::bindgen_prelude::spawn(async { + let _ = create_or_scale_pool(PoolOptions { + filename: rcstr!("*"), + concurrency: 1, + ..Default::default() + }) + .await; + }); + } + + pub fn scale_zero() { + napi::bindgen_prelude::spawn(async { + let _ = create_or_scale_pool(PoolOptions { + // The "*" wildcard scales every pool, regardless of its pool id + filename: rcstr!("*"), + concurrency: 0, + ..Default::default() + }) + .await; + kill_schedule_channels().await; + }); + } +} + +#[async_trait::async_trait] +impl EvaluateOperation for WorkerThreadPool { + async fn operation(&self) -> Result> { + let operation = { + let _guard = duration_span!("Node.js operation"); + let pool_id: RcStr = self.entrypoint.to_string_lossy().into(); + + create_or_scale_pool(PoolOptions { + filename: pool_id.clone(), + concurrency: self.concurrency as u32, + env: self.env.clone(), + cwd: self.cwd.to_string_lossy().into(), + }) + .await?; + + let task_id = OPERATION_TASK_ID.fetch_add(1, Ordering::Release); + + if task_id == 0 { + panic!("operation task id overflow") + } + + let worker_id = connect_to_worker(pool_id.clone(), task_id).await?; + + WorkerOperation { + pool_id, + task_id, + worker_id, + } + }; + + Ok(Box::new(operation)) +
} +} diff --git a/turbopack/crates/turbopack-node/src/worker_pool/operation.rs b/turbopack/crates/turbopack-node/src/worker_pool/operation.rs new file mode 100644 index 00000000000000..6888612da23951 --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/operation.rs @@ -0,0 +1,269 @@ +use std::{ + process::ExitStatus, + sync::{Arc, LazyLock}, +}; + +use anyhow::{Context, Result}; +use async_channel::{Receiver, Sender, unbounded}; +use dashmap::DashMap; +use rustc_hash::FxHashMap; +use turbo_rcstr::RcStr; + +use crate::evaluate::Operation; + +#[derive(Clone)] +pub(crate) struct MessageChannel { + sender: Sender, + receiver: Receiver, +} + +impl MessageChannel { + pub(super) fn unbounded() -> Self { + let (sender, receiver) = unbounded::(); + Self { sender, receiver } + } +} + +impl MessageChannel { + pub(crate) async fn send(&self, data: T) -> Result<()> { + Ok(self.sender.send(data).await?) + } + + pub(crate) async fn recv(&self) -> Result { + Ok(self.receiver.recv().await?) 
+ } + + pub(crate) async fn close(&self) { + self.sender.closed().await; + self.receiver.close(); + } +} + +#[derive(Default)] +pub(super) struct PoolOptions { + pub(super) filename: RcStr, + pub(super) concurrency: u32, + pub(super) env: Arc>, + pub(super) cwd: RcStr, +} + +pub(crate) struct WorkerPoolOperation { + pool_request_channel: MessageChannel, + worker_termination_channel: MessageChannel<(RcStr, u32)>, + worker_request_channel: DashMap>, + worker_ack_channel: DashMap>, + worker_routed_channel: DashMap>, + task_routed_channel: DashMap>, +} + +impl Default for WorkerPoolOperation { + fn default() -> Self { + Self { + pool_request_channel: MessageChannel::unbounded(), + worker_termination_channel: MessageChannel::unbounded(), + worker_request_channel: DashMap::new(), + worker_ack_channel: DashMap::new(), + worker_routed_channel: DashMap::new(), + task_routed_channel: DashMap::new(), + } + } +} + +impl WorkerPoolOperation { + pub(crate) async fn create_or_scale_pool(&self, pool_options: PoolOptions) -> Result<()> { + self.pool_request_channel + .send(pool_options) + .await + .context("failed to send pool request")?; + + Ok(()) + } + + pub(crate) async fn connect_to_worker(&self, pool_id: RcStr, task_id: u32) -> Result { + let channel = self + .worker_request_channel + .entry(pool_id.clone()) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel + .send(task_id) + .await + .context("failed to send worker request")?; + let worker_id = async move { + let channel = self + .worker_ack_channel + .entry(task_id) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel.recv().await.context("failed to recv worker ack") + } + .await?; + Ok(worker_id) + } + + pub(crate) async fn send_worker_termination( + &self, + pool_id: RcStr, + worker_id: u32, + ) -> Result<()> { + self.worker_termination_channel + .send((pool_id, worker_id)) + .await + .context("failed to send worker termination") + } + + pub(crate) async fn recv_worker_termination(&self) -> 
Result<(RcStr, u32)> { + self.worker_termination_channel + .recv() + .await + .context("failed to recv worker termination") + } + + pub(crate) async fn send_message_to_worker(&self, worker_id: u32, data: String) -> Result<()> { + let channel = self + .worker_routed_channel + .entry(worker_id) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel + .send(data) + .await + .context("failed to send message to worker")?; + Ok(()) + } + + pub async fn recv_task_response(&self, task_id: u32) -> Result { + let channel = self + .task_routed_channel + .entry(task_id) + .or_insert_with(MessageChannel::unbounded) + .clone(); + let data = channel + .recv() + .await + .context("failed to recv task message")?; + Ok(data) + } + + pub(crate) async fn recv_pool_request(&self) -> Result { + self.pool_request_channel + .recv() + .await + .context("failed to recv pool request") + } + + pub(crate) async fn kill_schedule_channels(&self) { + // Close the channels that the schedule thread is listening on; + // otherwise the schedule thread would wait on them forever + self.pool_request_channel.close().await; + self.worker_termination_channel.close().await; + } + + pub(crate) async fn recv_worker_request(&self, pool_id: RcStr) -> Result { + let channel = self + .worker_request_channel + .entry(pool_id.clone()) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel + .recv() + .await + .context("failed to recv worker request") + } + + pub(crate) async fn notify_worker_ack(&self, task_id: u32, worker_id: u32) -> Result<()> { + let channel = self + .worker_ack_channel + .get(&task_id) + .with_context(|| format!("worker ack channel for {task_id} not found"))?; + channel + .send(worker_id) + .await + .context("failed to notify worker ack") + } + + pub(crate) async fn recv_message_in_worker(&self, worker_id: u32) -> Result { + let channel = self + .worker_routed_channel + .entry(worker_id) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel + .recv() + .await +
.with_context(|| format!("failed to recv message in worker {worker_id}")) + } + + pub(crate) async fn send_task_message(&self, task_id: u32, data: String) -> Result<()> { + let channel = self + .task_routed_channel + .entry(task_id) + .or_insert_with(MessageChannel::unbounded) + .clone(); + channel + .send(data) + .await + .with_context(|| format!("failed to send response for task {task_id}")) + } +} + +pub(crate) static WORKER_POOL_OPERATION: LazyLock = + LazyLock::new(WorkerPoolOperation::default); + +pub(crate) async fn create_or_scale_pool(pool_options: PoolOptions) -> Result<()> { + WORKER_POOL_OPERATION + .create_or_scale_pool(pool_options) + .await +} + +pub(crate) async fn connect_to_worker(pool_id: RcStr, task_id: u32) -> Result { + WORKER_POOL_OPERATION + .connect_to_worker(pool_id, task_id) + .await +} + +pub(crate) async fn send_message_to_worker(worker_id: u32, data: String) -> Result<()> { + WORKER_POOL_OPERATION + .send_message_to_worker(worker_id, data) + .await +} + +pub(crate) async fn send_worker_termination(pool_id: RcStr, worker_id: u32) -> Result<()> { + WORKER_POOL_OPERATION + .send_worker_termination(pool_id, worker_id) + .await +} + +pub(crate) async fn recv_task_message(task_id: u32) -> Result { + WORKER_POOL_OPERATION.recv_task_response(task_id).await +} + +pub(crate) async fn kill_schedule_channels() { + WORKER_POOL_OPERATION.kill_schedule_channels().await; +} + +pub(crate) struct WorkerOperation { + pub(crate) pool_id: RcStr, + pub(crate) task_id: u32, + pub(crate) worker_id: u32, +} + +#[async_trait::async_trait] +impl Operation for WorkerOperation { + async fn recv(&mut self) -> Result { + recv_task_message(self.task_id).await + } + + async fn send(&mut self, data: String) -> Result<()> { + send_message_to_worker(self.worker_id, data).await + } + + async fn wait_or_kill(&mut self) -> Result { + send_worker_termination(self.pool_id.clone(), self.worker_id).await?; + Ok(ExitStatus::default()) + } + + fn disallow_reuse(&mut self) { + // 
do nothing + } +} diff --git a/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs b/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs new file mode 100644 index 00000000000000..0f8381cc34f59f --- /dev/null +++ b/turbopack/crates/turbopack-node/src/worker_pool/worker_thread.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; + +use napi_derive::napi; +use rustc_hash::FxHashMap; +use turbo_rcstr::RcStr; + +use crate::worker_pool::{PoolOptions, operation::WORKER_POOL_OPERATION}; + +#[napi(object)] +#[allow(unused)] +pub struct NapiPoolOptions { + pub filename: RcStr, + pub concurrency: u32, + #[napi(ts_type = "Record")] + pub env: Arc>, + pub cwd: RcStr, +} + +impl From for NapiPoolOptions { + fn from(pool_options: PoolOptions) -> Self { + let PoolOptions { + filename, + concurrency, + env, + cwd, + } = pool_options; + NapiPoolOptions { + filename, + concurrency, + env, + cwd, + } + } +} + +#[napi(object)] +#[allow(unused)] +pub struct WorkerTermination { + pub filename: RcStr, + pub worker_id: u32, +} + +#[napi] +#[allow(unused)] +pub async fn recv_pool_request() -> napi::Result { + let pool_options = WORKER_POOL_OPERATION.recv_pool_request().await?; + + Ok(pool_options.into()) +} + +#[napi] +#[allow(unused)] +pub async fn recv_worker_termination() -> napi::Result { + let (filename, worker_id) = WORKER_POOL_OPERATION.recv_worker_termination().await?; + Ok(WorkerTermination { + filename, + worker_id, + }) +} + +#[napi] +#[allow(unused)] +pub async fn recv_worker_request(filename: RcStr) -> napi::Result { + Ok(WORKER_POOL_OPERATION.recv_worker_request(filename).await?) +} + +#[napi] +#[allow(unused)] +// TODO: use zero-copy externaled type array +pub async fn recv_message_in_worker(worker_id: u32) -> napi::Result { + Ok(WORKER_POOL_OPERATION + .recv_message_in_worker(worker_id) + .await?) 
+} + +#[napi] +#[allow(unused)] +pub async fn notify_worker_ack(task_id: u32, worker_id: u32) -> napi::Result<()> { + Ok(WORKER_POOL_OPERATION + .notify_worker_ack(task_id, worker_id) + .await?) +} + +#[napi] +#[allow(unused)] +pub async fn send_task_message(task_id: u32, message: String) -> napi::Result<()> { + Ok(WORKER_POOL_OPERATION + .send_task_message(task_id, message) + .await?) +} diff --git a/turbopack/crates/turbopack-tests/Cargo.toml b/turbopack/crates/turbopack-tests/Cargo.toml index f7f3c66292d202..09726dd9fcfa53 100644 --- a/turbopack/crates/turbopack-tests/Cargo.toml +++ b/turbopack/crates/turbopack-tests/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] turbopack = { workspace = true } +turbopack-node = { workspace = true, default-features = false, features = ["process_pool"]} [dev-dependencies] anyhow = { workspace = true } @@ -42,7 +43,7 @@ turbopack-ecmascript-plugins = { workspace = true, features = [ ] } turbopack-ecmascript-runtime = { workspace = true } turbopack-env = { workspace = true } -turbopack-node = { workspace = true } +turbopack-node = { workspace = true, default-features = false, features = ["process_pool"]} turbopack-nodejs = { workspace = true, features = ["test"] } turbopack-resolve = { workspace = true } turbopack-test-utils = { workspace = true } diff --git a/turbopack/crates/turbopack/Cargo.toml b/turbopack/crates/turbopack/Cargo.toml index a806180a409f79..58d27ae3090536 100644 --- a/turbopack/crates/turbopack/Cargo.toml +++ b/turbopack/crates/turbopack/Cargo.toml @@ -36,7 +36,7 @@ turbopack-ecmascript = { workspace = true } turbopack-env = { workspace = true } turbopack-json = { workspace = true } turbopack-mdx = { workspace = true } -turbopack-node = { workspace = true } +turbopack-node = { workspace = true, default-features = false } turbopack-resolve = { workspace = true } turbopack-static = { workspace = true } turbopack-wasm = { workspace = true }