Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:11
image: postgres:14
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
Expand Down
93 changes: 93 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,99 @@ Issues a request to cancel the currently executing query _on this instance of li

Returns the version of the connected PostgreSQL backend server as a number.

### Pipeline Mode (PostgreSQL 14+)

Pipeline mode allows sending multiple queries to the server without waiting for results, significantly reducing round-trip latency. These functions are only available when compiled against PostgreSQL 14 or later client libraries.

##### `pq.pipelineModeSupported():boolean`

Returns `true` if pipeline mode is supported (compiled against PostgreSQL 14+), `false` otherwise.

##### `pq.enterPipelineMode():boolean`

Enters pipeline mode on the connection. In pipeline mode, you can send multiple queries using the async send functions (`sendQuery`, `sendQueryParams`, etc.) without waiting for results.

Returns `true` if successful, `false` if failed. Throws an error if pipeline mode is not supported.

##### `pq.exitPipelineMode():boolean`

Exits pipeline mode. Can only be called when the pipeline is empty (all results have been processed).

Returns `true` if successful, `false` if failed.

##### `pq.pipelineStatus():int`

Returns the current pipeline status:
- `PQ.PIPELINE_OFF` (0): Not in pipeline mode
- `PQ.PIPELINE_ON` (1): In pipeline mode
- `PQ.PIPELINE_ABORTED` (2): Pipeline aborted due to error

##### `pq.pipelineSync():boolean`

Sends a synchronization point in the pipeline. The server will process all queries up to this point and send their results before processing any further queries. This is essential for error handling - if a query fails, all subsequent queries until the next sync point are skipped.

Returns `true` if successful, `false` if failed.

##### `pq.sendFlushRequest():boolean`

Sends a request for the server to flush its output buffer. Useful when you want to receive results before sending a sync.

Returns `true` if successful, `false` if failed.

#### Pipeline Mode Constants

```js
PQ.PIPELINE_OFF // 0 - Not in pipeline mode
PQ.PIPELINE_ON // 1 - In pipeline mode
PQ.PIPELINE_ABORTED // 2 - Pipeline aborted due to error
```

#### Pipeline Mode Example

```js
var PQ = require('libpq');
var pq = new PQ();
pq.connectSync();

if (!pq.pipelineModeSupported()) {
console.log('Pipeline mode requires PostgreSQL 14+ client libraries');
process.exit(1);
}

// Enter pipeline mode
pq.enterPipelineMode();
pq.setNonBlocking(true);

// Send multiple queries without waiting
pq.sendQueryParams('SELECT $1::int', ['1']);
pq.sendQueryParams('SELECT $1::int', ['2']);
pq.sendQueryParams('SELECT $1::int', ['3']);

// Mark end of pipeline batch
pq.pipelineSync();
pq.flush();

// Process results asynchronously
pq.on('readable', function() {
pq.consumeInput();
while (!pq.isBusy()) {
if (!pq.getResult()) break;

var status = pq.resultStatus();
if (status === 'PGRES_TUPLES_OK') {
console.log('Result:', pq.getvalue(0, 0));
} else if (status === 'PGRES_PIPELINE_SYNC') {
// All results received
pq.stopReader();
pq.exitPipelineMode();
pq.finish();
}
}
});

pq.startReader();
```

## testing

