From 20aa6abf6509db4601277747995e8eb361d93a09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20W=C3=BCrbach?= Date: Sun, 2 Dec 2018 22:37:49 +0100 Subject: [PATCH] Fix two timeout races --- index.js | 67 +++++++++++++++++---------- test/connection-timeout.js | 92 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 2ccbbe7..fe921cb 100644 --- a/index.js +++ b/index.js @@ -3,14 +3,6 @@ const EventEmitter = require('events').EventEmitter const NOOP = function () { } -const remove = (list, value) => { - const i = list.indexOf(value) - - if (i !== -1) { - list.splice(i, 1) - } -} - const removeWhere = (list, predicate) => { const i = list.findIndex(predicate) @@ -26,6 +18,12 @@ class IdleItem { } } +class PendingItem { + constructor (callback) { + this.callback = callback + } +} + function throwOnRelease () { throw new Error('Release called on client which has already been released to the pool.') } @@ -85,6 +83,7 @@ class Pool extends EventEmitter { this._pendingQueue = [] this._endCallback = undefined this.ending = false + this.ended = false } _isFull () { @@ -93,6 +92,10 @@ class Pool extends EventEmitter { _pulseQueue () { this.log('pulse queue') + if (this.ended) { + this.log('pulse queue ended') + return + } if (this.ending) { this.log('pulse queue on ending') if (this._idle.length) { @@ -101,6 +104,7 @@ class Pool extends EventEmitter { }) } if (!this._clients.length) { + this.ended = true this._endCallback() } return @@ -121,10 +125,10 @@ class Pool extends EventEmitter { const client = idleItem.client client.release = release.bind(this, client) this.emit('acquire', client) - return waiter(undefined, client, client.release) + return waiter.callback(undefined, client, client.release) } if (!this._isFull()) { - return this.connect(waiter) + return this.newClient(waiter) } throw new Error('unexpected condition') } @@ -150,18 +154,18 @@ class Pool extends EventEmitter { return cb ? cb(err) : this.Promise.reject(err) } + const response = promisify(this.Promise, cb) + const result = response.result + // if we don't have to connect a new client, don't do so if (this._clients.length >= this.options.max || this._idle.length) { - const response = promisify(this.Promise, cb) - const result = response.result - // if we have idle clients schedule a pulse immediately if (this._idle.length) { process.nextTick(() => this._pulseQueue()) } if (!this.options.connectionTimeoutMillis) { - this._pendingQueue.push(response.callback) + this._pendingQueue.push(new PendingItem(response.callback)) return result } @@ -170,18 +174,27 @@ class Pool extends EventEmitter { response.callback(err, res, done) } + const pendingItem = new PendingItem(queueCallback) + // set connection timeout on checking out an existing client const tid = setTimeout(() => { // remove the callback from pending waiters because // we're going to call it with a timeout error - remove(this._pendingQueue, queueCallback) + removeWhere(this._pendingQueue, (i) => i.callback === queueCallback) + pendingItem.timedOut = true response.callback(new Error('timeout exceeded when trying to connect')) }, this.options.connectionTimeoutMillis) - this._pendingQueue.push(queueCallback) + this._pendingQueue.push(pendingItem) return result } + this.newClient(new PendingItem(response.callback)) + + return result + } + + newClient (pendingItem) { const client = new this.Client(this.options) this._clients.push(client) const idleListener = (err) => { @@ -210,9 +223,6 @@ class Pool extends EventEmitter { }, this.options.connectionTimeoutMillis) } - const response = promisify(this.Promise, cb) - cb = response.callback - this.log('connecting new client') client.connect((err) => { if (tid) { @@ -230,20 +240,29 @@ class Pool extends EventEmitter { // this client won’t be released, so move on immediately this._pulseQueue() - cb(err, undefined, NOOP) + if (!pendingItem.timedOut) { + pendingItem.callback(err, undefined, NOOP) + } } else { this.log('new client connected') client.release = release.bind(this, client) this.emit('connect', client) this.emit('acquire', client) - if (this.options.verify) { - this.options.verify(client, cb) + if (!pendingItem.timedOut) { + if (this.options.verify) { + this.options.verify(client, pendingItem.callback) + } else { + pendingItem.callback(undefined, client, client.release) + } } else { - cb(undefined, client, client.release) + if (this.options.verify) { + this.options.verify(client, client.release) + } else { + client.release() + } } } }) - return response.result } query (text, values, cb) { diff --git a/test/connection-timeout.js b/test/connection-timeout.js index f7a2fd8..2c4d68f 100644 --- a/test/connection-timeout.js +++ b/test/connection-timeout.js @@ -11,6 +11,8 @@ const after = require('mocha').after const Pool = require('../') describe('connection timeout', () => { + const connectionFailure = new Error('Temporary connection failure') + before((done) => { this.server = net.createServer((socket) => { }) @@ -126,4 +128,94 @@ describe('connection timeout', () => { }) }) }) + + it('continues processing after a connection failure', (done) => { + const Client = require('pg').Client + const orgConnect = Client.prototype.connect + let called = false + + Client.prototype.connect = function (cb) { + // Simulate a failure on first call + if (!called) { + called = true + + return setTimeout(() => { + cb(connectionFailure) + }, 100) + } + // And pass-through the second call + orgConnect.call(this, cb) + } + + const pool = new Pool({ + Client: Client, + connectionTimeoutMillis: 1000, + max: 1 + }) + + pool.connect((err, client, release) => { + expect(err).to.be(connectionFailure) + + pool.query('select $1::text as name', ['brianc'], (err, res) => { + expect(err).to.be(undefined) + expect(res.rows).to.have.length(1) + pool.end(done) + }) + }) + }) + + it('releases newly connected clients if the queued already timed out', (done) => { + const Client = require('pg').Client + + const orgConnect = Client.prototype.connect + + let connection = 0 + + Client.prototype.connect = function (cb) { + // Simulate a failure on first call + if (connection === 0) { + connection++ + + return setTimeout(() => { + cb(connectionFailure) + }, 300) + } + + // And second connect taking > connection timeout + if (connection === 1) { + connection++ + + return setTimeout(() => { + orgConnect.call(this, cb) + }, 1000) + } + + orgConnect.call(this, cb) + } + + const pool = new Pool({ + Client: Client, + connectionTimeoutMillis: 1000, + max: 1 + }) + + // Direct connect + pool.connect((err, client, release) => { + expect(err).to.be(connectionFailure) + }) + + // Queued + let called = 0 + pool.connect((err, client, release) => { + // Verify the callback is only called once + expect(called++).to.be(0) + expect(err).to.be.an(Error) + + pool.query('select $1::text as name', ['brianc'], (err, res) => { + expect(err).to.be(undefined) + expect(res.rows).to.have.length(1) + pool.end(done) + }) + }) + }) })