diff --git a/docs/pages/features/queries.mdx b/docs/pages/features/queries.mdx index 39bcfbe1d..4085ea43e 100644 --- a/docs/pages/features/queries.mdx +++ b/docs/pages/features/queries.mdx @@ -133,3 +133,72 @@ const query = { }, } ``` + +## Pipeline Mode + +Pipeline mode allows you to send multiple queries to the PostgreSQL server without waiting for the results of previous queries. This can significantly reduce network latency, especially for batch operations or high-latency connections. + +### Enabling Pipeline Mode + +Pipeline mode is opt-in and can be enabled when creating a client or pool: + +```js +// With Client +const client = new Client({ pipelineMode: true }) +await client.connect() + +// With Pool - all clients will have pipeline mode enabled +const pool = new Pool({ pipelineMode: true }) +``` + +When pipeline mode is enabled, you can submit multiple queries and they will be sent to the server immediately: + +```js +// All three queries are sent immediately to the server +const [result1, result2, result3] = await Promise.all([ + client.query('SELECT 1 as num'), + client.query('SELECT 2 as num'), + client.query('SELECT 3 as num'), +]) +``` + +### Error Isolation + +If one query fails in pipeline mode, other queries continue to execute: + +```js +const results = await Promise.allSettled([ + client.query('SELECT 1 as num'), + client.query('SELECT * FROM nonexistent_table'), // This will fail + client.query('SELECT 3 as num'), +]) + +console.log(results[0].status) // 'fulfilled' +console.log(results[1].status) // 'rejected' +console.log(results[2].status) // 'fulfilled' - still succeeds! +``` + +### Prepared Statements in Pipeline Mode + +Prepared statements work in pipeline mode, including concurrent queries with the same statement name: + +```js +const results = await Promise.all([ + client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [1] }), + client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [2] }), + client.query({ name: 'get-user', text: 'SELECT $1::int as id', values: [3] }), +]) +``` + +### Pipeline Mode Restrictions + +- **No multi-statement queries**: Queries with multiple SQL statements separated by `;` are rejected +- **No COPY operations**: COPY commands are not supported in pipeline mode +- **JavaScript client only**: Pipeline mode is not available in `pg-native` + +### When to Use Pipeline Mode + +Pipeline mode is most beneficial when: +- You have many independent queries to execute +- Network latency is high (cloud databases, cross-region connections) +- You're doing batch operations diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 459439037..dd7afdcdd 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -80,6 +80,16 @@ class Client extends EventEmitter { encoding: this.connectionParameters.client_encoding || 'utf8', }) this._queryQueue = [] + + // Pipeline mode configuration + this._pipelineMode = Boolean(c.pipelineMode) || false + + // Queue for tracking pending query results in pipeline mode + this._pendingQueries = [] + + // Track prepared statements that have been sent but not yet confirmed (for pipeline mode) + this._pendingParsedStatements = {} + this.binary = c.binary || defaults.binary this.processID = null this.secretKey = null @@ -110,6 +120,10 @@ class Client extends EventEmitter { return this._activeQuery } + get pipelineMode() { + return this._pipelineMode + } + _errorAllQueries(err) { const enqueueError = (query) => { process.nextTick(() => { @@ -123,6 +137,10 @@ class Client extends EventEmitter { this._activeQuery = null } + // Also error all pending queries in pipeline mode + this._pendingQueries.forEach(enqueueError) + this._pendingQueries.length = 0 + this._queryQueue.forEach(enqueueError) this._queryQueue.length = 0 } @@ -354,13 +372,39 @@ class Client extends EventEmitter { } this.emit('connect') } - const activeQuery = this._getActiveQuery() - this._activeQuery = null - this.readyForQuery = true - if (activeQuery) { - activeQuery.handleReadyForQuery(this.connection) + + if (this._pipelineMode) { + // In pipeline mode, we're always ready to send more queries + this.readyForQuery = true + + // In pipeline mode, complete the current pending query + // A query is ready to complete when it has received results or an error + const currentQuery = this._pendingQueries[0] + if ( + currentQuery && + (currentQuery._gotRowDescription || currentQuery._gotError || currentQuery._gotCommandComplete) + ) { + const completedQuery = this._pendingQueries.shift() + completedQuery.handleReadyForQuery(this.connection) + } + + // Check if more queries to send + this._pulseQueryQueue() + + // Emit drain when all queries complete + if (this._pendingQueries.length === 0 && this._queryQueue.length === 0) { + this.emit('drain') + } + } else { + // Existing non-pipeline behavior + const activeQuery = this._getActiveQuery() + this._activeQuery = null + this.readyForQuery = true + if (activeQuery) { + activeQuery.handleReadyForQuery(this.connection) + } + this._pulseQueryQueue() } - this._pulseQueryQueue() } // if we receive an error event or error message @@ -395,6 +439,20 @@ class Client extends EventEmitter { if (this._connecting) { return this._handleErrorWhileConnecting(msg) } + + if (this._pipelineMode) { + // In pipeline mode, error affects only the current query + // The connection remains usable for subsequent queries + const currentQuery = this._getCurrentPipelineQuery() + if (currentQuery) { + // Mark that this query received an error (for pipeline mode completion tracking) + currentQuery._gotError = true + currentQuery.handleError(msg, this.connection) + } + return + } + + // Existing non-pipeline error handling const activeQuery = this._getActiveQuery() if (!activeQuery) { @@ -407,18 +465,20 @@ class Client extends EventEmitter { } _handleRowDescription(msg) { - const activeQuery = this._getActiveQuery() + const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected rowDescription message from backend.') this._handleErrorEvent(error) return } + // Mark that this query has started receiving results (for pipeline mode) + activeQuery._gotRowDescription = true // delegate rowDescription to active query activeQuery.handleRowDescription(msg) } _handleDataRow(msg) { - const activeQuery = this._getActiveQuery() + const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected dataRow message from backend.') this._handleErrorEvent(error) @@ -451,19 +511,26 @@ class Client extends EventEmitter { } _handleCommandComplete(msg) { - const activeQuery = this._getActiveQuery() + const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected commandComplete message from backend.') this._handleErrorEvent(error) return } + // Mark that this query has completed (for pipeline mode) + activeQuery._gotCommandComplete = true // delegate commandComplete to active query activeQuery.handleCommandComplete(msg, this.connection) } _handleParseComplete() { - const activeQuery = this._getActiveQuery() + const activeQuery = this._pipelineMode ? this._getCurrentPipelineQuery() : this._getActiveQuery() if (activeQuery == null) { + // In pipeline mode, parseComplete can arrive before we've processed the query + // This is expected behavior - just ignore it if we don't have a query yet + if (this._pipelineMode) { + return + } const error = new Error('Received unexpected parseComplete message from backend.') this._handleErrorEvent(error) return @@ -477,6 +544,17 @@ class Client extends EventEmitter { } _handleCopyInResponse(msg) { + // In pipeline mode, COPY operations are not supported + if (this._pipelineMode) { + const activeQuery = this._getCurrentPipelineQuery() + if (activeQuery) { + const error = new Error('COPY operations are not supported in pipeline mode') + activeQuery.handleError(error, this.connection) + } + return + } + + // Existing COPY handling for non-pipeline mode const activeQuery = this._getActiveQuery() if (activeQuery == null) { const error = new Error('Received unexpected copyInResponse message from backend.') @@ -574,28 +652,109 @@ class Client extends EventEmitter { } _pulseQueryQueue() { - if (this.readyForQuery === true) { - this._activeQuery = this._queryQueue.shift() - const activeQuery = this._getActiveQuery() - if (activeQuery) { - this.readyForQuery = false - this.hasExecuted = true - - const queryError = activeQuery.submit(this.connection) - if (queryError) { - process.nextTick(() => { - activeQuery.handleError(queryError, this.connection) - this.readyForQuery = true - this._pulseQueryQueue() - }) + if (this._pipelineMode) { + // In pipeline mode, we can send queries as soon as we're connected + // We don't need to wait for readyForQuery between queries + if (!this._connected) { + return + } + // In pipeline mode, send all queued queries immediately + while (this._queryQueue.length > 0) { + const query = this._queryQueue.shift() + this._pendingQueries.push(query) + this._submitPipelineQuery(query) + } + } else { + // Existing non-pipeline behavior + if (this.readyForQuery === true) { + this._activeQuery = this._queryQueue.shift() + const activeQuery = this._getActiveQuery() + if (activeQuery) { + this.readyForQuery = false + this.hasExecuted = true + + const queryError = activeQuery.submit(this.connection) + if (queryError) { + process.nextTick(() => { + activeQuery.handleError(queryError, this.connection) + this.readyForQuery = true + this._pulseQueryQueue() + }) + } + } else if (this.hasExecuted) { + this._activeQuery = null + this.emit('drain') } - } else if (this.hasExecuted) { - this._activeQuery = null - this.emit('drain') } } } + // Submit a query using the Extended Query Protocol for pipeline mode + // Sends Parse/Bind/Describe/Execute/Sync for each query + _submitPipelineQuery(query) { + const connection = this.connection + + // Use cork/uncork to buffer messages before sending them all at once + // This optimizes network performance by avoiding multiple small writes + connection.stream.cork && connection.stream.cork() + try { + // Parse - only if the statement hasn't been parsed before (for named statements) + // In pipeline mode, we also track "in-flight" prepared statements to avoid + // sending duplicate Parse commands for the same named statement + const needsParse = query.name && !query.hasBeenParsed(connection) && !this._pendingParsedStatements[query.name] + + if (!query.name || needsParse) { + // For unnamed queries, always parse + // For named queries, only parse if not already parsed or in-flight + if (query.name) { + // Track this statement as "in-flight" to prevent duplicate Parse commands + this._pendingParsedStatements[query.name] = query.text + } + connection.parse({ + text: query.text, + name: query.name, + types: query.types, + }) + } + + // Bind - map user values to postgres wire protocol compatible values + // This could throw an exception, so we handle it in the try block + connection.bind({ + portal: query.portal, + statement: query.name, + values: query.values, + binary: query.binary, + valueMapper: utils.prepareValue, + }) + + // Describe - request description of the portal + connection.describe({ + type: 'P', + name: query.portal || '', + }) + + // Execute - execute the query + connection.execute({ + portal: query.portal, + rows: query.rows, + }) + + // Sync - establishes synchronization point + // This tells the server to process all messages up to this point + connection.sync() + } finally { + // Always uncork the stream, even if an error occurs + // This prevents the client from becoming unresponsive + connection.stream.uncork && connection.stream.uncork() + } + } + + // Returns the current query being processed in pipeline mode + // This is the first element of _pendingQueries (the oldest query awaiting results) + _getCurrentPipelineQuery() { + return this._pendingQueries[0] + } + query(config, values, callback) { // can take in strings, config object or query object let query @@ -666,6 +825,23 @@ class Client extends EventEmitter { query._result._types = this._types } + // Pipeline mode restriction: reject multi-statement queries + if (this._pipelineMode) { + const queryText = typeof config === 'string' ? config : config && config.text + if (queryText && typeof queryText === 'string' && queryText.includes(';')) { + // Count non-empty statements separated by semicolons + const multiStatementCheck = queryText.split(';').filter((s) => s.trim()).length + if (multiStatementCheck > 1) { + const error = new Error('Multiple SQL statements are not allowed in pipeline mode') + if (query.callback) { + process.nextTick(() => query.callback(error)) + return result + } + return Promise.reject(error) + } + } + } + if (!this._queryable) { process.nextTick(() => { query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection) @@ -705,6 +881,24 @@ class Client extends EventEmitter { } } + // In pipeline mode, wait for pending queries to complete before ending + if (this._pipelineMode && this._pendingQueries.length > 0) { + // Wait for all pending queries to complete (drain event) + const endConnection = () => { + this.connection.end() + } + this.once('drain', endConnection) + + if (cb) { + this.connection.once('end', cb) + return + } else { + return new this._Promise((resolve) => { + this.connection.once('end', resolve) + }) + } + } + if (this._getActiveQuery() || !this._queryable) { // if we have an active query we need to force a disconnect // on the socket - otherwise a hung query could block end forever diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index c153932bb..2127b1cb3 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -126,6 +126,9 @@ class ConnectionParameters { if (typeof config.keepAliveInitialDelayMillis === 'number') { this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000) } + + // Pipeline mode configuration - enables sending multiple queries without waiting for responses + this.pipelineMode = val('pipelineMode', config, false) } getLibpqConnectionString(cb) { diff --git a/packages/pg/lib/defaults.js b/packages/pg/lib/defaults.js index 673696f79..c3219f121 100644 --- a/packages/pg/lib/defaults.js +++ b/packages/pg/lib/defaults.js @@ -77,6 +77,10 @@ module.exports = { keepalives: 1, keepalives_idle: 0, + + // Enable pipeline mode for sending multiple queries without waiting for responses + // false=disabled (default), true=enabled + pipelineMode: false, } const pgTypes = require('pg-types') diff --git a/packages/pg/lib/query.js b/packages/pg/lib/query.js index 64aab5ff2..f9c522d05 100644 --- a/packages/pg/lib/query.js +++ b/packages/pg/lib/query.js @@ -22,6 +22,7 @@ class Query extends EventEmitter { this.portal = config.portal || '' this.callback = config.callback this._rowMode = config.rowMode + this.pipelineMode = config.pipelineMode || false if (process.domain && config.callback) { this.callback = process.domain.bind(config.callback) } @@ -33,6 +34,11 @@ class Query extends EventEmitter { } requiresPreparation() { + // In pipeline mode, always use extended query protocol + if (this.pipelineMode) { + return true + } + if (this.queryMode === 'extended') { return true } diff --git a/packages/pg/test/integration/client/pipeline-mode-tests.js b/packages/pg/test/integration/client/pipeline-mode-tests.js new file mode 100644 index 000000000..3bbdd6083 --- /dev/null +++ b/packages/pg/test/integration/client/pipeline-mode-tests.js @@ -0,0 +1,394 @@ +'use strict' + +const helper = require('../test-helper') +const pg = helper.pg +const assert = require('assert') +const Client = pg.Client + +const suite = new helper.Suite('pipeline mode integration') + +// Skip tests if native mode (pipeline mode is only for JS client) +if (helper.args.native) { + console.log('Skipping pipeline mode tests in native mode') + return +} + +suite.test('pipeline mode - basic multiple queries', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + const results = [] + const promises = [ + client.query('SELECT 1 as num').then((r) => results.push({ order: 0, value: r.rows[0].num })), + client.query('SELECT 2 as num').then((r) => results.push({ order: 1, value: r.rows[0].num })), + client.query('SELECT 3 as num').then((r) => results.push({ order: 2, value: r.rows[0].num })), + ] + + Promise.all(promises).then(() => { + // Verify all results received + assert.equal(results.length, 3, 'Should have 3 results') + // Verify correct values (order may vary due to async push) + const values = results.map((r) => r.value).sort() + assert.deepEqual(values, ['1', '2', '3']) + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - query ordering preservation', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // Use pg_sleep to ensure queries take different times + // but results should still come back in order + const startTimes = [] + const endTimes = [] + + const p1 = client.query('SELECT 1 as num, pg_sleep(0.05)').then((r) => { + endTimes.push({ query: 1, time: Date.now() }) + return r.rows[0].num + }) + startTimes.push({ query: 1, time: Date.now() }) + + const p2 = client.query('SELECT 2 as num').then((r) => { + endTimes.push({ query: 2, time: Date.now() }) + return r.rows[0].num + }) + startTimes.push({ query: 2, time: Date.now() }) + + const p3 = client.query('SELECT 3 as num, pg_sleep(0.02)').then((r) => { + endTimes.push({ query: 3, time: Date.now() }) + return r.rows[0].num + }) + startTimes.push({ query: 3, time: Date.now() }) + + Promise.all([p1, p2, p3]).then((values) => { + // Results should be in submission order + assert.deepEqual(values, ['1', '2', '3'], 'Results should be in submission order') + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - error isolation', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // First query succeeds + const p1 = client.query('SELECT 1 as num') + + // Second query fails (invalid table) + const p2 = client.query('SELECT * FROM nonexistent_table_xyz') + + // Third query should still succeed + const p3 = client.query('SELECT 3 as num') + + Promise.allSettled([p1, p2, p3]).then((results) => { + // First query should succeed + assert.equal(results[0].status, 'fulfilled') + assert.equal(results[0].value.rows[0].num, '1') + + // Second query should fail + assert.equal(results[1].status, 'rejected') + assert.ok(results[1].reason instanceof Error) + + // Third query should succeed despite second failing + assert.equal(results[2].status, 'fulfilled') + assert.equal(results[2].value.rows[0].num, '3') + + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - transactions work correctly', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // Create temp table and run transaction + client + .query('CREATE TEMP TABLE pipeline_test (id serial, value text)') + .then(() => client.query('BEGIN')) + .then(() => client.query("INSERT INTO pipeline_test (value) VALUES ('test1') RETURNING id")) + .then((r) => { + assert.ok(r.rows[0].id, 'Should return inserted id') + return client.query("INSERT INTO pipeline_test (value) VALUES ('test2') RETURNING id") + }) + .then(() => client.query('COMMIT')) + .then(() => client.query('SELECT COUNT(*) as count FROM pipeline_test')) + .then((r) => { + assert.equal(r.rows[0].count, '2', 'Should have 2 rows after commit') + client.end(done) + }) + .catch((err) => { + client.end(() => done(err)) + }) + }) +}) + +suite.test('pipeline mode - transaction rollback', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + client + .query('CREATE TEMP TABLE pipeline_rollback_test (id serial, value text)') + .then(() => client.query('BEGIN')) + .then(() => client.query("INSERT INTO pipeline_rollback_test (value) VALUES ('test1')")) + .then(() => client.query('ROLLBACK')) + .then(() => client.query('SELECT COUNT(*) as count FROM pipeline_rollback_test')) + .then((r) => { + assert.equal(r.rows[0].count, '0', 'Should have 0 rows after rollback') + client.end(done) + }) + .catch((err) => { + client.end(() => done(err)) + }) + }) +}) + +suite.test('pipeline mode - prepared statements', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // In pipeline mode, we need to use different names for concurrent prepared statements + // or execute them sequentially. Here we use sequential execution. + client + .query({ + name: 'get-number-1', + text: 'SELECT $1::int as num', + values: [1], + }) + .then((r1) => { + assert.equal(r1.rows[0].num, 1) + // Now reuse the same prepared statement + return client.query({ + name: 'get-number-1', + text: 'SELECT $1::int as num', + values: [2], + }) + }) + .then((r2) => { + assert.equal(r2.rows[0].num, 2) + return client.query({ + name: 'get-number-1', + text: 'SELECT $1::int as num', + values: [3], + }) + }) + .then((r3) => { + assert.equal(r3.rows[0].num, 3) + client.end(done) + }) + .catch((err) => { + client.end(() => done(err)) + }) + }) +}) + +suite.test('pipeline mode - concurrent prepared statements with same name', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // Test that concurrent prepared statements with the same name work correctly + // The client should only send Parse once and reuse it for subsequent queries + const promises = [ + client.query({ + name: 'concurrent-test', + text: 'SELECT $1::int as num', + values: [1], + }), + client.query({ + name: 'concurrent-test', + text: 'SELECT $1::int as num', + values: [2], + }), + client.query({ + name: 'concurrent-test', + text: 'SELECT $1::int as num', + values: [3], + }), + ] + + Promise.all(promises) + .then((results) => { + assert.equal(results[0].rows[0].num, 1) + assert.equal(results[1].rows[0].num, 2) + assert.equal(results[2].rows[0].num, 3) + client.end(done) + }) + .catch((err) => { + client.end(() => done(err)) + }) + }) +}) + +suite.test('pipeline mode - rejects multi-statement queries', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + client.query('SELECT 1; SELECT 2').catch((err) => { + assert.ok(err instanceof Error) + assert.equal(err.message, 'Multiple SQL statements are not allowed in pipeline mode') + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - many concurrent queries', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + const numQueries = 50 + const promises = [] + + for (let i = 0; i < numQueries; i++) { + promises.push( + client.query('SELECT $1::int as num', [i]).then((r) => ({ + expected: i, + actual: parseInt(r.rows[0].num), + })) + ) + } + + Promise.all(promises).then((results) => { + // Verify all queries returned correct results + results.forEach((r) => { + assert.equal(r.actual, r.expected, `Query ${r.expected} should return ${r.expected}`) + }) + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - client.end() waits for pending queries', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + let queryCompleted = false + + // Start a query that takes some time + client.query('SELECT pg_sleep(0.1), 1 as num').then((r) => { + queryCompleted = true + assert.equal(r.rows[0].num, '1') + }) + + // Immediately call end - should wait for query to complete + client.end(() => { + assert.ok(queryCompleted, 'Query should have completed before end() callback') + done() + }) + }) +}) + +suite.test('pipeline mode - pipelineMode property returns true', (done) => { + const client = new Client({ pipelineMode: true }) + assert.equal(client.pipelineMode, true, 'pipelineMode should be true') + client.connect((err) => { + if (err) return done(err) + assert.equal(client.pipelineMode, true, 'pipelineMode should still be true after connect') + client.end(done) + }) +}) + +suite.test('non-pipeline mode - pipelineMode property returns false', (done) => { + const client = new Client({ pipelineMode: false }) + assert.equal(client.pipelineMode, false, 'pipelineMode should be false') + client.connect((err) => { + if (err) return done(err) + assert.equal(client.pipelineMode, false, 'pipelineMode should still be false after connect') + client.end(done) + }) +}) + +suite.test('default client - pipelineMode property returns false', (done) => { + const client = new Client() + assert.equal(client.pipelineMode, false, 'pipelineMode should default to false') + client.connect((err) => { + if (err) return done(err) + client.end(done) + }) +}) + +suite.test('pipeline mode - row events are emitted', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + const query = new pg.Query('SELECT generate_series(1, 3) as num') + const rows = [] + + query.on('row', (row) => { + rows.push(row.num) + }) + + query.on('end', () => { + assert.deepEqual(rows, ['1', '2', '3'], 'Should receive all rows via events') + client.end(done) + }) + + client.query(query) + }) +}) + +suite.test('pipeline mode - handles NULL values correctly', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + Promise.all([ + client.query('SELECT NULL as val'), + client.query('SELECT 1 as val'), + client.query('SELECT NULL as val'), + ]).then((results) => { + assert.equal(results[0].rows[0].val, null) + assert.equal(results[1].rows[0].val, '1') + assert.equal(results[2].rows[0].val, null) + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - handles empty result sets', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + client + .query('CREATE TEMP TABLE empty_test (id int)') + .then(() => client.query('SELECT * FROM empty_test')) + .then((r) => { + assert.equal(r.rows.length, 0, 'Should return empty array') + assert.ok(r.fields, 'Should have fields metadata') + client.end(done) + }) + }) +}) + +suite.test('pipeline mode - connection remains usable after error', (done) => { + const client = new Client({ pipelineMode: true }) + client.connect((err) => { + if (err) return done(err) + + // Cause an error + client + .query('SELECT * FROM this_table_does_not_exist') + .catch(() => { + // Error expected, now verify connection still works + return client.query('SELECT 42 as answer') + }) + .then((r) => { + assert.equal(r.rows[0].answer, '42', 'Connection should still work after error') + client.end(done) + }) + }) +}) diff --git a/packages/pg/test/unit/client/pipeline-error-handling-tests.js b/packages/pg/test/unit/client/pipeline-error-handling-tests.js new file mode 100644 index 000000000..a5590ef92 --- /dev/null +++ b/packages/pg/test/unit/client/pipeline-error-handling-tests.js @@ -0,0 +1,545 @@ +'use strict' +const Connection = require('../../../lib/connection') +const Client = require('../../../lib/client') +const Query = require('../../../lib/query') +const assert = require('assert') +const Suite = require('../../suite') +const EventEmitter = require('events').EventEmitter + +const suite = new Suite() + +// Create a mock stream for testing +const createMockStream = function () { + const stream = new EventEmitter() + stream.setNoDelay = () => {} + stream.connect = function () {} + stream.write = function () {} + stream.cork = function () {} + stream.uncork = function () {} + stream.writable = true + return stream +} + +// Create a pipeline mode client for testing +const createPipelineClient = function () { + const stream = createMockStream() + const connection = new Connection({ stream: stream }) + connection.startup = function () {} + connection.connect = function () {} + connection.parse = function () {} + connection.bind = function () {} + connection.describe = function () {} + connection.execute = function () {} + connection.sync = function () {} + connection.parsedStatements = {} + + const client = new Client({ + connection: connection, + pipelineMode: true, + }) + client.connect() + client.connection.emit('connect') + return client +} + +suite.test('pipeline mode - _getCurrentPipelineQuery returns first pending query', function (done) { + const client = createPipelineClient() + + // Initially no pending queries + assert.equal(client._getCurrentPipelineQuery(), undefined) + + // Simulate connection ready and submit queries + client.connection.emit('readyForQuery') + + client.query('SELECT 1') + client.query('SELECT 2') + + // First pending query should be returned + const currentQuery = client._getCurrentPipelineQuery() + assert.ok(currentQuery, 'Should have a current pipeline query') + assert.equal(currentQuery.text, 'SELECT 1') + done() +}) + +suite.test('pipeline mode error handling - error is passed to current query', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback to capture the error + let capturedError = null + const query = new Query('SELECT 1', [], function (err) { + capturedError = err + }) + + // Add query to pending queries (simulating what _pulseQueryQueue does) + client._pendingQueries.push(query) + + // Simulate error message + const errorMsg = new Error('Test error for query') + errorMsg.severity = 'ERROR' + errorMsg.code = '42P01' + + // Emit error message - should call handleError on current query + client.connection.emit('errorMessage', errorMsg) + + // Verify the error was passed to the query + process.nextTick(() => { + assert.ok(capturedError, 'Query should have received an error') + assert.equal(capturedError.message, 'Test error for query') + done() + }) +}) + +suite.test('pipeline mode error handling - connection remains usable', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback + const query = new Query('SELECT 1', [], function () {}) + client._pendingQueries.push(query) + + // Simulate error + const errorMsg = new Error('Test error') + client.connection.emit('errorMessage', errorMsg) + + // Verify client is still usable + process.nextTick(() => { + assert.ok(client._queryable, 'Client should still be queryable') + assert.ok(!client._ended, 'Client should not be ended') + done() + }) +}) + +suite.test('pipeline mode error - does not emit connection-level error', function (done) { + const client = createPipelineClient() + + // Track if error event is emitted on client + let clientErrorEmitted = false + client.on('error', () => { + clientErrorEmitted = true + }) + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback + const query = new Query('SELECT 1', [], function () {}) + client._pendingQueries.push(query) + + // Simulate error + const errorMsg = new Error('Test error') + client.connection.emit('errorMessage', errorMsg) + + process.nextTick(() => { + assert.ok(!clientErrorEmitted, 'Client should not emit error event in pipeline mode') + done() + }) +}) + +suite.test('pipeline mode error - does not clear _activeQuery', function (done) { + const client = createPipelineClient() + + // Set a fake active query (shouldn't be used in pipeline mode) + client._activeQuery = { text: 'FAKE ACTIVE QUERY' } + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback + const query = new Query('SELECT 1', [], function () {}) + client._pendingQueries.push(query) + + // Simulate error + const errorMsg = new Error('Test error') + client.connection.emit('errorMessage', errorMsg) + + process.nextTick(() => { + // In pipeline mode, _activeQuery should not be cleared + assert.ok(client._activeQuery, '_activeQuery should not be cleared in pipeline mode') + assert.equal(client._activeQuery.text, 'FAKE ACTIVE QUERY') + done() + }) +}) + +suite.test('non-pipeline mode error handling - existing behavior preserved', function (done) { + // Create a non-pipeline client + const stream = createMockStream() + const connection = new Connection({ stream: stream }) + connection.startup = function () {} + connection.connect = function () {} + connection.query = function () {} + connection.parsedStatements = {} + + const client = new Client({ + connection: connection, + pipelineMode: false, // Explicitly non-pipeline + }) + client.connect() + client.connection.emit('connect') + client.connection.emit('readyForQuery') + + // Create a query with a callback + let capturedError = null + const query = new Query('SELECT 1', [], function (err) { + capturedError = err + }) + + // Set as active query (non-pipeline mode uses _activeQuery) + client._activeQuery = query + + // Simulate error + const errorMsg = new Error('Test error') + client.connection.emit('errorMessage', errorMsg) + + process.nextTick(() => { + assert.ok(capturedError, 'Query should have received an error') + assert.equal(capturedError.message, 'Test error') + // In non-pipeline mode, _activeQuery should be cleared + assert.equal(client._activeQuery, null, '_activeQuery should be cleared in non-pipeline mode') + done() + }) +}) + +suite.test('pipeline mode error - no pending query does not crash', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // No pending queries - _pendingQueries is empty + + // Simulate error - should not crash + const errorMsg = new Error('Test error') + client.connection.emit('errorMessage', errorMsg) + + process.nextTick(() => { + // Should not crash and client should still be usable + assert.ok(client._queryable, 'Client should still be queryable') + done() + }) +}) + +suite.test('pipeline mode - _errorAllQueries rejects all pending queries', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create multiple queries with callbacks to capture errors + const errors = [] + const query1 = new Query('SELECT 1', [], function (err) { + errors.push({ query: 'SELECT 1', error: err }) + }) + const query2 = new Query('SELECT 2', [], function (err) { + errors.push({ query: 'SELECT 2', error: err }) + }) + const query3 = new Query('SELECT 3', [], function (err) { + errors.push({ query: 'SELECT 3', error: err }) + }) + + // Add queries to pending queries (simulating pipeline mode) + client._pendingQueries.push(query1) + client._pendingQueries.push(query2) + client._pendingQueries.push(query3) + + // Simulate connection error that triggers _errorAllQueries + const connectionError = new Error('Connection terminated unexpectedly') + client._errorAllQueries(connectionError) + + // Verify all pending queries were cleared + assert.equal(client._pendingQueries.length, 0, 'Pending queries should be cleared') + + // Wait for all errors to be processed (they are enqueued with process.nextTick) + setTimeout(() => { + assert.equal(errors.length, 3, 'All 3 queries should have received errors') + errors.forEach((item) => { + assert.equal(item.error.message, 'Connection terminated unexpectedly') + }) + done() + }, 10) +}) + +suite.test('pipeline mode - _errorAllQueries handles both pending and queued queries', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create queries for both pending and queued + const errors = [] + const pendingQuery = new Query('SELECT pending', [], function (err) { + errors.push({ query: 'pending', error: err }) + }) + const queuedQuery = new Query('SELECT queued', [], function (err) { + errors.push({ query: 'queued', error: err }) + }) + + // Add to pending queries (already sent to server) + client._pendingQueries.push(pendingQuery) + // Add to query queue (not yet sent) + client._queryQueue.push(queuedQuery) + + // Simulate connection error + const connectionError = new Error('Connection lost') + client._errorAllQueries(connectionError) + + // Verify both queues were cleared + assert.equal(client._pendingQueries.length, 0, 'Pending queries should be cleared') + assert.equal(client._queryQueue.length, 0, 'Query queue should be cleared') + + // Wait for all errors to be processed + setTimeout(() => { + assert.equal(errors.length, 2, 'Both queries should have received errors') + done() + }, 10) +}) + +suite.test('pipeline mode - connection end triggers _errorAllQueries for pending queries', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Handle the error event that will be emitted on unexpected connection end + client.on('error', () => { + // Expected - connection terminated unexpectedly + }) + + // Create a query with callback + let capturedError = null + const query = new Query('SELECT 1', [], function (err) { + capturedError = err + }) + + // Add to pending queries + client._pendingQueries.push(query) + + // Simulate unexpected connection end (this triggers _errorAllQueries internally) + client.connection.emit('end') + + // Wait for error to be processed + setTimeout(() => { + assert.ok(capturedError, 'Query should have received an error') + assert.ok(capturedError.message.includes('Connection terminated'), 'Error should indicate connection termination') + assert.equal(client._pendingQueries.length, 0, 'Pending queries should be cleared') + done() + }, 10) +}) + +// Multi-statement validation tests (Requirement 6.2) +suite.test('pipeline mode - rejects multi-statement query with callback', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a multi-statement query with callback + let capturedError = null + client.query('SELECT 1; SELECT 2', function (err) { + capturedError = err + }) + + process.nextTick(() => { + assert.ok(capturedError, 'Should have received an error') + assert.equal(capturedError.message, 'Multiple SQL statements are not allowed in pipeline mode') + done() + }) +}) + +suite.test('pipeline mode - rejects multi-statement query with Promise', async function () { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a multi-statement query (Promise-based) + try { + await client.query('SELECT 1; SELECT 2') + assert.fail('Should have thrown an error') + } catch (err) { + assert.equal(err.message, 'Multiple SQL statements are not allowed in pipeline mode') + } +}) + +suite.test('pipeline mode - allows single statement with trailing semicolon', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a single statement with trailing semicolon - should be allowed + client.query('SELECT 1;') + + // Verify query was added to pending queries (not rejected) + assert.ok(client._pendingQueries.length > 0, 'Query should be added to pending queries') + assert.equal(client._pendingQueries[0].text, 'SELECT 1;') + done() +}) + +suite.test('pipeline mode - allows single statement without semicolon', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a single statement without semicolon - should be allowed + client.query('SELECT 1') + + // Verify query was added to pending queries (not rejected) + assert.ok(client._pendingQueries.length > 0, 'Query should be added to pending queries') + assert.equal(client._pendingQueries[0].text, 'SELECT 1') + done() +}) + +suite.test('pipeline mode - rejects multi-statement query with config object', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a multi-statement query using config object + let capturedError = null + client.query({ text: 'SELECT 1; SELECT 2' }, function (err) { + capturedError = err + }) + + process.nextTick(() => { + assert.ok(capturedError, 'Should have received an error') + assert.equal(capturedError.message, 'Multiple SQL statements are not allowed in pipeline mode') + done() + }) +}) + +suite.test('pipeline mode - rejects query with multiple statements and whitespace', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Submit a multi-statement query with whitespace + let capturedError = null + client.query('SELECT 1; SELECT 2; ', function (err) { + capturedError = err + }) + + process.nextTick(() => { + assert.ok(capturedError, 'Should have received an error') + assert.equal(capturedError.message, 'Multiple SQL statements are not allowed in pipeline mode') + done() + }) +}) + +suite.test('non-pipeline mode - allows multi-statement queries', function (done) { + // Create a non-pipeline client + const stream = createMockStream() + const connection = new Connection({ stream: stream }) + connection.startup = function () {} + connection.connect = function () {} + connection.query = function () {} + connection.parsedStatements = {} + + const client = new Client({ + connection: connection, + pipelineMode: false, // Explicitly non-pipeline + }) + client.connect() + client.connection.emit('connect') + client.connection.emit('readyForQuery') + + // Submit a multi-statement query - should be allowed in non-pipeline mode + client.query('SELECT 1; SELECT 2') + + // Verify query was added to queue (not rejected) + // In non-pipeline mode, the query becomes the active query + assert.ok(client._activeQuery, 'Query should be set as active query') + assert.equal(client._activeQuery.text, 'SELECT 1; SELECT 2') + done() +}) + +// COPY operation rejection tests (Requirement 6.1) +suite.test('pipeline mode - rejects COPY operation with error', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback to capture the error + let capturedError = null + const query = new Query('COPY test FROM STDIN', [], function (err) { + capturedError = err + }) + + // Add query to pending queries (simulating what _pulseQueryQueue does) + client._pendingQueries.push(query) + + // Simulate copyInResponse message from server + client.connection.emit('copyInResponse', {}) + + // Verify the error was passed to the query + process.nextTick(() => { + assert.ok(capturedError, 'Query should have received an error') + assert.equal(capturedError.message, 'COPY operations are not supported in pipeline mode') + done() + }) +}) + +suite.test('pipeline mode - COPY rejection does not affect connection', function (done) { + const client = createPipelineClient() + + // Simulate connection ready + client.connection.emit('readyForQuery') + + // Create a query with a callback + const query = new Query('COPY test FROM STDIN', [], function () {}) + client._pendingQueries.push(query) + + // Simulate copyInResponse + client.connection.emit('copyInResponse', {}) + + process.nextTick(() => { + // Verify client is still usable + assert.ok(client._queryable, 'Client should still be queryable') + assert.ok(!client._ended, 'Client should not be ended') + done() + }) +}) + +suite.test('non-pipeline mode - allows COPY operations', function (done) { + // Create a non-pipeline client + const stream = createMockStream() + const connection = new Connection({ stream: stream }) + connection.startup = function () {} + connection.connect = function () {} + connection.query = function () {} + connection.parsedStatements = {} + + const client = new Client({ + connection: connection, + pipelineMode: false, // Explicitly non-pipeline + }) + client.connect() + client.connection.emit('connect') + client.connection.emit('readyForQuery') + + // Create a query with a callback + let copyInResponseCalled = false + const query = new Query('COPY test FROM STDIN', [], function () {}) + query.handleCopyInResponse = function () { + copyInResponseCalled = true + } + + // Set as active query (non-pipeline mode uses _activeQuery) + client._activeQuery = query + + // Simulate copyInResponse + client.connection.emit('copyInResponse', {}) + + process.nextTick(() => { + assert.ok(copyInResponseCalled, 'handleCopyInResponse should be called in non-pipeline mode') + done() + }) +})