diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 8c83406bb..b7dc4ec8c 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -28,6 +28,14 @@ const Client = (module.exports = function (config) { this._rows = undefined this._results = undefined + // Pipeline mode support + this._pipelineMode = config.pipelineMode || false + this._pendingCallbacks = [] + this._currentQueryError = undefined + this._currentQueryRows = undefined + this._currentQueryResults = undefined + this._currentQueryResultCount = 0 + // lazy start the reader if notifications are listened for // this way if you only run sync queries you wont block // the event loop artificially @@ -42,12 +50,44 @@ const Client = (module.exports = function (config) { util.inherits(Client, EventEmitter) +// Pipeline mode property getter +Object.defineProperty(Client.prototype, 'pipelineMode', { + get: function () { + return this._pipelineMode + }, +}) + +// Check if pipeline mode is supported +Client.prototype.pipelineModeSupported = function () { + return typeof this.pq.pipelineModeSupported === 'function' && this.pq.pipelineModeSupported() +} + Client.prototype.connect = function (params, cb) { - this.pq.connect(params, cb) + const self = this + this.pq.connect(params, function (err) { + if (err) return cb(err) + if (self._pipelineMode) { + if (!self.pipelineModeSupported()) { + return cb(new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.')) + } + if (!self.pq.enterPipelineMode()) { + return cb(new Error('Failed to enter pipeline mode: ' + self.pq.errorMessage())) + } + } + cb() + }) } Client.prototype.connectSync = function (params) { this.pq.connectSync(params) + if (this._pipelineMode) { + if (!this.pipelineModeSupported()) { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.') + } + if (!this.pq.enterPipelineMode()) { + throw new Error('Failed to enter pipeline mode: ' + this.pq.errorMessage()) + } + } } Client.prototype.query = function (text, values, cb) { @@ -55,6 +95,12 @@ Client.prototype.query = function (text, values, cb) { if (typeof values === 'function') { cb = values + values = undefined + } + + // Use pipeline mode if enabled + if (this._pipelineMode) { + return this._pipelineQuery(text, values, cb) } if (Array.isArray(values)) { @@ -73,6 +119,135 @@ Client.prototype.query = function (text, values, cb) { }) } +// Pipeline mode query implementation +Client.prototype._pipelineQuery = function (text, values, cb) { + const self = this + + // Validate: no multi-statement queries in pipeline mode + const statements = text.split(';').filter((s) => s.trim()) + if (statements.length > 1) { + return setImmediate(() => cb(new Error('Multi-statement queries are not supported in pipeline mode'))) + } + + // Validate: no COPY in pipeline mode + const upperText = text.trim().toUpperCase() + if (upperText.startsWith('COPY ')) { + return setImmediate(() => cb(new Error('COPY operations are not supported in pipeline mode'))) + } + + this._stopReading() + + const success = this.pq.setNonBlocking(true) + if (!success) { + return setImmediate(() => cb(new Error('Unable to set non-blocking to true'))) + } + + // Send the query + let sent + if (Array.isArray(values)) { + sent = this.pq.sendQueryParams(text, values) + } else { + sent = this.pq.sendQuery(text) + } + + if (!sent) { + return setImmediate(() => cb(new Error(this.pq.errorMessage() || 'Failed to send query'))) + } + + // Send sync point + if (!this.pq.pipelineSync()) { + return setImmediate(() => cb(new Error('Failed to send pipeline sync'))) + } + + // Store callback for this query + this._pendingCallbacks.push(cb) + + // Flush and start reading + this._waitForDrain(this.pq, (err) => { + if (err) { + const pendingCb = self._pendingCallbacks.shift() + if (pendingCb) pendingCb(new Error(err)) + return + } + self._startPipelineReading() + }) +} + +// Start reading in pipeline mode +Client.prototype._startPipelineReading = function () { + if (this._reading) return + this._reading = true + this.pq.on('readable', this._readPipeline.bind(this)) + this.pq.startReader() +} + +// Read results in pipeline mode +Client.prototype._readPipeline = function () { + const pq = this.pq + + if (!pq.consumeInput()) { + return this._readError() + } + + if (pq.isBusy()) { + return + } + + while (pq.getResult()) { + const status = pq.resultStatus() + + if (status === 'PGRES_PIPELINE_SYNC') { + // Query complete, call the callback + const cb = this._pendingCallbacks.shift() + if (cb) { + const err = this._currentQueryError + this._currentQueryError = undefined + const rows = this._currentQueryRows + this._currentQueryRows = undefined + const results = this._currentQueryResults + this._currentQueryResults = undefined + this._currentQueryResultCount = 0 + cb(err, rows || [], results) + } + + // If no more pending callbacks, stop reading + if (this._pendingCallbacks.length === 0) { + this._stopReading() + } + continue + } + + if (status === 'PGRES_FATAL_ERROR') { + this._currentQueryError = new Error(pq.resultErrorMessage()) + continue + } + + if (status === 'PGRES_TUPLES_OK' || status === 'PGRES_COMMAND_OK' || status === 'PGRES_EMPTY_QUERY') { + const result = this._consumeQueryResults(pq) + this._onPipelineResult(result) + } + + if (pq.isBusy()) { + return + } + } +} + +// Handle result in pipeline mode +Client.prototype._onPipelineResult = function (result) { + if (this._currentQueryResultCount === 0) { + this._currentQueryResults = result + this._currentQueryRows = result.rows + } else if (this._currentQueryResultCount === 1) { + this._currentQueryResults = [this._currentQueryResults, result] + this._currentQueryRows = [this._currentQueryRows, result.rows] + } else { + this._currentQueryResults.push(result) + this._currentQueryRows.push(result.rows) + } + this._currentQueryResultCount++ +} + Client.prototype.prepare = function (statementName, text, nParams, cb) { const self = this const fn = function () { @@ -149,6 +324,28 @@ Client.prototype.escapeIdentifier = function (value) { module.exports.version = require('./package.json').version Client.prototype.end = function (cb) { + const self = this + + // In pipeline mode, wait for pending queries + if (this._pipelineMode && this._pendingCallbacks.length > 0) { + const checkPending = function () { + if (self._pendingCallbacks.length === 0) { + self._finishEnd(cb) + } else { + setTimeout(checkPending, 10) + } + } + checkPending() + return + } + + this._finishEnd(cb) +} + +Client.prototype._finishEnd = function (cb) { + if (this._pipelineMode && this.pipelineModeSupported()) { + this.pq.exitPipelineMode() + } this._stopReading() this.pq.finish() if (cb) setImmediate(cb) @@ -164,6 +361,7 @@ Client.prototype._stopReading = function () { this._reading = false this.pq.stopReader() this.pq.removeListener('readable', this._read) + this.pq.removeListener('readable', this._readPipeline) } Client.prototype._consumeQueryResults = function (pq) { diff --git a/packages/pg-native/test/pipeline-mode.js b/packages/pg-native/test/pipeline-mode.js new file mode 100644 index 000000000..dff483525 --- /dev/null +++ b/packages/pg-native/test/pipeline-mode.js @@ -0,0 +1,255 @@ +const Client = require('../') +const assert = require('assert') + +describe('pipeline mode', function () { + this.timeout(10000) + + describe('pipelineModeSupported', function () { + it('returns a boolean', function (done) { + const client = Client() + client.connect(function (err) { + if (err) return done(err) + const supported = client.pipelineModeSupported() + assert.strictEqual(typeof supported, 'boolean') + client.end(done) + }) + }) + }) + + describe('pipelineMode property', function () { + it('returns false when not enabled', function (done) { + const client = Client() + client.connect(function (err) { + if (err) return done(err) + assert.strictEqual(client.pipelineMode, false) + client.end(done) + }) + }) + + it('returns true when enabled', function (done) { + const client = Client({ pipelineMode: true }) + // Check property before connect + assert.strictEqual(client.pipelineMode, true) + + client.connect(function (err) { + // If pipeline mode is not supported, skip the test + if (err && err.message.includes('Pipeline mode is not supported')) { + console.log('Pipeline mode not supported, skipping test') + return done() + } + if (err) return done(err) + assert.strictEqual(client.pipelineMode, true) + client.end(done) + }) + }) + }) + + // Skip pipeline operation tests if not supported + describe('pipeline operations', function () { + let supported = false + + before(function (done) { + const testClient = Client() + testClient.connect(function (err) { + if (err) return done(err) + supported = testClient.pipelineModeSupported() + const serverVersion = testClient.pq.serverVersion() + testClient.end(function () { + if (!supported) { + console.log('Pipeline mode not supported by client library. Skipping pipeline tests.') + } else if (serverVersion < 140000) { + console.log( + `Pipeline mode not supported by server (version: ${serverVersion}, requires 140000+). Skipping pipeline tests.` + ) + supported = false + } + done() + }) + }) + }) + + beforeEach(function () { + if (!supported) { + this.skip() + } + }) + + it('can execute a simple query in pipeline mode', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + client.query('SELECT 1 as num', function (err, rows) { + if (err) { + client.end() + return done(err) + } + assert.strictEqual(rows.length, 1) + assert.strictEqual(rows[0].num, 1) + client.end(done) + }) + }) + }) + + it('can execute a query with parameters in pipeline mode', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + client.query('SELECT $1::int as num', [42], function (err, rows) { + if (err) { + client.end() + return done(err) + } + assert.strictEqual(rows.length, 1) + assert.strictEqual(rows[0].num, 42) + client.end(done) + }) + }) + }) + + it('can execute multiple queries concurrently', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + + let completed = 0 + const results = [] + + const checkDone = function () { + completed++ + if (completed === 3) { + // All queries completed + assert.strictEqual(results.length, 3) + // Results should be in order + results.sort((a, b) => a.index - b.index) + assert.strictEqual(results[0].value, 1) + assert.strictEqual(results[1].value, 2) + assert.strictEqual(results[2].value, 3) + client.end(done) + } + } + + client.query('SELECT $1::int as num', [1], function (err, rows) { + if (err) return done(err) + results.push({ index: 0, value: rows[0].num }) + checkDone() + }) + + client.query('SELECT $1::int as num', [2], function (err, rows) { + if (err) return done(err) + results.push({ index: 1, value: rows[0].num }) + checkDone() + }) + + client.query('SELECT $1::int as num', [3], function (err, rows) { + if (err) return done(err) + results.push({ index: 2, value: rows[0].num }) + checkDone() + }) + }) + }) + + it('handles query errors in pipeline mode', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + // Division by zero error + client.query('SELECT 1/0', function (err, rows) { + assert(err instanceof Error, 'Should return an error') + assert(err.message.includes('division by zero'), 'Error should mention division by zero') + client.end(done) + }) + }) + }) + + it('rejects multi-statement queries in pipeline mode', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + client.query('SELECT 1; SELECT 2', function (err) { + assert(err instanceof Error, 'Should return an error') + assert(err.message.includes('Multi-statement'), 'Error should mention multi-statement') + client.end(done) + }) + }) + }) + + it('rejects COPY operations in pipeline mode', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + client.query('COPY (SELECT 1) TO STDOUT', function (err) { + assert(err instanceof Error, 'Should return an error') + assert(err.message.includes('COPY'), 'Error should mention COPY') + client.end(done) + }) + }) + }) + + it('is still usable after an error', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + // First, cause an error + client.query('SELECT 1/0', function (err) { + assert(err instanceof Error, 'Should return an error') + // Then, execute a valid query + client.query('SELECT 1 as num', function (err, rows) { + if (err) { + client.end() + return done(err) + } + assert.strictEqual(rows.length, 1) + assert.strictEqual(rows[0].num, 1) + client.end(done) + }) + }) + }) + }) + + it('waits for pending queries on end', function (done) { + const client = Client({ pipelineMode: true }) + client.connect(function (err) { + if (err) return done(err) + + let queryCompleted = false + + client.query('SELECT pg_sleep(0.1), 1 as num', function (err, rows) { + if (err) return done(err) + queryCompleted = true + assert.strictEqual(rows[0].num, 1) + }) + + // Call end immediately - it should wait for the query + client.end(function () { + assert.strictEqual(queryCompleted, true, 'Query should have completed before end') + done() + }) + }) + }) + }) + + describe('error handling when not supported', function () { + it('returns error when pipeline mode not supported', function (done) { + // Create a client with a mock pq that doesn't support pipeline mode + const client = Client({ pipelineMode: true }) + + // Override pipelineModeSupported to return false + const originalSupported = client.pipelineModeSupported.bind(client) + client.pipelineModeSupported = function () { + return false + } + + client.connect(function (err) { + // Restore original function + client.pipelineModeSupported = originalSupported + + assert(err instanceof Error, 'Should return an error') + assert( + err.message.includes('Pipeline mode is not supported'), + 'Error should mention pipeline mode not supported' + ) + done() + }) + }) + }) +})