```sh
Expand Down
64 changes: 64 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,67 @@ PQ.prototype.getCopyData = function (async) {
PQ.prototype.cancel = function () {
return this.$cancel();
};

// Pipeline mode functions (PostgreSQL 14+)
// These functions are only available if compiled against PostgreSQL 14 or later

//Enters pipeline mode on the connection
//Returns true if successful, false if failed
//Pipeline mode allows sending multiple queries without waiting for results
PQ.prototype.enterPipelineMode = function () {
if (typeof this.$enterPipelineMode !== 'function') {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
}
return this.$enterPipelineMode();
};

//Exits pipeline mode on the connection
//Returns true if successful, false if failed
//Can only exit pipeline mode when the queue is empty and no pending results
PQ.prototype.exitPipelineMode = function () {
if (typeof this.$exitPipelineMode !== 'function') {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
}
return this.$exitPipelineMode();
};

//Returns the current pipeline status
//0 = PQ_PIPELINE_OFF (not in pipeline mode)
//1 = PQ_PIPELINE_ON (in pipeline mode)
//2 = PQ_PIPELINE_ABORTED (pipeline aborted due to error)
PQ.prototype.pipelineStatus = function () {
if (typeof this.$pipelineStatus !== 'function') {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
}
return this.$pipelineStatus();
};

//Sends a sync message in pipeline mode
//This marks a synchronization point - the server will send results
//for all queries up to this point before processing further queries
//Returns true if successful, false if failed
PQ.prototype.pipelineSync = function () {
if (typeof this.$pipelineSync !== 'function') {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
}
return this.$pipelineSync();
};

//Sends a request for the server to flush its output buffer
//Returns true if successful, false if failed
PQ.prototype.sendFlushRequest = function () {
if (typeof this.$sendFlushRequest !== 'function') {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
}
return this.$sendFlushRequest();
};

//Check if pipeline mode is supported (compiled against PostgreSQL 14+)
PQ.prototype.pipelineModeSupported = function () {
return typeof this.$enterPipelineMode === 'function';
};

// Pipeline status constants
PQ.PIPELINE_OFF = 0;
PQ.PIPELINE_ON = 1;
PQ.PIPELINE_ABORTED = 2;
9 changes: 9 additions & 0 deletions src/addon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ NAN_MODULE_INIT(InitAddon) {
//Cancel
Nan::SetPrototypeMethod(tpl, "$cancel", Connection::Cancel);

#ifdef PIPELINE_MODE_SUPPORTED
//Pipeline mode (PostgreSQL 14+)
Nan::SetPrototypeMethod(tpl, "$enterPipelineMode", Connection::EnterPipelineMode);
Nan::SetPrototypeMethod(tpl, "$exitPipelineMode", Connection::ExitPipelineMode);
Nan::SetPrototypeMethod(tpl, "$pipelineStatus", Connection::PipelineStatus);
Nan::SetPrototypeMethod(tpl, "$pipelineSync", Connection::PipelineSync);
Nan::SetPrototypeMethod(tpl, "$sendFlushRequest", Connection::SendFlushRequest);
#endif

Nan::Set(target,
Nan::New("PQ").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
}
Expand Down
4 changes: 4 additions & 0 deletions src/addon.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
#define MORE_ERROR_FIELDS_SUPPORTED
#endif

#if PG_VERSION_NUM >= 140000
#define PIPELINE_MODE_SUPPORTED
#endif

#include "connection.h"
#include "connect-async-worker.h"

Expand Down
53 changes: 53 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,59 @@ NAN_METHOD(Connection::Cancel) {
delete[] errBuff;
}

#ifdef PIPELINE_MODE_SUPPORTED
NAN_METHOD(Connection::EnterPipelineMode) {
TRACE("Connection::EnterPipelineMode");

Connection* self = NODE_THIS();

int result = PQenterPipelineMode(self->pq);

info.GetReturnValue().Set(result == 1);
}

NAN_METHOD(Connection::ExitPipelineMode) {
TRACE("Connection::ExitPipelineMode");

Connection* self = NODE_THIS();

int result = PQexitPipelineMode(self->pq);

info.GetReturnValue().Set(result == 1);
}

NAN_METHOD(Connection::PipelineStatus) {
TRACE("Connection::PipelineStatus");

Connection* self = NODE_THIS();

PGpipelineStatus status = PQpipelineStatus(self->pq);

// PQ_PIPELINE_OFF = 0, PQ_PIPELINE_ON = 1, PQ_PIPELINE_ABORTED = 2
info.GetReturnValue().Set(static_cast<int>(status));
}

NAN_METHOD(Connection::PipelineSync) {
TRACE("Connection::PipelineSync");

Connection* self = NODE_THIS();

int result = PQpipelineSync(self->pq);

info.GetReturnValue().Set(result == 1);
}

NAN_METHOD(Connection::SendFlushRequest) {
TRACE("Connection::SendFlushRequest");

Connection* self = NODE_THIS();

int result = PQsendFlushRequest(self->pq);

info.GetReturnValue().Set(result == 1);
}
#endif

bool Connection::ConnectDB(const char* paramString) {
TRACEF("Connection::ConnectDB:Connection parameters: %s\n", paramString);
this->pq = PQconnectdb(paramString);
Expand Down
7 changes: 7 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class Connection : public Nan::ObjectWrap {
static NAN_METHOD(PutCopyEnd);
static NAN_METHOD(GetCopyData);
static NAN_METHOD(Cancel);
#ifdef PIPELINE_MODE_SUPPORTED
static NAN_METHOD(EnterPipelineMode);
static NAN_METHOD(ExitPipelineMode);
static NAN_METHOD(PipelineStatus);
static NAN_METHOD(PipelineSync);
static NAN_METHOD(SendFlushRequest);
#endif

bool ConnectDB(const char* paramString);
void InitPollSocket();
Expand Down
Loading