From e2f61ec7fb944612511224f1028eeac62bc801e4 Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 15:55:48 +0100 Subject: [PATCH 1/6] feat: pipeline --- README.md | 93 ++++++++++++++++++++++ index.js | 64 +++++++++++++++ src/addon.cc | 9 +++ src/addon.h | 4 + src/connection.cc | 53 +++++++++++++ src/connection.h | 7 ++ test/pipeline-mode.js | 178 ++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 408 insertions(+) create mode 100644 test/pipeline-mode.js diff --git a/README.md b/README.md index 97a82b1..7f0882c 100644 --- a/README.md +++ b/README.md @@ -310,6 +310,99 @@ Issues a request to cancel the currently executing query _on this instance of li Returns the version of the connected PostgreSQL backend server as a number. +### Pipeline Mode (PostgreSQL 14+) + +Pipeline mode allows sending multiple queries to the server without waiting for results, significantly reducing round-trip latency. These functions are only available when compiled against PostgreSQL 14 or later client libraries. + +##### `pq.pipelineModeSupported():boolean` + +Returns `true` if pipeline mode is supported (compiled against PostgreSQL 14+), `false` otherwise. + +##### `pq.enterPipelineMode():boolean` + +Enters pipeline mode on the connection. In pipeline mode, you can send multiple queries using the async send functions (`sendQuery`, `sendQueryParams`, etc.) without waiting for results. + +Returns `true` if successful, `false` if failed. Throws an error if pipeline mode is not supported. + +##### `pq.exitPipelineMode():boolean` + +Exits pipeline mode. Can only be called when the pipeline is empty (all results have been processed). + +Returns `true` if successful, `false` if failed. + +##### `pq.pipelineStatus():int` + +Returns the current pipeline status: +- `PQ.PIPELINE_OFF` (0): Not in pipeline mode +- `PQ.PIPELINE_ON` (1): In pipeline mode +- `PQ.PIPELINE_ABORTED` (2): Pipeline aborted due to error + +##### `pq.pipelineSync():boolean` + +Sends a synchronization point in the pipeline. The server will process all queries up to this point and send their results before processing any further queries. This is essential for error handling - if a query fails, all subsequent queries until the next sync point are skipped. + +Returns `true` if successful, `false` if failed. + +##### `pq.sendFlushRequest():boolean` + +Sends a request for the server to flush its output buffer. Useful when you want to receive results before sending a sync. + +Returns `true` if successful, `false` if failed. + +#### Pipeline Mode Constants + +```js +PQ.PIPELINE_OFF // 0 - Not in pipeline mode +PQ.PIPELINE_ON // 1 - In pipeline mode +PQ.PIPELINE_ABORTED // 2 - Pipeline aborted due to error +``` + +#### Pipeline Mode Example + +```js +var PQ = require('libpq'); +var pq = new PQ(); +pq.connectSync(); + +if (!pq.pipelineModeSupported()) { + console.log('Pipeline mode requires PostgreSQL 14+ client libraries'); + process.exit(1); +} + +// Enter pipeline mode +pq.enterPipelineMode(); +pq.setNonBlocking(true); + +// Send multiple queries without waiting +pq.sendQueryParams('SELECT $1::int', ['1']); +pq.sendQueryParams('SELECT $1::int', ['2']); +pq.sendQueryParams('SELECT $1::int', ['3']); + +// Mark end of pipeline batch +pq.pipelineSync(); +pq.flush(); + +// Process results asynchronously +pq.on('readable', function() { + pq.consumeInput(); + while (!pq.isBusy()) { + if (!pq.getResult()) break; + + var status = pq.resultStatus(); + if (status === 'PGRES_TUPLES_OK') { + console.log('Result:', pq.getvalue(0, 0)); + } else if (status === 'PGRES_PIPELINE_SYNC') { + // All results received + pq.stopReader(); + pq.exitPipelineMode(); + pq.finish(); + } + } +}); + +pq.startReader(); +``` + ## testing ```sh diff --git a/index.js b/index.js index 383a585..0f94140 100644 --- a/index.js +++ b/index.js @@ -396,3 +396,67 @@ PQ.prototype.getCopyData = function (async) { PQ.prototype.cancel = function () { return this.$cancel(); }; + +// Pipeline mode functions (PostgreSQL 14+) +// These functions are only available if compiled against PostgreSQL 14 or later + +//Enters pipeline mode on the connection +//Returns true if successful, false if failed +//Pipeline mode allows sending multiple queries without waiting for results +PQ.prototype.enterPipelineMode = function () { + if (typeof this.$enterPipelineMode !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$enterPipelineMode(); +}; + +//Exits pipeline mode on the connection +//Returns true if successful, false if failed +//Can only exit pipeline mode when the queue is empty and no pending results +PQ.prototype.exitPipelineMode = function () { + if (typeof this.$exitPipelineMode !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$exitPipelineMode(); +}; + +//Returns the current pipeline status +//0 = PQ_PIPELINE_OFF (not in pipeline mode) +//1 = PQ_PIPELINE_ON (in pipeline mode) +//2 = PQ_PIPELINE_ABORTED (pipeline aborted due to error) +PQ.prototype.pipelineStatus = function () { + if (typeof this.$pipelineStatus !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$pipelineStatus(); +}; + +//Sends a sync message in pipeline mode +//This marks a synchronization point - the server will send results +//for all queries up to this point before processing further queries +//Returns true if successful, false if failed +PQ.prototype.pipelineSync = function () { + if (typeof this.$pipelineSync !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$pipelineSync(); +}; + +//Sends a request for the server to flush its output buffer +//Returns true if successful, false if failed +PQ.prototype.sendFlushRequest = function () { + if (typeof this.$sendFlushRequest !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$sendFlushRequest(); +}; + +//Check if pipeline mode is supported (compiled against PostgreSQL 14+) +PQ.prototype.pipelineModeSupported = function () { + return typeof this.$enterPipelineMode === 'function'; +}; + +// Pipeline status constants +PQ.PIPELINE_OFF = 0; +PQ.PIPELINE_ON = 1; +PQ.PIPELINE_ABORTED = 2; diff --git a/src/addon.cc b/src/addon.cc index c8c6711..e9433c2 100644 --- a/src/addon.cc +++ b/src/addon.cc @@ -74,6 +74,15 @@ NAN_MODULE_INIT(InitAddon) { //Cancel Nan::SetPrototypeMethod(tpl, "$cancel", Connection::Cancel); +#ifdef PIPELINE_MODE_SUPPORTED + //Pipeline mode (PostgreSQL 14+) + Nan::SetPrototypeMethod(tpl, "$enterPipelineMode", Connection::EnterPipelineMode); + Nan::SetPrototypeMethod(tpl, "$exitPipelineMode", Connection::ExitPipelineMode); + Nan::SetPrototypeMethod(tpl, "$pipelineStatus", Connection::PipelineStatus); + Nan::SetPrototypeMethod(tpl, "$pipelineSync", Connection::PipelineSync); + Nan::SetPrototypeMethod(tpl, "$sendFlushRequest", Connection::SendFlushRequest); +#endif + Nan::Set(target, Nan::New("PQ").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked()); } diff --git a/src/addon.h b/src/addon.h index f5dbb3c..9448e47 100644 --- a/src/addon.h +++ b/src/addon.h @@ -13,6 +13,10 @@ #define MORE_ERROR_FIELDS_SUPPORTED #endif +#if PG_VERSION_NUM >= 140000 +#define PIPELINE_MODE_SUPPORTED +#endif + #include "connection.h" #include "connect-async-worker.h" diff --git a/src/connection.cc b/src/connection.cc index 5021e73..7d1ea43 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -727,6 +727,59 @@ NAN_METHOD(Connection::Cancel) { delete[] errBuff; } +#ifdef PIPELINE_MODE_SUPPORTED +NAN_METHOD(Connection::EnterPipelineMode) { + TRACE("Connection::EnterPipelineMode"); + + Connection* self = NODE_THIS(); + + int result = PQenterPipelineMode(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::ExitPipelineMode) { + TRACE("Connection::ExitPipelineMode"); + + Connection* self = NODE_THIS(); + + int result = PQexitPipelineMode(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::PipelineStatus) { + TRACE("Connection::PipelineStatus"); + + Connection* self = NODE_THIS(); + + PGpipelineStatus status = PQpipelineStatus(self->pq); + + // PQ_PIPELINE_OFF = 0, PQ_PIPELINE_ON = 1, PQ_PIPELINE_ABORTED = 2 + info.GetReturnValue().Set(static_cast(status)); +} + +NAN_METHOD(Connection::PipelineSync) { + TRACE("Connection::PipelineSync"); + + Connection* self = NODE_THIS(); + + int result = PQpipelineSync(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::SendFlushRequest) { + TRACE("Connection::SendFlushRequest"); + + Connection* self = NODE_THIS(); + + int result = PQsendFlushRequest(self->pq); + + info.GetReturnValue().Set(result == 1); +} +#endif + bool Connection::ConnectDB(const char* paramString) { TRACEF("Connection::ConnectDB:Connection parameters: %s\n", paramString); this->pq = PQconnectdb(paramString); diff --git a/src/connection.h b/src/connection.h index 3a2bb89..a61fbaf 100644 --- a/src/connection.h +++ b/src/connection.h @@ -56,6 +56,13 @@ class Connection : public Nan::ObjectWrap { static NAN_METHOD(PutCopyEnd); static NAN_METHOD(GetCopyData); static NAN_METHOD(Cancel); +#ifdef PIPELINE_MODE_SUPPORTED + static NAN_METHOD(EnterPipelineMode); + static NAN_METHOD(ExitPipelineMode); + static NAN_METHOD(PipelineStatus); + static NAN_METHOD(PipelineSync); + static NAN_METHOD(SendFlushRequest); +#endif bool ConnectDB(const char* paramString); void InitPollSocket(); diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js new file mode 100644 index 0000000..365bc35 --- /dev/null +++ b/test/pipeline-mode.js @@ -0,0 +1,178 @@ +var PQ = require('../'); +var assert = require('assert'); + +describe('pipeline mode', function () { + var pq; + + beforeEach(function () { + pq = new PQ(); + pq.connectSync(); + }); + + afterEach(function () { + if (pq.connected) { + pq.finish(); + } + }); + + describe('pipelineModeSupported', function () { + it('returns a boolean', function () { + var supported = pq.pipelineModeSupported(); + assert.strictEqual(typeof supported, 'boolean'); + }); + }); + + // Skip pipeline tests if not supported (PostgreSQL < 14) + describe('pipeline operations', function () { + beforeEach(function () { + if (!pq.pipelineModeSupported()) { + this.skip(); + } + }); + + it('starts in non-pipeline mode', function () { + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_OFF); + }); + + it('can enter pipeline mode', function () { + var result = pq.enterPipelineMode(); + assert.strictEqual(result, true); + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ON); + }); + + it('can exit pipeline mode', function () { + pq.enterPipelineMode(); + var result = pq.exitPipelineMode(); + assert.strictEqual(result, true); + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_OFF); + }); + + it('can send multiple queries in pipeline mode', function (done) { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + + // Send multiple queries + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['1']), true); + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['2']), true); + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['3']), true); + + // Send sync to mark end of pipeline + assert.strictEqual(pq.pipelineSync(), true); + + // Flush the queries + pq.flush(); + + // Read results + var results = []; + + pq.on('readable', function () { + if (!pq.consumeInput()) { + done(new Error('consumeInput failed')); + return; + } + + while (!pq.isBusy()) { + if (!pq.getResult()) { + break; + } + + var status = pq.resultStatus(); + if (status === 'PGRES_TUPLES_OK') { + results.push(parseInt(pq.getvalue(0, 0), 10)); + } else if (status === 'PGRES_PIPELINE_SYNC') { + // Pipeline sync received, we're done + pq.stopReader(); + assert.deepStrictEqual(results, [1, 2, 3]); + pq.exitPipelineMode(); + done(); + return; + } + } + }); + + pq.startReader(); + }); + + it('handles errors in pipeline mode', function (done) { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + + // Send a valid query + pq.sendQueryParams('SELECT $1::int as num', ['1']); + // Send an invalid query (will cause error) + pq.sendQuery('SELECT * FROM nonexistent_table_xyz'); + // Send another valid query + pq.sendQueryParams('SELECT $1::int as num', ['3']); + // Send sync + pq.pipelineSync(); + pq.flush(); + + var gotError = false; + var gotAborted = false; + + pq.on('readable', function () { + if (!pq.consumeInput()) { + done(new Error('consumeInput failed')); + return; + } + + while (!pq.isBusy()) { + if (!pq.getResult()) { + break; + } + + var status = pq.resultStatus(); + if (status === 'PGRES_FATAL_ERROR') { + gotError = true; + // After error, pipeline should be aborted + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ABORTED); + gotAborted = true; + } else if (status === 'PGRES_PIPELINE_SYNC') { + pq.stopReader(); + assert.strictEqual(gotError, true, 'Should have received an error'); + assert.strictEqual(gotAborted, true, 'Pipeline should have been aborted'); + pq.exitPipelineMode(); + done(); + return; + } + } + }); + + pq.startReader(); + }); + + it('sendFlushRequest works', function () { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + pq.sendQueryParams('SELECT 1', []); + var result = pq.sendFlushRequest(); + assert.strictEqual(result, true); + pq.pipelineSync(); + pq.exitPipelineMode(); + }); + }); + + describe('pipeline constants', function () { + it('exports pipeline status constants', function () { + assert.strictEqual(PQ.PIPELINE_OFF, 0); + assert.strictEqual(PQ.PIPELINE_ON, 1); + assert.strictEqual(PQ.PIPELINE_ABORTED, 2); + }); + }); + + describe('error handling when not supported', function () { + it('throws helpful error when pipeline mode not supported', function () { + // Create a mock PQ without pipeline methods + var mockPQ = { + $enterPipelineMode: undefined + }; + + // Bind the prototype method to our mock + var enterFn = PQ.prototype.enterPipelineMode.bind(mockPQ); + + assert.throws(function () { + enterFn(); + }, /Pipeline mode is not supported/); + }); + }); +}); From 262f6ad5a233a8728ff3581959bc9cac1bb82621 Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 15:59:21 +0100 Subject: [PATCH 2/6] fix: test pipeline --- test/pipeline-mode.js | 66 +++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js index 365bc35..e9c11e6 100644 --- a/test/pipeline-mode.js +++ b/test/pipeline-mode.js @@ -2,6 +2,8 @@ var PQ = require('../'); var assert = require('assert'); describe('pipeline mode', function () { + this.timeout(5000); // Increase timeout for async tests + var pq; beforeEach(function () { @@ -64,15 +66,14 @@ describe('pipeline mode', function () { // Read results var results = []; + var finished = false; - pq.on('readable', function () { - if (!pq.consumeInput()) { - done(new Error('consumeInput failed')); - return; - } - + var processResults = function() { + // Keep reading results while not busy while (!pq.isBusy()) { - if (!pq.getResult()) { + var hasResult = pq.getResult(); + if (!hasResult) { + // No more results available right now break; } @@ -81,13 +82,28 @@ describe('pipeline mode', function () { results.push(parseInt(pq.getvalue(0, 0), 10)); } else if (status === 'PGRES_PIPELINE_SYNC') { // Pipeline sync received, we're done + finished = true; pq.stopReader(); + pq.removeAllListeners('readable'); assert.deepStrictEqual(results, [1, 2, 3]); pq.exitPipelineMode(); done(); return; } + // PGRES_COMMAND_OK and other statuses are ignored } + }; + + pq.on('readable', function () { + if (finished) return; + + if (!pq.consumeInput()) { + pq.stopReader(); + done(new Error('consumeInput failed: ' + pq.errorMessage())); + return; + } + + processResults(); }); pq.startReader(); @@ -100,7 +116,7 @@ describe('pipeline mode', function () { // Send a valid query pq.sendQueryParams('SELECT $1::int as num', ['1']); // Send an invalid query (will cause error) - pq.sendQuery('SELECT * FROM nonexistent_table_xyz'); + pq.sendQuery('SELECT * FROM nonexistent_table_xyz_12345'); // Send another valid query pq.sendQueryParams('SELECT $1::int as num', ['3']); // Send sync @@ -108,16 +124,12 @@ describe('pipeline mode', function () { pq.flush(); var gotError = false; - var gotAborted = false; - - pq.on('readable', function () { - if (!pq.consumeInput()) { - done(new Error('consumeInput failed')); - return; - } + var finished = false; + var processResults = function() { while (!pq.isBusy()) { - if (!pq.getResult()) { + var hasResult = pq.getResult(); + if (!hasResult) { break; } @@ -126,16 +138,28 @@ describe('pipeline mode', function () { gotError = true; // After error, pipeline should be aborted assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ABORTED); - gotAborted = true; } else if (status === 'PGRES_PIPELINE_SYNC') { + finished = true; pq.stopReader(); + pq.removeAllListeners('readable'); assert.strictEqual(gotError, true, 'Should have received an error'); - assert.strictEqual(gotAborted, true, 'Pipeline should have been aborted'); pq.exitPipelineMode(); done(); return; } } + }; + + pq.on('readable', function () { + if (finished) return; + + if (!pq.consumeInput()) { + pq.stopReader(); + done(new Error('consumeInput failed: ' + pq.errorMessage())); + return; + } + + processResults(); }); pq.startReader(); @@ -148,6 +172,12 @@ describe('pipeline mode', function () { var result = pq.sendFlushRequest(); assert.strictEqual(result, true); pq.pipelineSync(); + // Need to consume results before exiting pipeline mode + pq.flush(); + // Read and discard results synchronously + while (pq.getResult()) { + // consume all results + } pq.exitPipelineMode(); }); }); From 26a39c7767a3d974ccb42f074a67fa545a747b6b Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 16:54:44 +0100 Subject: [PATCH 3/6] fix: test --- .github/workflows/ci.yaml | 2 +- test/pipeline-mode.js | 137 ++++++++++++++++++++++---------------- 2 files changed, 80 insertions(+), 59 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4d60441..fbe47bd 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest services: postgres: - image: postgres:11 + image: postgres:14 env: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js index e9c11e6..e89e1d0 100644 --- a/test/pipeline-mode.js +++ b/test/pipeline-mode.js @@ -2,7 +2,7 @@ var PQ = require('../'); var assert = require('assert'); describe('pipeline mode', function () { - this.timeout(5000); // Increase timeout for async tests + this.timeout(10000); var pq; @@ -24,10 +24,29 @@ describe('pipeline mode', function () { }); }); - // Skip pipeline tests if not supported (PostgreSQL < 14) + // Skip pipeline tests if not supported + // Pipeline mode requires BOTH: + // 1. Client library compiled with PostgreSQL 14+ (pipelineModeSupported) + // 2. Server version 14+ (serverVersion >= 140000) describe('pipeline operations', function () { - beforeEach(function () { - if (!pq.pipelineModeSupported()) { + before(function () { + // Check if pipeline mode is supported using a temporary connection + var testPq = new PQ(); + testPq.connectSync(); + var clientSupported = testPq.pipelineModeSupported(); + var serverVersion = testPq.serverVersion(); + testPq.finish(); + + // Pipeline mode requires PostgreSQL 14+ on both client and server + var serverSupported = serverVersion >= 140000; + + if (!clientSupported) { + console.log('Pipeline mode not supported by client library. Skipping pipeline tests.'); + this.skip(); + } + + if (!serverSupported) { + console.log('Pipeline mode not supported by server (version: ' + serverVersion + ', requires 140000+). Skipping pipeline tests.'); this.skip(); } }); @@ -63,50 +82,54 @@ describe('pipeline mode', function () { // Flush the queries pq.flush(); - - // Read results + + // Read results using polling approach var results = []; - var finished = false; - var processResults = function() { - // Keep reading results while not busy + var readResults = function() { + // Consume any available input + pq.consumeInput(); + + // Process all available results while (!pq.isBusy()) { var hasResult = pq.getResult(); if (!hasResult) { - // No more results available right now break; } var status = pq.resultStatus(); + if (status === 'PGRES_TUPLES_OK') { results.push(parseInt(pq.getvalue(0, 0), 10)); } else if (status === 'PGRES_PIPELINE_SYNC') { - // Pipeline sync received, we're done - finished = true; - pq.stopReader(); - pq.removeAllListeners('readable'); - assert.deepStrictEqual(results, [1, 2, 3]); - pq.exitPipelineMode(); - done(); - return; + return true; } - // PGRES_COMMAND_OK and other statuses are ignored } + return false; }; - pq.on('readable', function () { - if (finished) return; + // Poll for results + var attempts = 0; + var maxAttempts = 100; + + var poll = function() { + attempts++; + if (readResults()) { + assert.deepStrictEqual(results, [1, 2, 3]); + pq.exitPipelineMode(); + done(); + return; + } - if (!pq.consumeInput()) { - pq.stopReader(); - done(new Error('consumeInput failed: ' + pq.errorMessage())); + if (attempts >= maxAttempts) { + done(new Error('Timeout waiting for pipeline results. Got: ' + JSON.stringify(results))); return; } - - processResults(); - }); - - pq.startReader(); + + setTimeout(poll, 50); + }; + + poll(); }); it('handles errors in pipeline mode', function (done) { @@ -117,16 +140,17 @@ describe('pipeline mode', function () { pq.sendQueryParams('SELECT $1::int as num', ['1']); // Send an invalid query (will cause error) pq.sendQuery('SELECT * FROM nonexistent_table_xyz_12345'); - // Send another valid query + // Send another valid query (will be skipped due to error) pq.sendQueryParams('SELECT $1::int as num', ['3']); // Send sync pq.pipelineSync(); pq.flush(); var gotError = false; - var finished = false; - var processResults = function() { + var readResults = function() { + pq.consumeInput(); + while (!pq.isBusy()) { var hasResult = pq.getResult(); if (!hasResult) { @@ -134,51 +158,48 @@ describe('pipeline mode', function () { } var status = pq.resultStatus(); + if (status === 'PGRES_FATAL_ERROR') { gotError = true; - // After error, pipeline should be aborted assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ABORTED); } else if (status === 'PGRES_PIPELINE_SYNC') { - finished = true; - pq.stopReader(); - pq.removeAllListeners('readable'); - assert.strictEqual(gotError, true, 'Should have received an error'); - pq.exitPipelineMode(); - done(); - return; + return true; } } + return false; }; - pq.on('readable', function () { - if (finished) return; + var attempts = 0; + var maxAttempts = 100; + + var poll = function() { + attempts++; + if (readResults()) { + assert.strictEqual(gotError, true, 'Should have received an error'); + pq.exitPipelineMode(); + done(); + return; + } - if (!pq.consumeInput()) { - pq.stopReader(); - done(new Error('consumeInput failed: ' + pq.errorMessage())); + if (attempts >= maxAttempts) { + done(new Error('Timeout waiting for pipeline error results')); return; } - - processResults(); - }); - - pq.startReader(); + + setTimeout(poll, 50); + }; + + poll(); }); it('sendFlushRequest works', function () { pq.enterPipelineMode(); pq.setNonBlocking(true); - pq.sendQueryParams('SELECT 1', []); + pq.sendQueryParams('SELECT 1 as num', []); var result = pq.sendFlushRequest(); assert.strictEqual(result, true); + // Just verify the function works, don't need to read results pq.pipelineSync(); - // Need to consume results before exiting pipeline mode - pq.flush(); - // Read and discard results synchronously - while (pq.getResult()) { - // consume all results - } - pq.exitPipelineMode(); }); }); From dbcd4a960bcf0c007d8327426a472a2ea10f6f2e Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 16:58:35 +0100 Subject: [PATCH 4/6] fix: test --- test/pipeline-mode.js | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js index e89e1d0..da3c8db 100644 --- a/test/pipeline-mode.js +++ b/test/pipeline-mode.js @@ -147,6 +147,7 @@ describe('pipeline mode', function () { pq.flush(); var gotError = false; + var statuses = []; var readResults = function() { pq.consumeInput(); @@ -158,11 +159,19 @@ describe('pipeline mode', function () { } var status = pq.resultStatus(); + statuses.push(status); + // Check for error status or pipeline aborted state if (status === 'PGRES_FATAL_ERROR') { gotError = true; - assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ABORTED); - } else if (status === 'PGRES_PIPELINE_SYNC') { + } + + // Also check if pipeline became aborted (error occurred) + if (pq.pipelineStatus() === PQ.PIPELINE_ABORTED) { + gotError = true; + } + + if (status === 'PGRES_PIPELINE_SYNC') { return true; } } @@ -175,14 +184,15 @@ describe('pipeline mode', function () { var poll = function() { attempts++; if (readResults()) { - assert.strictEqual(gotError, true, 'Should have received an error'); + // Either we got an explicit error or the pipeline was aborted + assert.strictEqual(gotError, true, 'Should have received an error. Statuses: ' + statuses.join(', ')); pq.exitPipelineMode(); done(); return; } if (attempts >= maxAttempts) { - done(new Error('Timeout waiting for pipeline error results')); + done(new Error('Timeout waiting for pipeline error results. Statuses: ' + statuses.join(', '))); return; } From a830edd10e550599edeb25b5be53208c9377454f Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 17:01:01 +0100 Subject: [PATCH 5/6] fix: force fail --- test/pipeline-mode.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js index da3c8db..7ed3461 100644 --- a/test/pipeline-mode.js +++ b/test/pipeline-mode.js @@ -138,8 +138,8 @@ describe('pipeline mode', function () { // Send a valid query pq.sendQueryParams('SELECT $1::int as num', ['1']); - // Send an invalid query (will cause error) - pq.sendQuery('SELECT * FROM nonexistent_table_xyz_12345'); + // Send an invalid query (syntax error will definitely fail) + pq.sendQuery('SELEC INVALID SYNTAX'); // Send another valid query (will be skipped due to error) pq.sendQueryParams('SELECT $1::int as num', ['3']); // Send sync From 0ef1577f57db52a967821bafad50e93e919111ac Mon Sep 17 00:00:00 2001 From: francesco Date: Wed, 4 Feb 2026 17:06:39 +0100 Subject: [PATCH 6/6] fix: try another error --- test/pipeline-mode.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js index 7ed3461..4dc9ba7 100644 --- a/test/pipeline-mode.js +++ b/test/pipeline-mode.js @@ -138,8 +138,8 @@ describe('pipeline mode', function () { // Send a valid query pq.sendQueryParams('SELECT $1::int as num', ['1']); - // Send an invalid query (syntax error will definitely fail) - pq.sendQuery('SELEC INVALID SYNTAX'); + // Send an invalid query - division by zero error + pq.sendQueryParams('SELECT 1/0', []); // Send another valid query (will be skipped due to error) pq.sendQueryParams('SELECT $1::int as num', ['3']); // Send sync