diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4d60441..fbe47bd 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest services: postgres: - image: postgres:11 + image: postgres:14 env: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres diff --git a/README.md b/README.md index 97a82b1..7f0882c 100644 --- a/README.md +++ b/README.md @@ -310,6 +310,99 @@ Issues a request to cancel the currently executing query _on this instance of li Returns the version of the connected PostgreSQL backend server as a number. +### Pipeline Mode (PostgreSQL 14+) + +Pipeline mode allows sending multiple queries to the server without waiting for results, significantly reducing round-trip latency. These functions are only available when compiled against PostgreSQL 14 or later client libraries. + +##### `pq.pipelineModeSupported():boolean` + +Returns `true` if pipeline mode is supported (compiled against PostgreSQL 14+), `false` otherwise. + +##### `pq.enterPipelineMode():boolean` + +Enters pipeline mode on the connection. In pipeline mode, you can send multiple queries using the async send functions (`sendQuery`, `sendQueryParams`, etc.) without waiting for results. + +Returns `true` if successful, `false` if failed. Throws an error if pipeline mode is not supported. + +##### `pq.exitPipelineMode():boolean` + +Exits pipeline mode. Can only be called when the pipeline is empty (all results have been processed). + +Returns `true` if successful, `false` if failed. + +##### `pq.pipelineStatus():int` + +Returns the current pipeline status: +- `PQ.PIPELINE_OFF` (0): Not in pipeline mode +- `PQ.PIPELINE_ON` (1): In pipeline mode +- `PQ.PIPELINE_ABORTED` (2): Pipeline aborted due to error + +##### `pq.pipelineSync():boolean` + +Sends a synchronization point in the pipeline. The server will process all queries up to this point and send their results before processing any further queries. This is essential for error handling - if a query fails, all subsequent queries until the next sync point are skipped. + +Returns `true` if successful, `false` if failed. + +##### `pq.sendFlushRequest():boolean` + +Sends a request for the server to flush its output buffer. Useful when you want to receive results before sending a sync. + +Returns `true` if successful, `false` if failed. + +#### Pipeline Mode Constants + +```js +PQ.PIPELINE_OFF // 0 - Not in pipeline mode +PQ.PIPELINE_ON // 1 - In pipeline mode +PQ.PIPELINE_ABORTED // 2 - Pipeline aborted due to error +``` + +#### Pipeline Mode Example + +```js +var PQ = require('libpq'); +var pq = new PQ(); +pq.connectSync(); + +if (!pq.pipelineModeSupported()) { + console.log('Pipeline mode requires PostgreSQL 14+ client libraries'); + process.exit(1); +} + +// Enter pipeline mode +pq.enterPipelineMode(); +pq.setNonBlocking(true); + +// Send multiple queries without waiting +pq.sendQueryParams('SELECT $1::int', ['1']); +pq.sendQueryParams('SELECT $1::int', ['2']); +pq.sendQueryParams('SELECT $1::int', ['3']); + +// Mark end of pipeline batch +pq.pipelineSync(); +pq.flush(); + +// Process results asynchronously +pq.on('readable', function() { + pq.consumeInput(); + while (!pq.isBusy()) { + if (!pq.getResult()) break; + + var status = pq.resultStatus(); + if (status === 'PGRES_TUPLES_OK') { + console.log('Result:', pq.getvalue(0, 0)); + } else if (status === 'PGRES_PIPELINE_SYNC') { + // All results received + pq.stopReader(); + pq.exitPipelineMode(); + pq.finish(); + } + } +}); + +pq.startReader(); +``` + ## testing ```sh diff --git a/index.js b/index.js index 383a585..0f94140 100644 --- a/index.js +++ b/index.js @@ -396,3 +396,67 @@ PQ.prototype.getCopyData = function (async) { PQ.prototype.cancel = function () { return this.$cancel(); }; + +// Pipeline mode functions (PostgreSQL 14+) +// These functions are only available if compiled against PostgreSQL 14 or later + +//Enters pipeline mode on the connection +//Returns true if successful, false if failed +//Pipeline mode allows sending multiple queries without waiting for results +PQ.prototype.enterPipelineMode = function () { + if (typeof this.$enterPipelineMode !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$enterPipelineMode(); +}; + +//Exits pipeline mode on the connection +//Returns true if successful, false if failed +//Can only exit pipeline mode when the queue is empty and no pending results +PQ.prototype.exitPipelineMode = function () { + if (typeof this.$exitPipelineMode !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$exitPipelineMode(); +}; + +//Returns the current pipeline status +//0 = PQ_PIPELINE_OFF (not in pipeline mode) +//1 = PQ_PIPELINE_ON (in pipeline mode) +//2 = PQ_PIPELINE_ABORTED (pipeline aborted due to error) +PQ.prototype.pipelineStatus = function () { + if (typeof this.$pipelineStatus !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$pipelineStatus(); +}; + +//Sends a sync message in pipeline mode +//This marks a synchronization point - the server will send results +//for all queries up to this point before processing further queries +//Returns true if successful, false if failed +PQ.prototype.pipelineSync = function () { + if (typeof this.$pipelineSync !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$pipelineSync(); +}; + +//Sends a request for the server to flush its output buffer +//Returns true if successful, false if failed +PQ.prototype.sendFlushRequest = function () { + if (typeof this.$sendFlushRequest !== 'function') { + throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'); + } + return this.$sendFlushRequest(); +}; + +//Check if pipeline mode is supported (compiled against PostgreSQL 14+) +PQ.prototype.pipelineModeSupported = function () { + return typeof this.$enterPipelineMode === 'function'; +}; + +// Pipeline status constants +PQ.PIPELINE_OFF = 0; +PQ.PIPELINE_ON = 1; +PQ.PIPELINE_ABORTED = 2; diff --git a/src/addon.cc b/src/addon.cc index c8c6711..e9433c2 100644 --- a/src/addon.cc +++ b/src/addon.cc @@ -74,6 +74,15 @@ NAN_MODULE_INIT(InitAddon) { //Cancel Nan::SetPrototypeMethod(tpl, "$cancel", Connection::Cancel); +#ifdef PIPELINE_MODE_SUPPORTED + //Pipeline mode (PostgreSQL 14+) + Nan::SetPrototypeMethod(tpl, "$enterPipelineMode", Connection::EnterPipelineMode); + Nan::SetPrototypeMethod(tpl, "$exitPipelineMode", Connection::ExitPipelineMode); + Nan::SetPrototypeMethod(tpl, "$pipelineStatus", Connection::PipelineStatus); + Nan::SetPrototypeMethod(tpl, "$pipelineSync", Connection::PipelineSync); + Nan::SetPrototypeMethod(tpl, "$sendFlushRequest", Connection::SendFlushRequest); +#endif + Nan::Set(target, Nan::New("PQ").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked()); } diff --git a/src/addon.h b/src/addon.h index f5dbb3c..9448e47 100644 --- a/src/addon.h +++ b/src/addon.h @@ -13,6 +13,10 @@ #define MORE_ERROR_FIELDS_SUPPORTED #endif +#if PG_VERSION_NUM >= 140000 +#define PIPELINE_MODE_SUPPORTED +#endif + #include "connection.h" #include "connect-async-worker.h" diff --git a/src/connection.cc b/src/connection.cc index 5021e73..7d1ea43 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -727,6 +727,59 @@ NAN_METHOD(Connection::Cancel) { delete[] errBuff; } +#ifdef PIPELINE_MODE_SUPPORTED +NAN_METHOD(Connection::EnterPipelineMode) { + TRACE("Connection::EnterPipelineMode"); + + Connection* self = NODE_THIS(); + + int result = PQenterPipelineMode(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::ExitPipelineMode) { + TRACE("Connection::ExitPipelineMode"); + + Connection* self = NODE_THIS(); + + int result = PQexitPipelineMode(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::PipelineStatus) { + TRACE("Connection::PipelineStatus"); + + Connection* self = NODE_THIS(); + + PGpipelineStatus status = PQpipelineStatus(self->pq); + + // PQ_PIPELINE_OFF = 0, PQ_PIPELINE_ON = 1, PQ_PIPELINE_ABORTED = 2 + info.GetReturnValue().Set(static_cast(status)); +} + +NAN_METHOD(Connection::PipelineSync) { + TRACE("Connection::PipelineSync"); + + Connection* self = NODE_THIS(); + + int result = PQpipelineSync(self->pq); + + info.GetReturnValue().Set(result == 1); +} + +NAN_METHOD(Connection::SendFlushRequest) { + TRACE("Connection::SendFlushRequest"); + + Connection* self = NODE_THIS(); + + int result = PQsendFlushRequest(self->pq); + + info.GetReturnValue().Set(result == 1); +} +#endif + bool Connection::ConnectDB(const char* paramString) { TRACEF("Connection::ConnectDB:Connection parameters: %s\n", paramString); this->pq = PQconnectdb(paramString); diff --git a/src/connection.h b/src/connection.h index 3a2bb89..a61fbaf 100644 --- a/src/connection.h +++ b/src/connection.h @@ -56,6 +56,13 @@ class Connection : public Nan::ObjectWrap { static NAN_METHOD(PutCopyEnd); static NAN_METHOD(GetCopyData); static NAN_METHOD(Cancel); +#ifdef PIPELINE_MODE_SUPPORTED + static NAN_METHOD(EnterPipelineMode); + static NAN_METHOD(ExitPipelineMode); + static NAN_METHOD(PipelineStatus); + static NAN_METHOD(PipelineSync); + static NAN_METHOD(SendFlushRequest); +#endif bool ConnectDB(const char* paramString); void InitPollSocket(); diff --git a/test/pipeline-mode.js b/test/pipeline-mode.js new file mode 100644 index 0000000..4dc9ba7 --- /dev/null +++ b/test/pipeline-mode.js @@ -0,0 +1,239 @@ +var PQ = require('../'); +var assert = require('assert'); + +describe('pipeline mode', function () { + this.timeout(10000); + + var pq; + + beforeEach(function () { + pq = new PQ(); + pq.connectSync(); + }); + + afterEach(function () { + if (pq.connected) { + pq.finish(); + } + }); + + describe('pipelineModeSupported', function () { + it('returns a boolean', function () { + var supported = pq.pipelineModeSupported(); + assert.strictEqual(typeof supported, 'boolean'); + }); + }); + + // Skip pipeline tests if not supported + // Pipeline mode requires BOTH: + // 1. Client library compiled with PostgreSQL 14+ (pipelineModeSupported) + // 2. Server version 14+ (serverVersion >= 140000) + describe('pipeline operations', function () { + before(function () { + // Check if pipeline mode is supported using a temporary connection + var testPq = new PQ(); + testPq.connectSync(); + var clientSupported = testPq.pipelineModeSupported(); + var serverVersion = testPq.serverVersion(); + testPq.finish(); + + // Pipeline mode requires PostgreSQL 14+ on both client and server + var serverSupported = serverVersion >= 140000; + + if (!clientSupported) { + console.log('Pipeline mode not supported by client library. Skipping pipeline tests.'); + this.skip(); + } + + if (!serverSupported) { + console.log('Pipeline mode not supported by server (version: ' + serverVersion + ', requires 140000+). Skipping pipeline tests.'); + this.skip(); + } + }); + + it('starts in non-pipeline mode', function () { + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_OFF); + }); + + it('can enter pipeline mode', function () { + var result = pq.enterPipelineMode(); + assert.strictEqual(result, true); + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_ON); + }); + + it('can exit pipeline mode', function () { + pq.enterPipelineMode(); + var result = pq.exitPipelineMode(); + assert.strictEqual(result, true); + assert.strictEqual(pq.pipelineStatus(), PQ.PIPELINE_OFF); + }); + + it('can send multiple queries in pipeline mode', function (done) { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + + // Send multiple queries + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['1']), true); + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['2']), true); + assert.strictEqual(pq.sendQueryParams('SELECT $1::int as num', ['3']), true); + + // Send sync to mark end of pipeline + assert.strictEqual(pq.pipelineSync(), true); + + // Flush the queries + pq.flush(); + + // Read results using polling approach + var results = []; + + var readResults = function() { + // Consume any available input + pq.consumeInput(); + + // Process all available results + while (!pq.isBusy()) { + var hasResult = pq.getResult(); + if (!hasResult) { + break; + } + + var status = pq.resultStatus(); + + if (status === 'PGRES_TUPLES_OK') { + results.push(parseInt(pq.getvalue(0, 0), 10)); + } else if (status === 'PGRES_PIPELINE_SYNC') { + return true; + } + } + return false; + }; + + // Poll for results + var attempts = 0; + var maxAttempts = 100; + + var poll = function() { + attempts++; + if (readResults()) { + assert.deepStrictEqual(results, [1, 2, 3]); + pq.exitPipelineMode(); + done(); + return; + } + + if (attempts >= maxAttempts) { + done(new Error('Timeout waiting for pipeline results. Got: ' + JSON.stringify(results))); + return; + } + + setTimeout(poll, 50); + }; + + poll(); + }); + + it('handles errors in pipeline mode', function (done) { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + + // Send a valid query + pq.sendQueryParams('SELECT $1::int as num', ['1']); + // Send an invalid query - division by zero error + pq.sendQueryParams('SELECT 1/0', []); + // Send another valid query (will be skipped due to error) + pq.sendQueryParams('SELECT $1::int as num', ['3']); + // Send sync + pq.pipelineSync(); + pq.flush(); + + var gotError = false; + var statuses = []; + + var readResults = function() { + pq.consumeInput(); + + while (!pq.isBusy()) { + var hasResult = pq.getResult(); + if (!hasResult) { + break; + } + + var status = pq.resultStatus(); + statuses.push(status); + + // Check for error status or pipeline aborted state + if (status === 'PGRES_FATAL_ERROR') { + gotError = true; + } + + // Also check if pipeline became aborted (error occurred) + if (pq.pipelineStatus() === PQ.PIPELINE_ABORTED) { + gotError = true; + } + + if (status === 'PGRES_PIPELINE_SYNC') { + return true; + } + } + return false; + }; + + var attempts = 0; + var maxAttempts = 100; + + var poll = function() { + attempts++; + if (readResults()) { + // Either we got an explicit error or the pipeline was aborted + assert.strictEqual(gotError, true, 'Should have received an error. Statuses: ' + statuses.join(', ')); + pq.exitPipelineMode(); + done(); + return; + } + + if (attempts >= maxAttempts) { + done(new Error('Timeout waiting for pipeline error results. Statuses: ' + statuses.join(', '))); + return; + } + + setTimeout(poll, 50); + }; + + poll(); + }); + + it('sendFlushRequest works', function () { + pq.enterPipelineMode(); + pq.setNonBlocking(true); + pq.sendQueryParams('SELECT 1 as num', []); + var result = pq.sendFlushRequest(); + assert.strictEqual(result, true); + // Just verify the function works, don't need to read results + pq.pipelineSync(); + }); + }); + + describe('pipeline constants', function () { + it('exports pipeline status constants', function () { + assert.strictEqual(PQ.PIPELINE_OFF, 0); + assert.strictEqual(PQ.PIPELINE_ON, 1); + assert.strictEqual(PQ.PIPELINE_ABORTED, 2); + }); + }); + + describe('error handling when not supported', function () { + it('throws helpful error when pipeline mode not supported', function () { + // Create a mock PQ without pipeline methods + var mockPQ = { + $enterPipelineMode: undefined + }; + + // Bind the prototype method to our mock + var enterFn = PQ.prototype.enterPipelineMode.bind(mockPQ); + + assert.throws(function () { + enterFn(); + }, /Pipeline mode is not supported/); + }); + }); +});