Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 199 additions & 1 deletion packages/pg-native/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ const Client = (module.exports = function (config) {
this._rows = undefined
this._results = undefined

// Pipeline mode support
this._pipelineMode = config.pipelineMode || false
this._pendingCallbacks = []
this._currentQueryError = undefined
this._currentQueryRows = undefined
this._currentQueryResults = undefined
this._currentQueryResultCount = 0

// lazy start the reader if notifications are listened for
// this way if you only run sync queries you wont block
// the event loop artificially
Expand All @@ -42,19 +50,57 @@ const Client = (module.exports = function (config) {

util.inherits(Client, EventEmitter)

// Pipeline mode property getter
Object.defineProperty(Client.prototype, 'pipelineMode', {
get: function () {
return this._pipelineMode
},
})

// Check if pipeline mode is supported
Client.prototype.pipelineModeSupported = function () {
return typeof this.pq.pipelineModeSupported === 'function' && this.pq.pipelineModeSupported()
}

Client.prototype.connect = function (params, cb) {
this.pq.connect(params, cb)
const self = this
this.pq.connect(params, function (err) {
if (err) return cb(err)
if (self._pipelineMode) {
if (!self.pipelineModeSupported()) {
return cb(new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.'))
}
if (!self.pq.enterPipelineMode()) {
return cb(new Error('Failed to enter pipeline mode: ' + self.pq.errorMessage()))
}
}
cb()
})
}

Client.prototype.connectSync = function (params) {
this.pq.connectSync(params)
if (this._pipelineMode) {
if (!this.pipelineModeSupported()) {
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.')
}
if (!this.pq.enterPipelineMode()) {
throw new Error('Failed to enter pipeline mode: ' + this.pq.errorMessage())
}
}
}

Client.prototype.query = function (text, values, cb) {
let queryFn

if (typeof values === 'function') {
cb = values
values = undefined
}

// Use pipeline mode if enabled
if (this._pipelineMode) {
return this._pipelineQuery(text, values, cb)
}

if (Array.isArray(values)) {
Expand All @@ -73,6 +119,135 @@ Client.prototype.query = function (text, values, cb) {
})
}

// Pipeline mode query implementation
Client.prototype._pipelineQuery = function (text, values, cb) {
const self = this

// Validate: no multi-statement queries in pipeline mode
const statements = text.split(';').filter((s) => s.trim())
if (statements.length > 1) {
return setImmediate(() => cb(new Error('Multi-statement queries are not supported in pipeline mode')))
}

// Validate: no COPY in pipeline mode
const upperText = text.trim().toUpperCase()
if (upperText.startsWith('COPY ')) {
return setImmediate(() => cb(new Error('COPY operations are not supported in pipeline mode')))
}

this._stopReading()

const success = this.pq.setNonBlocking(true)
if (!success) {
return setImmediate(() => cb(new Error('Unable to set non-blocking to true')))
}

// Send the query
let sent
if (Array.isArray(values)) {
sent = this.pq.sendQueryParams(text, values)
} else {
sent = this.pq.sendQuery(text)
}

if (!sent) {
return setImmediate(() => cb(new Error(this.pq.errorMessage() || 'Failed to send query')))
}

// Send sync point
if (!this.pq.pipelineSync()) {
return setImmediate(() => cb(new Error('Failed to send pipeline sync')))
}

// Store callback for this query
this._pendingCallbacks.push(cb)

// Flush and start reading
this._waitForDrain(this.pq, (err) => {
if (err) {
const pendingCb = self._pendingCallbacks.shift()
if (pendingCb) pendingCb(new Error(err))
return
}
self._startPipelineReading()
})
}

// Start reading in pipeline mode
Client.prototype._startPipelineReading = function () {
if (this._reading) return
this._reading = true
this.pq.on('readable', this._readPipeline.bind(this))
this.pq.startReader()
}

// Read results in pipeline mode
Client.prototype._readPipeline = function () {
const pq = this.pq

if (!pq.consumeInput()) {
return this._readError()
}

if (pq.isBusy()) {
return
}

while (pq.getResult()) {
const status = pq.resultStatus()

if (status === 'PGRES_PIPELINE_SYNC') {
// Query complete, call the callback
const cb = this._pendingCallbacks.shift()
if (cb) {
const err = this._currentQueryError
this._currentQueryError = undefined
const rows = this._currentQueryRows
this._currentQueryRows = undefined
const results = this._currentQueryResults
this._currentQueryResults = undefined
this._currentQueryResultCount = 0
cb(err, rows || [], results)
}

// If no more pending callbacks, stop reading
if (this._pendingCallbacks.length === 0) {
this._stopReading()
}
continue
}

if (status === 'PGRES_FATAL_ERROR') {
this._currentQueryError = new Error(pq.resultErrorMessage())
continue
}

if (status === 'PGRES_TUPLES_OK' || status === 'PGRES_COMMAND_OK' || status === 'PGRES_EMPTY_QUERY') {
const result = this._consumeQueryResults(pq)
this._onPipelineResult(result)
}

if (pq.isBusy()) {
return
}
}
}

// Handle result in pipeline mode
Client.prototype._onPipelineResult = function (result) {
if (this._currentQueryResultCount === 0) {
this._currentQueryResults = result
this._currentQueryRows = result.rows
} else if (this._currentQueryResultCount === 1) {
this._currentQueryResults = [this._currentQueryResults, result]
this._currentQueryRows = [this._currentQueryRows, result.rows]
} else {
this._currentQueryResults.push(result)
this._currentQueryRows.push(result.rows)
}
this._currentQueryResultCount++
}

Client.prototype.prepare = function (statementName, text, nParams, cb) {
const self = this
const fn = function () {
Expand Down Expand Up @@ -149,6 +324,28 @@ Client.prototype.escapeIdentifier = function (value) {
module.exports.version = require('./package.json').version

Client.prototype.end = function (cb) {
const self = this

// In pipeline mode, wait for pending queries
if (this._pipelineMode && this._pendingCallbacks.length > 0) {
const checkPending = function () {
if (self._pendingCallbacks.length === 0) {
self._finishEnd(cb)
} else {
setTimeout(checkPending, 10)
}
}
checkPending()
return
}

this._finishEnd(cb)
}

Client.prototype._finishEnd = function (cb) {
if (this._pipelineMode && this.pipelineModeSupported()) {
this.pq.exitPipelineMode()
}
this._stopReading()
this.pq.finish()
if (cb) setImmediate(cb)
Expand All @@ -164,6 +361,7 @@ Client.prototype._stopReading = function () {
this._reading = false
this.pq.stopReader()
this.pq.removeListener('readable', this._read)
this.pq.removeListener('readable', this._readPipeline)
}

Client.prototype._consumeQueryResults = function (pq) {
Expand Down
Loading
Loading