diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js
index 459439037..20506c9b7 100644
--- a/packages/pg/lib/client.js
+++ b/packages/pg/lib/client.js
@@ -94,6 +94,18 @@ class Client extends EventEmitter {
     }
 
     this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
+
+    // Pipeline mode configuration
+    this._pipelineMode = this.connectionParameters.pipelineMode
+    this._pipelineBatchSize = this.connectionParameters.pipelineBatchSize
+    this._pipelineBatchTimeout = this.connectionParameters.pipelineBatchTimeout
+
+    // Pipeline state
+    this._activePipelineBatch = [] // Queries being buffered (sent but no Sync)
+    this._pendingPipelineBatches = [] // Batches sent with Sync, awaiting results
+    this._pipelineSyncTimer = null
+    this._pipelineCurrentIndex = 0 // Index into _pendingPipelineBatches[0]
   }
 
   get activeQuery() {
@@ -110,6 +122,13 @@ class Client extends EventEmitter {
     return this._activeQuery
   }
 
+  /**
+   * Check if pipeline mode is enabled
+   */
+  get pipelineMode() {
+    return this._pipelineMode
+  }
+
   _errorAllQueries(err) {
     const enqueueError = (query) => {
       process.nextTick(() => {
@@ -123,6 +142,13 @@ class Client extends EventEmitter {
     }
     this._activeQuery = null
 
+    // Clear pipeline pending queries
+    this._activePipelineBatch.forEach(enqueueError)
+    this._activePipelineBatch.length = 0
+    this._pendingPipelineBatches.forEach((batch) => batch.forEach(enqueueError))
+    this._pendingPipelineBatches.length = 0
+    this._pipelineCurrentIndex = 0
+
     this._queryQueue.forEach(enqueueError)
     this._queryQueue.length = 0
   }
@@ -354,6 +380,19 @@ class Client extends EventEmitter {
       }
       this.emit('connect')
     }
+
+    if (this._pipelineMode) {
+      // In pipeline mode, ReadyForQuery marks the end of a pipeline batch (its Sync boundary)
+      // The first batch in _pendingPipelineBatches is now complete
+      if (this._pendingPipelineBatches.length > 0) {
+        this._pendingPipelineBatches.shift()
+      }
+      this._pipelineCurrentIndex = 0
+      this.readyForQuery = true
+      this._pulseQueryQueue()
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     this._activeQuery = null
     this.readyForQuery = true
@@ -395,6 +434,26 @@ class Client extends EventEmitter {
     if (this._connecting) {
       return this._handleErrorWhileConnecting(msg)
     }
+
+    if (this._pipelineMode && this._pendingPipelineBatches.length > 0) {
+      // In pipeline mode, an error aborts the current query in the current batch;
+      // the server will skip the remaining commands until the next Sync
+      const batch = this._pendingPipelineBatches[0]
+      const query = batch[this._pipelineCurrentIndex]
+      if (query) {
+        query.handleError(msg, this.connection)
+      }
+      // Mark the remaining pending queries in this batch as aborted
+      for (let i = this._pipelineCurrentIndex + 1; i < batch.length; i++) {
+        const pendingQuery = batch[i]
+        const abortError = new Error('Query aborted due to pipeline error')
+        abortError.pipelineAborted = true
+        abortError.originalError = msg
+        pendingQuery.handleError(abortError, this.connection)
+      }
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
 
     if (!activeQuery) {
@@ -407,6 +466,14 @@ class Client extends EventEmitter {
   }
 
   _handleRowDescription(msg) {
+    if (this._pipelineMode) {
+      const query = this._getCurrentPipelineQuery()
+      if (query) {
+        query.handleRowDescription(msg)
+      }
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     if (activeQuery == null) {
       const error = new Error('Received unexpected rowDescription message from backend.')
@@ -418,6 +485,14 @@ class Client extends EventEmitter {
   }
 
   _handleDataRow(msg) {
+    if (this._pipelineMode) {
+      const query = this._getCurrentPipelineQuery()
+      if (query) {
+        query.handleDataRow(msg)
+      }
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     if (activeQuery == null) {
       const error = new Error('Received unexpected dataRow message from backend.')
@@ -440,6 +515,16 @@ class Client extends EventEmitter {
   }
 
   _handleEmptyQuery(msg) {
+    if (this._pipelineMode) {
+      const query = this._getCurrentPipelineQuery()
+      if (query) {
+        query.handleEmptyQuery(this.connection)
+        query.handleReadyForQuery(this.connection)
+      }
+      this._pipelineCurrentIndex++
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     if (activeQuery == null) {
       const error = new Error('Received unexpected emptyQuery message from backend.')
@@ -451,6 +536,18 @@ class Client extends EventEmitter {
   }
 
   _handleCommandComplete(msg) {
+    if (this._pipelineMode) {
+      const query = this._getCurrentPipelineQuery()
+      if (query) {
+        query.handleCommandComplete(msg, this.connection)
+        // In pipeline mode, commandComplete marks the completion of an individual query,
+        // so signal readyForQuery to that query directly
+        query.handleReadyForQuery(this.connection)
+      }
+      this._pipelineCurrentIndex++
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     if (activeQuery == null) {
       const error = new Error('Received unexpected commandComplete message from backend.')
@@ -462,6 +559,14 @@ class Client extends EventEmitter {
   }
 
   _handleParseComplete() {
+    if (this._pipelineMode) {
+      const query = this._getCurrentPipelineQuery()
+      if (query && query.name) {
+        this.connection.parsedStatements[query.name] = query.text
+      }
+      return
+    }
+
     const activeQuery = this._getActiveQuery()
     if (activeQuery == null) {
       const error = new Error('Received unexpected parseComplete message from backend.')
@@ -574,6 +679,10 @@ class Client extends EventEmitter {
   }
 
   _pulseQueryQueue() {
+    if (this._pipelineMode) {
+      return this._pulsePipelineQueue()
+    }
+
     if (this.readyForQuery === true) {
       this._activeQuery = this._queryQueue.shift()
       const activeQuery = this._getActiveQuery()
@@ -596,6 +705,152 @@ class Client extends EventEmitter {
     }
   }
 
+  /**
+   * Pipeline mode query queue processing.
+   * Sends queries without waiting for individual ReadyForQuery responses.
+   */
+  _pulsePipelineQueue() {
+    // If there are no queries to send, check whether to drain or to sync the current batch
+    if (this._queryQueue.length === 0) {
+      if (this._activePipelineBatch.length > 0) {
+        // Queue is empty but we have unsynced queries, so force a sync
+        this._sendPipelineSync()
+      } else if (this.hasExecuted && this._pendingPipelineBatches.length === 0) {
+        this.emit('drain')
+      }
+      return
+    }
+
+    // Cork the stream to batch all messages
+    this.connection.stream.cork && this.connection.stream.cork()
+
+    try {
+      // Send queries up to the batch size
+      while (this._queryQueue.length > 0 && this._activePipelineBatch.length < this._pipelineBatchSize) {
+        const query = this._queryQueue.shift()
+        this._activePipelineBatch.push(query)
+
+        // Submit the query without a Sync
+        const queryError = this._submitPipelineQuery(query)
+        if (queryError) {
+          // Handle submit errors immediately
+          this._activePipelineBatch.pop()
+          process.nextTick(() => {
+            query.handleError(queryError, this.connection)
+          })
+          continue
+        }
+      }
+
+      // Check if we should sync
+      const isFull = this._activePipelineBatch.length >= this._pipelineBatchSize
+      const isEmpty = this._queryQueue.length === 0
+      const useTimeout = this._pipelineBatchTimeout > 0
+
+      if (isFull) {
+        this._sendPipelineSync()
+      } else if (isEmpty && !useTimeout) {
+        // Default behavior: sync immediately when the queue is empty and no timeout is configured
+        this._sendPipelineSync()
+      } else if (useTimeout) {
+        // Schedule a sync (wait for more queries or for the timeout)
+        this._schedulePipelineSync()
+      }
+    } finally {
+      // Uncork to send all buffered data
+      this.connection.stream.uncork && this.connection.stream.uncork()
+    }
+  }
+
+  /**
+   * Submit a query in pipeline mode (without a Sync).
+   * Returns an error if the submit fails, null otherwise.
+   */
+  _submitPipelineQuery(query) {
+    const connection = this.connection
+
+    this.hasExecuted = true
+    this.readyForQuery = false
+
+    if (query.requiresPreparation()) {
+      // Extended query protocol
+      try {
+        if (!query.hasBeenParsed(connection)) {
+          connection.parse({
+            text: query.text,
+            name: query.name,
+            types: query.types,
+          })
+        }
+
+        connection.bind({
+          portal: query.portal || '',
+          statement: query.name || '',
+          values: query.values || [],
+          binary: query.binary,
+          valueMapper: utils.prepareValue,
+        })
+
+        connection.describe({
+          type: 'P',
+          name: query.portal || '',
+        })
+
+        connection.execute({
+          portal: query.portal || '',
+          rows: query.rows || 0,
+        })
+      } catch (err) {
+        return err
+      }
+    } else {
+      // Simple query protocol (cannot be truly pipelined, but we still queue it)
+      connection.query(query.text)
+    }
+
+    return null
+  }
+
+  _schedulePipelineSync() {
+    if (this._pipelineSyncTimer) return
+
+    this._pipelineSyncTimer = setTimeout(() => {
+      this._pipelineSyncTimer = null
+      if (this._activePipelineBatch.length > 0) {
+        this._sendPipelineSync()
+      }
+    }, this._pipelineBatchTimeout)
+  }
+
+  /**
+   * Send a Sync message to mark a pipeline batch boundary
+   */
+  _sendPipelineSync() {
+    if (this._pipelineSyncTimer) {
+      clearTimeout(this._pipelineSyncTimer)
+      this._pipelineSyncTimer = null
+    }
+
+    // Move the active batch to the pending batches
+    if (this._activePipelineBatch.length > 0) {
+      this._pendingPipelineBatches.push(this._activePipelineBatch)
+      this._activePipelineBatch = []
+      this.connection.sync()
+    }
+  }
+
+  /**
+   * Get the current query being processed in pipeline mode
+   */
+  _getCurrentPipelineQuery() {
+    const currentBatch = this._pendingPipelineBatches[0]
+    return currentBatch ? currentBatch[this._pipelineCurrentIndex] : null
+  }
+
   query(config, values, callback) {
     // can take in strings, config object or query object
     let query
@@ -696,6 +951,12 @@ class Client extends EventEmitter {
   end(cb) {
     this._ending = true
 
+    // Clear pipeline timer
+    if (this._pipelineSyncTimer) {
+      clearTimeout(this._pipelineSyncTimer)
+      this._pipelineSyncTimer = null
+    }
+
     // if we have never connected, then end is a noop, callback immediately
     if (!this.connection._connecting || this._ended) {
       if (cb) {
@@ -705,8 +966,13 @@ class Client extends EventEmitter {
       }
     }
 
-    if (this._getActiveQuery() || !this._queryable) {
-      // if we have an active query we need to force a disconnect
+    if (
+      this._getActiveQuery() ||
+      this._activePipelineBatch.length > 0 ||
+      this._pendingPipelineBatches.length > 0 ||
+      !this._queryable
+    ) {
+      // if we have an active query or pipeline queries we need to force a disconnect
       // on the socket - otherwise a hung query could block end forever
       this.connection.stream.destroy()
     } else {
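A usage sketch, not part of the patch: it assumes only the pipelineMode option introduced above plus the existing pg Client API, and the query text is arbitrary. Queries issued in the same tick are written back to back without waiting for earlier results; with the default pipelineBatchTimeout of 0 each query is followed by its own Sync, while a non-zero timeout lets several queries share one batch.

'use strict'
// Usage sketch: with pipelineMode enabled, queries are written to the socket
// without waiting for the previous query's ReadyForQuery. With the default
// pipelineBatchTimeout of 0 each query is followed by its own Sync message.
const { Client } = require('pg')

async function main() {
  const client = new Client({ pipelineMode: true })
  await client.connect()

  // All three queries are dispatched before any result has come back.
  const results = await Promise.all([
    client.query('SELECT $1::int AS n', [1]),
    client.query('SELECT $1::int AS n', [2]),
    client.query('SELECT $1::int AS n', [3]),
  ])
  console.log(results.map((r) => r.rows[0].n)) // [ 1, 2, 3 ]

  await client.end()
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})

Because results are matched to queries by their position in the pending batch, each returned promise should be treated independently rather than relying on cross-query ordering.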
diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js
index c153932bb..1617c65aa 100644
--- a/packages/pg/lib/connection-parameters.js
+++ b/packages/pg/lib/connection-parameters.js
@@ -126,6 +126,15 @@ class ConnectionParameters {
     if (typeof config.keepAliveInitialDelayMillis === 'number') {
       this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000)
     }
+
+    // Pipeline mode configuration
+    this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode
+    this.pipelineBatchSize =
+      config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize
+    this.pipelineBatchTimeout =
+      config.pipelineBatchTimeout !== undefined
+        ? parseInt(config.pipelineBatchTimeout, 10)
+        : defaults.pipelineBatchTimeout
   }
 
   getLibpqConnectionString(cb) {
diff --git a/packages/pg/lib/defaults.js b/packages/pg/lib/defaults.js
index 673696f79..eb7e93f9a 100644
--- a/packages/pg/lib/defaults.js
+++ b/packages/pg/lib/defaults.js
@@ -77,6 +77,19 @@ module.exports = {
 
   keepalives: 1,
   keepalives_idle: 0,
+
+  // Pipeline mode - send multiple queries without waiting for individual responses
+  // false = disabled (default), true = enabled
+  pipelineMode: false,
+
+  // Maximum number of queries to batch before sending a Sync message
+  // Only applies when pipelineMode is true
+  pipelineBatchSize: 100,
+
+  // Timeout in milliseconds before automatically sending a Sync message
+  // Set to 0 to disable (sync only on batch size or when the queue is empty)
+  // Only applies when pipelineMode is true
+  pipelineBatchTimeout: 0,
 }
 
 const pgTypes = require('pg-types')
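A configuration sketch, again illustrative rather than part of the patch, showing how the three new defaults are intended to combine. The values are examples, not recommendations: with these settings a Sync is sent either once pipelineBatchSize queries have been buffered or pipelineBatchTimeout milliseconds after the first buffered query, whichever comes first; leaving the timeout at its default of 0 makes the client sync as soon as its internal queue drains.

'use strict'
// Configuration sketch for the new pipeline options. With these settings a
// Sync is sent once 25 queries have been buffered, or 5 ms after the first
// buffered query, whichever happens first. The numbers are illustrative.
const { Client } = require('pg')

async function run(total) {
  const client = new Client({
    pipelineMode: true, // defaults.pipelineMode is false
    pipelineBatchSize: 25, // defaults.pipelineBatchSize is 100
    pipelineBatchTimeout: 5, // defaults.pipelineBatchTimeout is 0 (sync as soon as the queue drains)
  })
  await client.connect()

  const pending = []
  for (let i = 0; i < total; i++) {
    pending.push(client.query('SELECT $1::int AS i', [i]))
  }
  const results = await Promise.all(pending)
  console.log('completed', results.length, 'pipelined queries')

  await client.end()
}

run(100).catch(console.error)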
diff --git a/packages/pg/test/integration/client/pipeline-tests.js b/packages/pg/test/integration/client/pipeline-tests.js
new file mode 100644
index 000000000..0e8eb74bf
--- /dev/null
+++ b/packages/pg/test/integration/client/pipeline-tests.js
@@ -0,0 +1,122 @@
+'use strict'
+
+const assert = require('assert')
+const helper = require('../test-helper')
+
+const suite = new helper.Suite()
+
+suite.test('pipeline mode configuration defaults', (done) => {
+  const client = new helper.Client()
+  assert.strictEqual(client.pipelineMode, false, 'pipelineMode should default to false')
+  assert.strictEqual(client._pipelineBatchSize, 100, 'pipelineBatchSize should default to 100')
+  assert.strictEqual(client._pipelineBatchTimeout, 0, 'pipelineBatchTimeout should default to 0')
+  done()
+})
+
+suite.test('pipeline mode can be enabled via config', (done) => {
+  const client = new helper.Client({
+    pipelineMode: true,
+    pipelineBatchSize: 50,
+    pipelineBatchTimeout: 100,
+  })
+  assert.strictEqual(client.pipelineMode, true, 'pipelineMode should be true')
+  assert.strictEqual(client._pipelineBatchSize, 50, 'pipelineBatchSize should be 50')
+  assert.strictEqual(client._pipelineBatchTimeout, 100, 'pipelineBatchTimeout should be 100')
+  done()
+})
+
+suite.testAsync('pipeline mode executes multiple queries', async () => {
+  const client = new helper.Client({
+    pipelineMode: true,
+  })
+  await client.connect()
+
+  try {
+    // Create a test table
+    await client.query('CREATE TEMP TABLE pipeline_test (id SERIAL PRIMARY KEY, val TEXT)')
+
+    // Execute multiple queries in parallel (pipeline mode batches them)
+    const [r1, r2, r3] = await Promise.all([
+      client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['first']),
+      client.query('INSERT INTO pipeline_test(val) VALUES($1) RETURNING id', ['second']),
+      client.query('SELECT count(*)::int AS count FROM pipeline_test'),
+    ])
+
+    assert.strictEqual(r1.rows[0].id, 1, 'first insert should return id 1')
+    assert.strictEqual(r2.rows[0].id, 2, 'second insert should return id 2')
+    // Note: count may be 0, 1, or 2 depending on order - pipeline doesn't guarantee order
+    // The key test is that all queries complete successfully
+    assert.ok(typeof r3.rows[0].count === 'number', 'count should be a number')
+  } finally {
+    await client.end()
+  }
+})
+
+suite.testAsync('pipeline mode handles errors correctly', async () => {
+  const client = new helper.Client({
+    pipelineMode: true,
+  })
+  await client.connect()
+
+  try {
+    // Execute multiple queries where one will fail
+    const results = await Promise.allSettled([
+      client.query('SELECT 1 AS num'),
+      client.query('SELECT * FROM nonexistent_table_12345'), // This should fail
+      client.query('SELECT 2 AS num'),
+    ])
+
+    assert.strictEqual(results[0].status, 'fulfilled', 'first query should succeed')
+    assert.strictEqual(results[0].value.rows[0].num, 1)
+
+    assert.strictEqual(results[1].status, 'rejected', 'second query should fail')
+
+    // The third query might succeed or be aborted depending on timing:
+    // in pipeline mode an error aborts the remaining queries in the same batch
+  } finally {
+    await client.end()
+  }
+})
+
+suite.testAsync('pipeline mode with prepared statements', async () => {
+  const client = new helper.Client({
+    pipelineMode: true,
+  })
+  await client.connect()
+
+  try {
+    // Create a test table
+    await client.query('CREATE TEMP TABLE prep_test (id SERIAL PRIMARY KEY, val TEXT)')
+
+    // Execute multiple inserts with the same prepared statement name
+    const results = await Promise.all([
+      client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['a'] }),
+      client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['b'] }),
+      client.query({ name: 'insert-val', text: 'INSERT INTO prep_test(val) VALUES($1) RETURNING id', values: ['c'] }),
+    ])
+
+    assert.strictEqual(results.length, 3, 'should have 3 results')
+    results.forEach((r, i) => {
+      assert.strictEqual(r.rows[0].id, i + 1, `insert ${i + 1} should return correct id`)
+    })
+  } finally {
+    await client.end()
+  }
+})
+
+suite.testAsync('non-pipeline mode still works', async () => {
+  const client = new helper.Client({
+    pipelineMode: false,
+  })
+  await client.connect()
+
+  try {
+    const r1 = await client.query('SELECT 1 AS num')
+    const r2 = await client.query('SELECT 2 AS num')
+
+    assert.strictEqual(r1.rows[0].num, 1)
+    assert.strictEqual(r2.rows[0].num, 2)
+  } finally {
+    await client.end()
+  }
+})
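The error-handling test above leaves the fate of the trailing query open on purpose. The sketch below, which is not part of the patch, shows how a caller could distinguish the query that actually failed from queries that were only aborted because they shared a batch with it, using the pipelineAborted and originalError properties attached in _handleErrorMessage. A non-zero pipelineBatchTimeout is used so the three queries are likely to land in one batch; whether the third query succeeds or is aborted still depends on how the queries were batched.

'use strict'
// Sketch: telling a genuine query failure apart from queries that were only
// aborted because they shared a pipeline batch with the failing one.
const { Client } = require('pg')

async function demo() {
  // A small batch timeout makes it likely that all three queries end up in one batch.
  const client = new Client({ pipelineMode: true, pipelineBatchTimeout: 10 })
  await client.connect()

  const settled = await Promise.allSettled([
    client.query('SELECT $1::int AS ok', [1]),
    client.query('SELECT * FROM missing_table_for_demo WHERE id = $1', [1]), // relation does not exist
    client.query('SELECT $1::int AS ok', [2]),
  ])

  for (const result of settled) {
    if (result.status === 'fulfilled') {
      console.log('ok:', result.value.rows[0])
    } else if (result.reason.pipelineAborted) {
      // Skipped by the server after the earlier failure; the original error is attached.
      console.log('aborted because of:', result.reason.originalError.message)
    } else {
      console.log('failed:', result.reason.message)
    }
  }

  await client.end()
}

demo().catch(console.error)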