Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 268 additions & 2 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class Client extends EventEmitter {
}

this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0

// Pipeline mode configuration
this._pipelineMode = this.connectionParameters.pipelineMode
this._pipelineBatchSize = this.connectionParameters.pipelineBatchSize
this._pipelineBatchTimeout = this.connectionParameters.pipelineBatchTimeout

// Pipeline state
// Pipeline state
this._activePipelineBatch = [] // Queries being buffered (sent but no Sync)
this._pendingPipelineBatches = [] // Batches sent with Sync, awaiting results
this._pipelineSyncTimer = null
this._pipelineCurrentIndex = 0 // Index into _pendingPipelineBatches[0]
}

get activeQuery() {
Expand All @@ -110,6 +122,13 @@ class Client extends EventEmitter {
return this._activeQuery
}

/**
* Check if pipeline mode is enabled
*/
get pipelineMode() {
return this._pipelineMode
}

_errorAllQueries(err) {
const enqueueError = (query) => {
process.nextTick(() => {
Expand All @@ -123,6 +142,13 @@ class Client extends EventEmitter {
this._activeQuery = null
}

// Clear pipeline pending queries
this._activePipelineBatch.forEach(enqueueError)
this._activePipelineBatch.length = 0
this._pendingPipelineBatches.forEach((batch) => batch.forEach(enqueueError))
this._pendingPipelineBatches.length = 0
this._pipelineCurrentIndex = 0

this._queryQueue.forEach(enqueueError)
this._queryQueue.length = 0
}
Expand Down Expand Up @@ -354,6 +380,19 @@ class Client extends EventEmitter {
}
this.emit('connect')
}

if (this._pipelineMode) {
// In pipeline mode, ReadyForQuery marks end of a pipeline batch sync
// The first batch in _pendingPipelineBatches is now complete
if (this._pendingPipelineBatches.length > 0) {
this._pendingPipelineBatches.shift()
}
this._pipelineCurrentIndex = 0
this.readyForQuery = true
this._pulseQueryQueue()
return
}

const activeQuery = this._getActiveQuery()
this._activeQuery = null
this.readyForQuery = true
Expand Down Expand Up @@ -395,6 +434,26 @@ class Client extends EventEmitter {
if (this._connecting) {
return this._handleErrorWhileConnecting(msg)
}

if (this._pipelineMode && this._pendingPipelineBatches.length > 0) {
// In pipeline mode, error aborts the current query in the current batch
// Server will skip remaining commands until Sync
const batch = this._pendingPipelineBatches[0]
const query = batch[this._pipelineCurrentIndex]
if (query) {
query.handleError(msg, this.connection)
}
// Mark remaining pending queries IN THIS BATCH as aborted
for (let i = this._pipelineCurrentIndex + 1; i < batch.length; i++) {
const pendingQuery = batch[i]
const abortError = new Error('Query aborted due to pipeline error')
abortError.pipelineAborted = true
abortError.originalError = msg
pendingQuery.handleError(abortError, this.connection)
}
return
}

const activeQuery = this._getActiveQuery()

if (!activeQuery) {
Expand All @@ -407,6 +466,14 @@ class Client extends EventEmitter {
}

_handleRowDescription(msg) {
if (this._pipelineMode) {
const query = this._getCurrentPipelineQuery()
if (query) {
query.handleRowDescription(msg)
}
return
}

const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected rowDescription message from backend.')
Expand All @@ -418,6 +485,14 @@ class Client extends EventEmitter {
}

_handleDataRow(msg) {
if (this._pipelineMode) {
const query = this._getCurrentPipelineQuery()
if (query) {
query.handleDataRow(msg)
}
return
}

const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected dataRow message from backend.')
Expand All @@ -440,6 +515,16 @@ class Client extends EventEmitter {
}

_handleEmptyQuery(msg) {
if (this._pipelineMode) {
const query = this._getCurrentPipelineQuery()
if (query) {
query.handleEmptyQuery(this.connection)
query.handleReadyForQuery(this.connection)
}
this._pipelineCurrentIndex++
return
}

const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected emptyQuery message from backend.')
Expand All @@ -451,6 +536,18 @@ class Client extends EventEmitter {
}

_handleCommandComplete(msg) {
if (this._pipelineMode) {
const query = this._getCurrentPipelineQuery()
if (query) {
query.handleCommandComplete(msg, this.connection)
// In pipeline mode, commandComplete marks query completion
// Signal readyForQuery to the individual query
query.handleReadyForQuery(this.connection)
}
this._pipelineCurrentIndex++
return
}

const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected commandComplete message from backend.')
Expand All @@ -462,6 +559,14 @@ class Client extends EventEmitter {
}

_handleParseComplete() {
if (this._pipelineMode) {
const query = this._getCurrentPipelineQuery()
if (query && query.name) {
this.connection.parsedStatements[query.name] = query.text
}
return
}

const activeQuery = this._getActiveQuery()
if (activeQuery == null) {
const error = new Error('Received unexpected parseComplete message from backend.')
Expand Down Expand Up @@ -574,6 +679,10 @@ class Client extends EventEmitter {
}

_pulseQueryQueue() {
if (this._pipelineMode) {
return this._pulsePipelineQueue()
}

if (this.readyForQuery === true) {
this._activeQuery = this._queryQueue.shift()
const activeQuery = this._getActiveQuery()
Expand All @@ -596,6 +705,152 @@ class Client extends EventEmitter {
}
}

/**
* Pipeline mode query queue processing.
* Sends queries without waiting for individual ReadyForQuery responses.
*/
/**
* Pipeline mode query queue processing.
* Sends queries without waiting for individual ReadyForQuery responses.
*/
_pulsePipelineQueue() {
// If no queries to send, check if we need to drain or sync current batch
if (this._queryQueue.length === 0) {
if (this._activePipelineBatch.length > 0) {
// Queue is empty but we have unsynced queries, force a sync
this._sendPipelineSync()
} else if (this.hasExecuted && this._pendingPipelineBatches.length === 0) {
this.emit('drain')
}
return
}

//Cork the stream to batch all messages
this.connection.stream.cork && this.connection.stream.cork()

try {
// Send queries up to batch size
while (this._queryQueue.length > 0 && this._activePipelineBatch.length < this._pipelineBatchSize) {
const query = this._queryQueue.shift()
this._activePipelineBatch.push(query)

// Submit query without sync
const queryError = this._submitPipelineQuery(query)
if (queryError) {
// Handle submit error immediately
this._activePipelineBatch.pop()
process.nextTick(() => {
query.handleError(queryError, this.connection)
})
continue
}
}

// Check if we should sync
const isFull = this._activePipelineBatch.length >= this._pipelineBatchSize
const isEmpty = this._queryQueue.length === 0
const useTimeout = this._pipelineBatchTimeout > 0

if (isFull) {
this._sendPipelineSync()
} else if (isEmpty && !useTimeout) {
// Default behavior: sync immediately if empty and no timeout
this._sendPipelineSync()
} else if (useTimeout) {
// Schedule sync (wait for more queries or timeout)
this._schedulePipelineSync()
}
} finally {
// Uncork to send all buffered data
this.connection.stream.uncork && this.connection.stream.uncork()
}
}

/**
* Submit a query in pipeline mode (without sync).
* Returns an error if submit fails, null otherwise.
*/
_submitPipelineQuery(query) {
const connection = this.connection

this.hasExecuted = true
this.readyForQuery = false

if (query.requiresPreparation()) {
// Extended query protocol
try {
if (!query.hasBeenParsed(connection)) {
connection.parse({
text: query.text,
name: query.name,
types: query.types,
})
}

connection.bind({
portal: query.portal || '',
statement: query.name || '',
values: query.values || [],
binary: query.binary,
valueMapper: utils.prepareValue,
})

connection.describe({
type: 'P',
name: query.portal || '',
})

connection.execute({
portal: query.portal || '',
rows: query.rows || 0,
})
} catch (err) {
return err
}
} else {
// Simple query (can't be truly pipelined, but we queue it)
connection.query(query.text)
}

return null
}

_schedulePipelineSync() {
if (this._pipelineSyncTimer) return

this._pipelineSyncTimer = setTimeout(() => {
this._pipelineSyncTimer = null
if (this._activePipelineBatch.length > 0) {
this._sendPipelineSync()
}
}, this._pipelineBatchTimeout)
}

/**
* Send a Sync message to mark pipeline boundary
*/
_sendPipelineSync() {
if (this._pipelineSyncTimer) {
clearTimeout(this._pipelineSyncTimer)
this._pipelineSyncTimer = null
}

// Move active batch to pending batches
if (this._activePipelineBatch.length > 0) {
this._pendingPipelineBatches.push(this._activePipelineBatch)
this._activePipelineBatch = []
this.connection.sync()
}
}

/**
* Get the current query being processed in pipeline mode
*/
_getCurrentPipelineQuery() {
const currentBatch = this._pendingPipelineBatches[0]
return currentBatch ? currentBatch[this._pipelineCurrentIndex] : null
}

query(config, values, callback) {
// can take in strings, config object or query object
let query
Expand Down Expand Up @@ -696,6 +951,12 @@ class Client extends EventEmitter {
end(cb) {
this._ending = true

// Clear pipeline timer
if (this._pipelineSyncTimer) {
clearTimeout(this._pipelineSyncTimer)
this._pipelineSyncTimer = null
}

// if we have never connected, then end is a noop, callback immediately
if (!this.connection._connecting || this._ended) {
if (cb) {
Expand All @@ -705,8 +966,13 @@ class Client extends EventEmitter {
}
}

if (this._getActiveQuery() || !this._queryable) {
// if we have an active query we need to force a disconnect
if (
this._getActiveQuery() ||
this._activePipelineBatch.length > 0 ||
this._pendingPipelineBatches.length > 0 ||
!this._queryable
) {
// if we have an active query or pipeline queries we need to force a disconnect
// on the socket - otherwise a hung query could block end forever
this.connection.stream.destroy()
} else {
Expand Down
9 changes: 9 additions & 0 deletions packages/pg/lib/connection-parameters.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ class ConnectionParameters {
if (typeof config.keepAliveInitialDelayMillis === 'number') {
this.keepalives_idle = Math.floor(config.keepAliveInitialDelayMillis / 1000)
}

// Pipeline mode configuration
this.pipelineMode = config.pipelineMode !== undefined ? Boolean(config.pipelineMode) : defaults.pipelineMode
this.pipelineBatchSize =
config.pipelineBatchSize !== undefined ? parseInt(config.pipelineBatchSize, 10) : defaults.pipelineBatchSize
this.pipelineBatchTimeout =
config.pipelineBatchTimeout !== undefined
? parseInt(config.pipelineBatchTimeout, 10)
: defaults.pipelineBatchTimeout
}

getLibpqConnectionString(cb) {
Expand Down
Loading
Loading