Skip to content
This repository was archived by the owner on Dec 30, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@ const EventEmitter = require('events').EventEmitter

const NOOP = function () { }

const remove = (list, value) => {
const i = list.indexOf(value)

if (i !== -1) {
list.splice(i, 1)
}
}

const removeWhere = (list, predicate) => {
const i = list.findIndex(predicate)

Expand All @@ -26,6 +18,12 @@ class IdleItem {
}
}

class PendingItem {
constructor (callback) {
this.callback = callback
}
}

function throwOnRelease () {
throw new Error('Release called on client which has already been released to the pool.')
}
Expand Down Expand Up @@ -85,6 +83,7 @@ class Pool extends EventEmitter {
this._pendingQueue = []
this._endCallback = undefined
this.ending = false
this.ended = false
}

_isFull () {
Expand All @@ -93,6 +92,10 @@ class Pool extends EventEmitter {

_pulseQueue () {
this.log('pulse queue')
if (this.ended) {
this.log('pulse queue ended')
return
}
if (this.ending) {
this.log('pulse queue on ending')
if (this._idle.length) {
Expand All @@ -101,6 +104,7 @@ class Pool extends EventEmitter {
})
}
if (!this._clients.length) {
this.ended = true
this._endCallback()
}
return
Expand All @@ -121,10 +125,10 @@ class Pool extends EventEmitter {
const client = idleItem.client
client.release = release.bind(this, client)
this.emit('acquire', client)
return waiter(undefined, client, client.release)
return waiter.callback(undefined, client, client.release)
}
if (!this._isFull()) {
return this.connect(waiter)
Copy link
Contributor Author

@johanneswuerbach johanneswuerbach Dec 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found re-entering into "full" connect here a bit confusing as we already know that we need to create a new client and can never add the waiter to the pending queue again as far as I can see.

Now this makes it more clear by using the newClient method directly here and in connect, wdyt?

return this.newClient(waiter)
}
throw new Error('unexpected condition')
}
Expand All @@ -150,18 +154,18 @@ class Pool extends EventEmitter {
return cb ? cb(err) : this.Promise.reject(err)
}

const response = promisify(this.Promise, cb)
const result = response.result

// if we don't have to connect a new client, don't do so
if (this._clients.length >= this.options.max || this._idle.length) {
const response = promisify(this.Promise, cb)
const result = response.result

// if we have idle clients schedule a pulse immediately
if (this._idle.length) {
process.nextTick(() => this._pulseQueue())
}

if (!this.options.connectionTimeoutMillis) {
this._pendingQueue.push(response.callback)
this._pendingQueue.push(new PendingItem(response.callback))
return result
}

Expand All @@ -170,18 +174,27 @@ class Pool extends EventEmitter {
response.callback(err, res, done)
}

const pendingItem = new PendingItem(queueCallback)

// set connection timeout on checking out an existing client
const tid = setTimeout(() => {
// remove the callback from pending waiters because
// we're going to call it with a timeout error
remove(this._pendingQueue, queueCallback)
removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
pendingItem.timedOut = true
response.callback(new Error('timeout exceeded when trying to connect'))
}, this.options.connectionTimeoutMillis)

this._pendingQueue.push(queueCallback)
this._pendingQueue.push(pendingItem)
return result
}

this.newClient(new PendingItem(response.callback))

return result
}

newClient (pendingItem) {
const client = new this.Client(this.options)
this._clients.push(client)
const idleListener = (err) => {
Expand Down Expand Up @@ -210,9 +223,6 @@ class Pool extends EventEmitter {
}, this.options.connectionTimeoutMillis)
}

const response = promisify(this.Promise, cb)
cb = response.callback

this.log('connecting new client')
client.connect((err) => {
if (tid) {
Expand All @@ -230,20 +240,29 @@ class Pool extends EventEmitter {
// this client won’t be released, so move on immediately
this._pulseQueue()

cb(err, undefined, NOOP)
if (!pendingItem.timedOut) {
pendingItem.callback(err, undefined, NOOP)
}
} else {
this.log('new client connected')
client.release = release.bind(this, client)
this.emit('connect', client)
this.emit('acquire', client)
if (this.options.verify) {
this.options.verify(client, cb)
if (!pendingItem.timedOut) {
if (this.options.verify) {
this.options.verify(client, pendingItem.callback)
} else {
pendingItem.callback(undefined, client, client.release)
}
} else {
cb(undefined, client, client.release)
if (this.options.verify) {
this.options.verify(client, client.release)
} else {
client.release()
}
}
}
})
return response.result
}

query (text, values, cb) {
Expand Down
92 changes: 92 additions & 0 deletions test/connection-timeout.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const after = require('mocha').after
const Pool = require('../')

describe('connection timeout', () => {
const connectionFailure = new Error('Temporary connection failure')

before((done) => {
this.server = net.createServer((socket) => {
})
Expand Down Expand Up @@ -126,4 +128,94 @@ describe('connection timeout', () => {
})
})
})

it('continues processing after a connection failure', (done) => {
const Client = require('pg').Client
const orgConnect = Client.prototype.connect
let called = false

Client.prototype.connect = function (cb) {
// Simulate a failure on first call
if (!called) {
called = true

return setTimeout(() => {
cb(connectionFailure)
}, 100)
}
// And pass-through the second call
orgConnect.call(this, cb)
}

const pool = new Pool({
Client: Client,
connectionTimeoutMillis: 1000,
max: 1
})

pool.connect((err, client, release) => {
expect(err).to.be(connectionFailure)

pool.query('select $1::text as name', ['brianc'], (err, res) => {
expect(err).to.be(undefined)
expect(res.rows).to.have.length(1)
pool.end(done)
})
})
})

it('releases newly connected clients if the queued already timed out', (done) => {
const Client = require('pg').Client

const orgConnect = Client.prototype.connect

let connection = 0

Client.prototype.connect = function (cb) {
// Simulate a failure on first call
if (connection === 0) {
connection++

return setTimeout(() => {
cb(connectionFailure)
}, 300)
}

// And second connect taking > connection timeout
if (connection === 1) {
connection++

return setTimeout(() => {
orgConnect.call(this, cb)
}, 1000)
}

orgConnect.call(this, cb)
}

const pool = new Pool({
Client: Client,
connectionTimeoutMillis: 1000,
max: 1
})

// Direct connect
pool.connect((err, client, release) => {
expect(err).to.be(connectionFailure)
})

// Queued
let called = 0
pool.connect((err, client, release) => {
// Verify the callback is only called once
expect(called++).to.be(0)
expect(err).to.be.an(Error)

pool.query('select $1::text as name', ['brianc'], (err, res) => {
expect(err).to.be(undefined)
expect(res.rows).to.have.length(1)
pool.end(done)
})
})
})
})