From 4c0b3a54760bb15d0afc90bb70bb5f6ddc8a62af Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:30:11 +0100 Subject: [PATCH 1/7] feat: pipeline --- packages/pg/lib/client.js | 284 +++++++++++++++++- packages/pg/lib/connection-parameters.js | 9 + packages/pg/lib/defaults.js | 13 + .../test/integration/client/pipeline-tests.js | 122 ++++++++ 4 files changed, 419 insertions(+), 9 deletions(-) create mode 100644 packages/pg/test/integration/client/pipeline-tests.js diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 459439037..c24b21e50 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -13,23 +13,23 @@ const Connection = require('./connection') const crypto = require('./crypto/utils') const activeQueryDeprecationNotice = nodeUtils.deprecate( - () => {}, + () => { }, 'Client.activeQuery is deprecated and will be removed in a future version.' ) const queryQueueDeprecationNotice = nodeUtils.deprecate( - () => {}, + () => { }, 'Client.queryQueue is deprecated and will be removed in a future version.' ) const pgPassDeprecationNotice = nodeUtils.deprecate( - () => {}, + () => { }, 'pgpass support is deprecated and will be removed in a future version. ' + - 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this funciton you can call the pgpass module in your own code.' + 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this funciton you can call the pgpass module in your own code.' ) const byoPromiseDeprecationNotice = nodeUtils.deprecate( - () => {}, + () => { }, 'Passing a custom Promise implementation to the Client/Pool constructor is deprecated and will be removed in a future version.' ) @@ -94,6 +94,18 @@ class Client extends EventEmitter { } this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 + + // Pipeline mode configuration + this._pipelineMode = this.connectionParameters.pipelineMode + this._pipelineBatchSize = this.connectionParameters.pipelineBatchSize + this._pipelineBatchTimeout = this.connectionParameters.pipelineBatchTimeout + + // Pipeline state + // Pipeline state + this._activePipelineBatch = [] // Queries being buffered (sent but no Sync) + this._pendingPipelineBatches = [] // Batches sent with Sync, awaiting results + this._pipelineSyncTimer = null + this._pipelineCurrentIndex = 0 // Index into _pendingPipelineBatches[0] } get activeQuery() { @@ -110,6 +122,13 @@ class Client extends EventEmitter { return this._activeQuery } + /** + * Check if pipeline mode is enabled + */ + get pipelineMode() { + return this._pipelineMode + } + _errorAllQueries(err) { const enqueueError = (query) => { process.nextTick(() => { @@ -123,6 +142,13 @@ class Client extends EventEmitter { this._activeQuery = null } + // Clear pipeline pending queries + this._activePipelineBatch.forEach(enqueueError) + this._activePipelineBatch.length = 0 + this._pendingPipelineBatches.forEach(batch => batch.forEach(enqueueError)) + this._pendingPipelineBatches.length = 0 + this._pipelineCurrentIndex = 0 + this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 } @@ -354,6 +380,19 @@ class Client extends EventEmitter { } this.emit('connect') } + + if (this._pipelineMode) { + // In pipeline mode, ReadyForQuery marks end of a pipeline batch sync + // The first batch in _pendingPipelineBatches is now complete + if (this._pendingPipelineBatches.length > 0) { + this._pendingPipelineBatches.shift() + } + this._pipelineCurrentIndex = 0 + this.readyForQuery = true + this._pulseQueryQueue() + return + } + const activeQuery = this._getActiveQuery() this._activeQuery = null this.readyForQuery = true @@ -395,6 +434,26 @@ class Client extends EventEmitter { if (this._connecting) { return this._handleErrorWhileConnecting(msg) } + + if (this._pipelineMode && this._pendingPipelineBatches.length > 0) { + // In pipeline mode, error aborts the current query in the current batch + // Server will skip remaining commands until Sync + const batch = this._pendingPipelineBatches[0] + const query = batch[this._pipelineCurrentIndex] + if (query) { + query.handleError(msg, this.connection) + } + // Mark remaining pending queries IN THIS BATCH as aborted + for (let i = this._pipelineCurrentIndex + 1; i < batch.length; i++) { + const pendingQuery = batch[i] + const abortError = new Error('Query aborted due to pipeline error') + abortError.pipelineAborted = true + abortError.originalError = msg + pendingQuery.handleError(abortError, this.connection) + } + return + } + const activeQuery = this._getActiveQuery() if (!activeQuery) { @@ -407,6 +466,14 @@ class Client extends EventEmitter { } _handleRowDescription(msg) { + if (this._pipelineMode) { + const query = this._getCurrentPipelineQuery() + if (query) { + query.handleRowDescription(msg) + } + return + } + const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected rowDescription message from backend.') @@ -418,6 +485,14 @@ class Client extends EventEmitter { } _handleDataRow(msg) { + if (this._pipelineMode) { + const query = this._getCurrentPipelineQuery() + if (query) { + query.handleDataRow(msg) + } + return + } + const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected dataRow message from backend.') @@ -440,6 +515,16 @@ class Client extends EventEmitter { } _handleEmptyQuery(msg) { + if (this._pipelineMode) { + const query = this._getCurrentPipelineQuery() + if (query) { + query.handleEmptyQuery(this.connection) + query.handleReadyForQuery(this.connection) + } + this._pipelineCurrentIndex++ + return + } + const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected emptyQuery message from backend.') @@ -451,6 +536,18 @@ class Client extends EventEmitter { } _handleCommandComplete(msg) { + if (this._pipelineMode) { + const query = this._getCurrentPipelineQuery() + if (query) { + query.handleCommandComplete(msg, this.connection) + // In pipeline mode, commandComplete marks query completion + // Signal readyForQuery to the individual query + query.handleReadyForQuery(this.connection) + } + this._pipelineCurrentIndex++ + return + } + const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected commandComplete message from backend.') @@ -462,6 +559,14 @@ class Client extends EventEmitter { } _handleParseComplete() { + if (this._pipelineMode) { + const query = this._getCurrentPipelineQuery() + if (query && query.name) { + this.connection.parsedStatements[query.name] = query.text + } + return + } + const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected parseComplete message from backend.') @@ -574,6 +679,10 @@ class Client extends EventEmitter { } _pulseQueryQueue() { + if (this._pipelineMode) { + return this._pulsePipelineQueue() + } + if (this.readyForQuery === true) { this._activeQuery = this._queryQueue.shift() const activeQuery = this._getActiveQuery() @@ -596,6 +705,152 @@ class Client extends EventEmitter { } } + /** + * Pipeline mode query queue processing. + * Sends queries without waiting for individual ReadyForQuery responses. + */ + /** + * Pipeline mode query queue processing. + * Sends queries without waiting for individual ReadyForQuery responses. + */ + _pulsePipelineQueue() { + // If no queries to send, check if we need to drain or sync current batch + if (this._queryQueue.length === 0) { + if (this._activePipelineBatch.length > 0) { + // Queue is empty but we have unsynced queries, force a sync + this._sendPipelineSync() + } else if (this.hasExecuted && this._pendingPipelineBatches.length === 0) { + this.emit('drain') + } + return + } + + //Cork the stream to batch all messages + this.connection.stream.cork && this.connection.stream.cork() + + try { + // Send queries up to batch size + while (this._queryQueue.length > 0 && this._activePipelineBatch.length < this._pipelineBatchSize) { + const query = this._queryQueue.shift() + this._activePipelineBatch.push(query) + + // Submit query without sync + const queryError = this._submitPipelineQuery(query) + if (queryError) { + // Handle submit error immediately + this._activePipelineBatch.pop() + process.nextTick(() => { + query.handleError(queryError, this.connection) + }) + continue + } + } + + // Check if we should sync + const isFull = this._activePipelineBatch.length >= this._pipelineBatchSize + const isEmpty = this._queryQueue.length === 0 + const useTimeout = this._pipelineBatchTimeout > 0 + + if (isFull) { + this._sendPipelineSync() + } else if (isEmpty && !useTimeout) { + // Default behavior: sync immediately if empty and no timeout + this._sendPipelineSync() + } else if (useTimeout) { + // Schedule sync (wait for more queries or timeout) + this._schedulePipelineSync() + } + } finally { + // Uncork to send all buffered data + this.connection.stream.uncork && this.connection.stream.uncork() + } + } + + /** + * Submit a query in pipeline mode (without sync). + * Returns an error if submit fails, null otherwise. + */ + _submitPipelineQuery(query) { + const connection = this.connection + + this.hasExecuted = true + this.readyForQuery = false + + if (query.requiresPreparation()) { + // Extended query protocol + try { + if (!query.hasBeenParsed(connection)) { + connection.parse({ + text: query.text, + name: query.name, + types: query.types, + }) + } + + connection.bind({ + portal: query.portal || '', + statement: query.name || '', + values: query.values || [], + binary: query.binary, + valueMapper: utils.prepareValue, + }) + + connection.describe({ + type: 'P', + name: query.portal || '', + }) + + connection.execute({ + portal: query.portal || '', + rows: query.rows || 0, + }) + } catch (err) { + return err + } + } else { + // Simple query (can't be truly pipelined, but we queue it) + connection.query(query.text) + } + + return null + } + + _schedulePipelineSync() { + if (this._pipelineSyncTimer) return + + this._pipelineSyncTimer = setTimeout(() => { + this._pipelineSyncTimer = null + if (this._activePipelineBatch.length > 0) { + this._sendPipelineSync() + } + }, this._pipelineBatchTimeout) + } + + /** + * Send a Sync message to mark pipeline boundary + */ + _sendPipelineSync() { + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + this._pipelineSyncTimer = null + } + + // Move active batch to pending batches + if (this._activePipelineBatch.length > 0) { + this._pendingPipelineBatches.push(this._activePipelineBatch) + this._activePipelineBatch = [] + this.connection.sync() + } + } + + /** + * Get the current query being processed in pipeline mode + */ + _getCurrentPipelineQuery() { + const currentBatch = this._pendingPipelineBatches[0] + return currentBatch ? currentBatch[this._pipelineCurrentIndex] : null + } + query(config, values, callback) { // can take in strings, config object or query object let query @@ -628,7 +883,7 @@ class Client extends EventEmitter { } if (readTimeout) { - queryCallback = query.callback || (() => {}) + queryCallback = query.callback || (() => { }) readTimeoutTimer = setTimeout(() => { const error = new Error('Query read timeout') @@ -641,7 +896,7 @@ class Client extends EventEmitter { // we already returned an error, // just do nothing if query completes - query.callback = () => {} + query.callback = () => { } // Remove from queue const index = this._queryQueue.indexOf(query) @@ -696,6 +951,12 @@ class Client extends EventEmitter { end(cb) { this._ending = true + // Clear pipeline timer + if (this._pipelineSyncTimer) { + clearTimeout(this._pipelineSyncTimer) + this._pipelineSyncTimer = null + } + // if we have never connected, then end is a noop, callback immediately if (!this.connection._connecting || this._ended) { if (cb) { @@ -705,8 +966,13 @@ class Client extends EventEmitter { } } - if (this._getActiveQuery() || !this._queryable) { - // if we have an active query we need to force a disconnect + if ( + this._getActiveQuery() || + this._activePipelineBatch.length > 0 || + this._pendingPipelineBatches.length > 0 || + !this._queryable + ) { + // if we have an active query or pipeline queries we need to force a disconnect // on the socket - otherwise a hung query could block end forever this.connection.stream.destroy() } else { diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index c153932bb..16ec4dee9 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -126,6 +126,15 @@ class ConnectionParameters { if (typeof config.keepAliveInitialDelayMillis === 'number') { this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) } + + // Pipeline mode configuration + this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode + this.pipelineBatchSize = config.pipelineBatchSize !== undefined + ? parseInt(config.pipelineBatchSize, 10) + : defaults.pipelineBatchSize + this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined + ? parseInt(config.pipelineBatchTimeout, 10) + : defaults.pipelineBatchTimeout } getLibpqConnectionString(cb) { diff --git a/packages/pg/lib/defaults.js b/packages/pg/lib/defaults.js index 673696f79..eb7e93f9a 100644 --- a/packages/pg/lib/defaults.js +++ b/packages/pg/lib/defaults.js @@ -77,6 +77,19 @@ module.exports = { keepalives: 1, keepalives_idle: 0, + + // Pipeline mode - send multiple queries without waiting for individual responses + // false = disabled (default), true = enabled + pipelineMode: false, + + // Maximum number of queries to batch before sending a Sync message + // Only applies when pipelineMode is true + pipelineBatchSize: 100, + + // Timeout in milliseconds before automatically sending a Sync message + // Set to 0 to disable (sync only on batch size or when queue is empty) + // Only applies when pipelineMode is true + pipelineBatchTimeout: 0, } const pgTypes = require('pg-types') diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js new file mode 100644 index 000000000..82829af65 --- /dev/null +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -0,0 +1,122 @@ +'use strict' + +const assert = require('assert') +const helper = require('./helper') + +const suite = new helper.Suite() + +suite.test('pipeline mode configuration defaults', (done) => { + const client = new helper.Client() + assert.strictEqual(client.pipelineMode, false, 'pipelineMode should default to false') + assert.strictEqual(client._pipelineBatchSize, 100, 'pipelineBatchSize should default to 100') + assert.strictEqual(client._pipelineBatchTimeout, 0, 'pipelineBatchTimeout should default to 0') + done() +}) + +suite.test('pipeline mode can be enabled via config', (done) => { + const client = new helper.Client({ + pipelineMode: true, + pipelineBatchSize: 50, + pipelineBatchTimeout: 100 + }) + assert.strictEqual(client.pipelineMode, true, 'pipelineMode should be true') + assert.strictEqual(client._pipelineBatchSize, 50, 'pipelineBatchSize should be 50') + assert.strictEqual(client._pipelineBatchTimeout, 100, 'pipelineBatchTimeout should be 100') + done() +}) + +suite.testAsync('pipeline mode executes multiple queries', async () => { + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Create a test table + await client.query('CREATE TEMP TABLE pipeline_test (id SERIAL PRIMARY KEY, val TEXT)') + + // Execute multiple queries in parallel (pipeline mode batches them) + const [r1, r2, r3] = await Promise.all([ + client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['first']), + client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['second']), + client.query('SELECT count(*)::int AS count FROM pipeline_test') + ]) + + assert.strictEqual(r1.rows[0].id, 1, 'first insert should return id 1') + assert.strictEqual(r2.rows[0].id, 2, 'second insert should return id 2') + // Note: count may be 0, 1, or 2 depending on order - pipeline doesn't guarantee order + // The key test is that all queries complete successfully + assert.ok(typeof r3.rows[0].count === 'number', 'count should be a number') + } finally { + await client.end() + } +}) + +suite.testAsync('pipeline mode handles errors correctly', async () => { + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Execute multiple queries where one will fail + const results = await Promise.allSettled([ + client.query('SELECT 1 AS num'), + client.query('SELECT * FROM nonexistent_table_12345'), // This should fail + client.query('SELECT 2 AS num') + ]) + + assert.strictEqual(results[0].status, 'fulfilled', 'first query should succeed') + assert.strictEqual(results[0].value.rows[0].num, 1) + + assert.strictEqual(results[1].status, 'rejected', 'second query should fail') + + // The third query might succeed or be aborted depending on timing + // In pipeline mode with error, remaining queries in the batch are aborted + } finally { + await client.end() + } +}) + +suite.testAsync('pipeline mode with prepared statements', async () => { + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Create a test table + await client.query('CREATE TEMP TABLE prep_test (id SERIAL PRIMARY KEY, val TEXT)') + + // Execute multiple inserts with the same prepared statement name + const results = await Promise.all([ + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['a'] }), + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['b'] }), + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }) + ]) + + assert.strictEqual(results.length, 3, 'should have 3 results') + results.forEach((r, i) => { + assert.strictEqual(r.rows[0].id, i + 1, `insert ${i + 1} should return correct id`) + }) + } finally { + await client.end() + } +}) + +suite.testAsync('non-pipeline mode still works', async () => { + const client = new helper.Client({ + pipelineMode: false + }) + await client.connect() + + try { + const r1 = await client.query('SELECT 1 AS num') + const r2 = await client.query('SELECT 2 AS num') + + assert.strictEqual(r1.rows[0].num, 1) + assert.strictEqual(r2.rows[0].num, 2) + } finally { + await client.end() + } +}) From ddfd61dccdf049e55d08bc34706a30a9a8b71c37 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:43:33 +0100 Subject: [PATCH 2/7] fix: lint --- packages/pg/lib/client.js | 14 +- packages/pg/lib/connection-parameters.js | 8 +- .../test/integration/client/pipeline-tests.js | 194 +++++++++--------- 3 files changed, 106 insertions(+), 110 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index c24b21e50..926ff3e4a 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -13,23 +13,23 @@ const Connection = require('./connection') const crypto = require('./crypto/utils') const activeQueryDeprecationNotice = nodeUtils.deprecate( - () => { }, + () => {}, 'Client.activeQuery is deprecated and will be removed in a future version.' ) const queryQueueDeprecationNotice = nodeUtils.deprecate( - () => { }, + () => {}, 'Client.queryQueue is deprecated and will be removed in a future version.' ) const pgPassDeprecationNotice = nodeUtils.deprecate( - () => { }, + () => {}, 'pgpass support is deprecated and will be removed in a future version. ' + 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this funciton you can call the pgpass module in your own code.' ) const byoPromiseDeprecationNotice = nodeUtils.deprecate( - () => { }, + () => {}, 'Passing a custom Promise implementation to the Client/Pool constructor is deprecated and will be removed in a future version.' ) @@ -145,7 +145,7 @@ class Client extends EventEmitter { // Clear pipeline pending queries this._activePipelineBatch.forEach(enqueueError) this._activePipelineBatch.length = 0 - this._pendingPipelineBatches.forEach(batch => batch.forEach(enqueueError)) + this._pendingPipelineBatches.forEach((batch) => batch.forEach(enqueueError)) this._pendingPipelineBatches.length = 0 this._pipelineCurrentIndex = 0 @@ -883,7 +883,7 @@ class Client extends EventEmitter { } if (readTimeout) { - queryCallback = query.callback || (() => { }) + queryCallback = query.callback || (() => {}) readTimeoutTimer = setTimeout(() => { const error = new Error('Query read timeout') @@ -896,7 +896,7 @@ class Client extends EventEmitter { // we already returned an error, // just do nothing if query completes - query.callback = () => { } + query.callback = () => {} // Remove from queue const index = this._queryQueue.indexOf(query) diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 16ec4dee9..898402708 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -129,12 +129,8 @@ class ConnectionParameters { // Pipeline mode configuration this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode - this.pipelineBatchSize = config.pipelineBatchSize !== undefined - ? parseInt(config.pipelineBatchSize, 10) - : defaults.pipelineBatchSize - this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined - ? parseInt(config.pipelineBatchTimeout, 10) - : defaults.pipelineBatchTimeout + this.pipelineBatchSize = config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize + this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout } getLibpqConnectionString(cb) { diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 82829af65..340d4ad4f 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -6,117 +6,117 @@ const helper = require('./helper') const suite = new helper.Suite() suite.test('pipeline mode configuration defaults', (done) => { - const client = new helper.Client() - assert.strictEqual(client.pipelineMode, false, 'pipelineMode should default to false') - assert.strictEqual(client._pipelineBatchSize, 100, 'pipelineBatchSize should default to 100') - assert.strictEqual(client._pipelineBatchTimeout, 0, 'pipelineBatchTimeout should default to 0') - done() + const client = new helper.Client() + assert.strictEqual(client.pipelineMode, false, 'pipelineMode should default to false') + assert.strictEqual(client._pipelineBatchSize, 100, 'pipelineBatchSize should default to 100') + assert.strictEqual(client._pipelineBatchTimeout, 0, 'pipelineBatchTimeout should default to 0') + done() }) suite.test('pipeline mode can be enabled via config', (done) => { - const client = new helper.Client({ - pipelineMode: true, - pipelineBatchSize: 50, - pipelineBatchTimeout: 100 - }) - assert.strictEqual(client.pipelineMode, true, 'pipelineMode should be true') - assert.strictEqual(client._pipelineBatchSize, 50, 'pipelineBatchSize should be 50') - assert.strictEqual(client._pipelineBatchTimeout, 100, 'pipelineBatchTimeout should be 100') - done() + const client = new helper.Client({ + pipelineMode: true, + pipelineBatchSize: 50, + pipelineBatchTimeout: 100 + }) + assert.strictEqual(client.pipelineMode, true, 'pipelineMode should be true') + assert.strictEqual(client._pipelineBatchSize, 50, 'pipelineBatchSize should be 50') + assert.strictEqual(client._pipelineBatchTimeout, 100, 'pipelineBatchTimeout should be 100') + done() }) suite.testAsync('pipeline mode executes multiple queries', async () => { - const client = new helper.Client({ - pipelineMode: true - }) - await client.connect() - - try { - // Create a test table - await client.query('CREATE TEMP TABLE pipeline_test (id SERIAL PRIMARY KEY, val TEXT)') - - // Execute multiple queries in parallel (pipeline mode batches them) - const [r1, r2, r3] = await Promise.all([ - client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['first']), - client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['second']), - client.query('SELECT count(*)::int AS count FROM pipeline_test') - ]) - - assert.strictEqual(r1.rows[0].id, 1, 'first insert should return id 1') - assert.strictEqual(r2.rows[0].id, 2, 'second insert should return id 2') - // Note: count may be 0, 1, or 2 depending on order - pipeline doesn't guarantee order - // The key test is that all queries complete successfully - assert.ok(typeof r3.rows[0].count === 'number', 'count should be a number') - } finally { - await client.end() - } + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Create a test table + await client.query('CREATE TEMP TABLE pipeline_test (id SERIAL PRIMARY KEY, val TEXT)') + + // Execute multiple queries in parallel (pipeline mode batches them) + const [r1, r2, r3] = await Promise.all([ + client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['first']), + client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['second']), + client.query('SELECT count(*)::int AS count FROM pipeline_test') + ]) + + assert.strictEqual(r1.rows[0].id, 1, 'first insert should return id 1') + assert.strictEqual(r2.rows[0].id, 2, 'second insert should return id 2') + // Note: count may be 0, 1, or 2 depending on order - pipeline doesn't guarantee order + // The key test is that all queries complete successfully + assert.ok(typeof r3.rows[0].count === 'number', 'count should be a number') + } finally { + await client.end() + } }) suite.testAsync('pipeline mode handles errors correctly', async () => { - const client = new helper.Client({ - pipelineMode: true - }) - await client.connect() - - try { - // Execute multiple queries where one will fail - const results = await Promise.allSettled([ - client.query('SELECT 1 AS num'), - client.query('SELECT * FROM nonexistent_table_12345'), // This should fail - client.query('SELECT 2 AS num') - ]) - - assert.strictEqual(results[0].status, 'fulfilled', 'first query should succeed') - assert.strictEqual(results[0].value.rows[0].num, 1) - - assert.strictEqual(results[1].status, 'rejected', 'second query should fail') - - // The third query might succeed or be aborted depending on timing - // In pipeline mode with error, remaining queries in the batch are aborted - } finally { - await client.end() - } + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Execute multiple queries where one will fail + const results = await Promise.allSettled([ + client.query('SELECT 1 AS num'), + client.query('SELECT * FROM nonexistent_table_12345'), // This should fail + client.query('SELECT 2 AS num') + ]) + + assert.strictEqual(results[0].status, 'fulfilled', 'first query should succeed') + assert.strictEqual(results[0].value.rows[0].num, 1) + + assert.strictEqual(results[1].status, 'rejected', 'second query should fail') + + // The third query might succeed or be aborted depending on timing + // In pipeline mode with error, remaining queries in the batch are aborted + } finally { + await client.end() + } }) suite.testAsync('pipeline mode with prepared statements', async () => { - const client = new helper.Client({ - pipelineMode: true + const client = new helper.Client({ + pipelineMode: true + }) + await client.connect() + + try { + // Create a test table + await client.query('CREATE TEMP TABLE prep_test (id SERIAL PRIMARY KEY, val TEXT)') + + // Execute multiple inserts with the same prepared statement name + const results = await Promise.all([ + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['a'] }), + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['b'] }), + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }) + ]) + + assert.strictEqual(results.length, 3, 'should have 3 results') + results.forEach((r, i) => { + assert.strictEqual(r.rows[0].id, i + 1, `insert ${i + 1} should return correct id`) }) - await client.connect() - - try { - // Create a test table - await client.query('CREATE TEMP TABLE prep_test (id SERIAL PRIMARY KEY, val TEXT)') - - // Execute multiple inserts with the same prepared statement name - const results = await Promise.all([ - client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['a'] }), - client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['b'] }), - client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }) - ]) - - assert.strictEqual(results.length, 3, 'should have 3 results') - results.forEach((r, i) => { - assert.strictEqual(r.rows[0].id, i + 1, `insert ${i + 1} should return correct id`) - }) - } finally { - await client.end() - } + } finally { + await client.end() + } }) suite.testAsync('non-pipeline mode still works', async () => { - const client = new helper.Client({ - pipelineMode: false - }) - await client.connect() - - try { - const r1 = await client.query('SELECT 1 AS num') - const r2 = await client.query('SELECT 2 AS num') - - assert.strictEqual(r1.rows[0].num, 1) - assert.strictEqual(r2.rows[0].num, 2) - } finally { - await client.end() - } + const client = new helper.Client({ + pipelineMode: false + }) + await client.connect() + + try { + const r1 = await client.query('SELECT 1 AS num') + const r2 = await client.query('SELECT 2 AS num') + + assert.strictEqual(r1.rows[0].num, 1) + assert.strictEqual(r2.rows[0].num, 2) + } finally { + await client.end() + } }) From c160737bb9bb03a15913d124dff80ff3ebafc00d Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:47:53 +0100 Subject: [PATCH 3/7] fix: lint --- packages/pg/lib/connection-parameters.js | 6 ++++-- .../pg/test/integration/client/pipeline-tests.js | 16 ++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 898402708..642cf7906 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -129,8 +129,10 @@ class ConnectionParameters { // Pipeline mode configuration this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode - this.pipelineBatchSize = config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize - this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout + this.pipelineBatchSize = + config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize + this.pipelineBatchTimeout = + config.pipelineBatchTimeout !== undefined ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout } getLibpqConnectionString(cb) { diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 340d4ad4f..980588887 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -17,7 +17,7 @@ suite.test('pipeline mode can be enabled via config', (done) => { const client = new helper.Client({ pipelineMode: true, pipelineBatchSize: 50, - pipelineBatchTimeout: 100 + pipelineBatchTimeout: 100, }) assert.strictEqual(client.pipelineMode, true, 'pipelineMode should be true') assert.strictEqual(client._pipelineBatchSize, 50, 'pipelineBatchSize should be 50') @@ -27,7 +27,7 @@ suite.test('pipeline mode can be enabled via config', (done) => { suite.testAsync('pipeline mode executes multiple queries', async () => { const client = new helper.Client({ - pipelineMode: true + pipelineMode: true, }) await client.connect() @@ -39,7 +39,7 @@ suite.testAsync('pipeline mode executes multiple queries', async () => { const [r1, r2, r3] = await Promise.all([ client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['first']), client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['second']), - client.query('SELECT count(*)::int AS count FROM pipeline_test') + client.query('SELECT count(*)::int AS count FROM pipeline_test'), ]) assert.strictEqual(r1.rows[0].id, 1, 'first insert should return id 1') @@ -54,7 +54,7 @@ suite.testAsync('pipeline mode executes multiple queries', async () => { suite.testAsync('pipeline mode handles errors correctly', async () => { const client = new helper.Client({ - pipelineMode: true + pipelineMode: true, }) await client.connect() @@ -63,7 +63,7 @@ suite.testAsync('pipeline mode handles errors correctly', async () => { const results = await Promise.allSettled([ client.query('SELECT 1 AS num'), client.query('SELECT * FROM nonexistent_table_12345'), // This should fail - client.query('SELECT 2 AS num') + client.query('SELECT 2 AS num'), ]) assert.strictEqual(results[0].status, 'fulfilled', 'first query should succeed') @@ -80,7 +80,7 @@ suite.testAsync('pipeline mode handles errors correctly', async () => { suite.testAsync('pipeline mode with prepared statements', async () => { const client = new helper.Client({ - pipelineMode: true + pipelineMode: true, }) await client.connect() @@ -92,7 +92,7 @@ suite.testAsync('pipeline mode with prepared statements', async () => { const results = await Promise.all([ client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['a'] }), client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['b'] }), - client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }) + client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }), ]) assert.strictEqual(results.length, 3, 'should have 3 results') @@ -106,7 +106,7 @@ suite.testAsync('pipeline mode with prepared statements', async () => { suite.testAsync('non-pipeline mode still works', async () => { const client = new helper.Client({ - pipelineMode: false + pipelineMode: false, }) await client.connect() From e5a3f5d9814f6b04adbd2711a5bd71a88976f0c3 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:51:29 +0100 Subject: [PATCH 4/7] fix: lint --- packages/pg/lib/client.js | 2 +- packages/pg/lib/connection-parameters.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 926ff3e4a..20506c9b7 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -25,7 +25,7 @@ const queryQueueDeprecationNotice = nodeUtils.deprecate( const pgPassDeprecationNotice = nodeUtils.deprecate( () => {}, 'pgpass support is deprecated and will be removed in a future version. ' + - 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this funciton you can call the pgpass module in your own code.' + 'You can provide an async function as the password property to the Client/Pool constructor that returns a password instead. Within this funciton you can call the pgpass module in your own code.' ) const byoPromiseDeprecationNotice = nodeUtils.deprecate( diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 642cf7906..70dad104a 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -129,9 +129,9 @@ class ConnectionParameters { // Pipeline mode configuration this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode - this.pipelineBatchSize = + this.pipelineBatchSize = config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize - this.pipelineBatchTimeout = + this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout } From 8c00f5f5b34ee210be1c107d809b1801fe160895 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:54:03 +0100 Subject: [PATCH 5/7] fix: lint --- packages/pg/lib/connection-parameters.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index 70dad104a..fe98f7a73 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -132,7 +132,8 @@ class ConnectionParameters { this.pipelineBatchSize = config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize this.pipelineBatchTimeout = - config.pipelineBatchTimeout !== undefined ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout + config.pipelineBatchTimeout !== undefined + ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout } getLibpqConnectionString(cb) { From 5647fc2cc01d62be93475f443a16f322bb75f12a Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 19:58:32 +0100 Subject: [PATCH 6/7] fix: lint :( --- packages/pg/lib/connection-parameters.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index fe98f7a73..1617c65aa 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -133,7 +133,8 @@ class ConnectionParameters { config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize this.pipelineBatchTimeout = config.pipelineBatchTimeout !== undefined - ? parseInt(config.pipelineBatchTimeout, 10) : defaults.pipelineBatchTimeout + ? parseInt(config.pipelineBatchTimeout, 10) + : defaults.pipelineBatchTimeout } getLibpqConnectionString(cb) { From 9e34abd1499398dfa0a43cd3e866236a20acc072 Mon Sep 17 00:00:00 2001 From: Nigro Simone Date: Tue, 3 Feb 2026 20:03:28 +0100 Subject: [PATCH 7/7] fix: import --- packages/pg/test/integration/client/pipeline-tests.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js index 980588887..0e8eb74bf 100644 --- a/packages/pg/test/integration/client/pipeline-tests.js +++ b/packages/pg/test/integration/client/pipeline-tests.js @@ -1,7 +1,7 @@ 'use strict' const assert = require('assert') -const helper = require('./helper') +const helper = require('../test-helper') const suite = new helper.Suite()