Skip to content

Commit

Permalink
Remove idleListener when client is in-use (#123)
Browse files Browse the repository at this point in the history
* Prevent double release with callback

When using the callback instead of client.release, double releasing
a client was possible causing clients to be re-added multiple times.

* Remove idleListener when client is in-use

When a client is in-use, the error handling should be done by the
consumer and not by the pool itself as this otherwise might cause
errors to be handled multiple times.

* Handle verify failures
  • Loading branch information
johanneswuerbach authored and brianc committed Jul 25, 2019
1 parent d7f6ed0 commit f9fc232
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 47 deletions.
120 changes: 77 additions & 43 deletions index.js
Expand Up @@ -12,8 +12,9 @@ const removeWhere = (list, predicate) => {
}

class IdleItem {
constructor (client, timeoutId) {
constructor (client, idleListener, timeoutId) {
this.client = client
this.idleListener = idleListener
this.timeoutId = timeoutId
}
}
Expand All @@ -28,27 +29,6 @@ function throwOnRelease () {
throw new Error('Release called on client which has already been released to the pool.')
}

function release (client, err) {
client.release = throwOnRelease
if (err || this.ending) {
this._remove(client)
this._pulseQueue()
return
}

// idle timeout
let tid
if (this.options.idleTimeoutMillis) {
tid = setTimeout(() => {
this.log('remove idle client')
this._remove(client)
}, this.options.idleTimeoutMillis)
}

this._idle.push(new IdleItem(client, tid))
this._pulseQueue()
}

function promisify (Promise, callback) {
if (callback) {
return { callback: callback, result: undefined }
Expand All @@ -68,6 +48,7 @@ function promisify (Promise, callback) {
function makeIdleListener (pool, client) {
return function idleListener (err) {
err.client = client

client.removeListener('error', idleListener)
client.on('error', () => {
pool.log('additional client error after disconnection due to error', err)
Expand Down Expand Up @@ -132,17 +113,17 @@ class Pool extends EventEmitter {
if (!this._idle.length && this._isFull()) {
return
}
const waiter = this._pendingQueue.shift()
const pendingItem = this._pendingQueue.shift()
if (this._idle.length) {
const idleItem = this._idle.pop()
clearTimeout(idleItem.timeoutId)
const client = idleItem.client
client.release = release.bind(this, client)
this.emit('acquire', client)
return waiter.callback(undefined, client, client.release)
const idleListener = idleItem.idleListener

return this._acquireClient(client, pendingItem, idleListener, false)
}
if (!this._isFull()) {
return this.newClient(waiter)
return this.newClient(pendingItem)
}
throw new Error('unexpected condition')
}
Expand Down Expand Up @@ -249,26 +230,79 @@ class Pool extends EventEmitter {
}
} else {
this.log('new client connected')
client.release = release.bind(this, client)
this.emit('connect', client)
this.emit('acquire', client)
if (!pendingItem.timedOut) {
if (this.options.verify) {
this.options.verify(client, pendingItem.callback)
} else {
pendingItem.callback(undefined, client, client.release)
}
} else {
if (this.options.verify) {
this.options.verify(client, client.release)
} else {
client.release()
}
}

return this._acquireClient(client, pendingItem, idleListener, true)
}
})
}

// acquire a client for a pending work item
_acquireClient (client, pendingItem, idleListener, isNew) {
if (isNew) {
this.emit('connect', client)
}

this.emit('acquire', client)

let released = false

client.release = (err) => {
if (released) {
throwOnRelease()
}

released = true
this._release(client, idleListener, err)
}

client.removeListener('error', idleListener)

if (!pendingItem.timedOut) {
if (isNew && this.options.verify) {
this.options.verify(client, (err) => {
if (err) {
client.release(err)
return pendingItem.callback(err, undefined, NOOP)
}

pendingItem.callback(undefined, client, client.release)
})
} else {
pendingItem.callback(undefined, client, client.release)
}
} else {
if (isNew && this.options.verify) {
this.options.verify(client, client.release)
} else {
client.release()
}
}
}

// release a client back to the poll, include an error
// to remove it from the pool
_release (client, idleListener, err) {
client.on('error', idleListener)

if (err || this.ending) {
this._remove(client)
this._pulseQueue()
return
}

// idle timeout
let tid
if (this.options.idleTimeoutMillis) {
tid = setTimeout(() => {
this.log('remove idle client')
this._remove(client)
}, this.options.idleTimeoutMillis)
}

this._idle.push(new IdleItem(client, idleListener, tid))
this._pulseQueue()
}

query (text, values, cb) {
// guard clause against passing a function as the first parameter
if (typeof text === 'function') {
Expand Down
65 changes: 61 additions & 4 deletions test/error-handling.js
Expand Up @@ -43,6 +43,20 @@ describe('pool error handling', function () {
expect(() => client.release()).to.throwError()
return yield pool.end()
}))

it('should throw each time with callbacks', function (done) {
const pool = new Pool()

pool.connect(function (err, client, clientDone) {
expect(err).not.to.be.an(Error)
clientDone()

expect(() => clientDone()).to.throwError()
expect(() => clientDone()).to.throwError()

pool.end(done)
})
})
})

describe('calling connect after end', () => {
Expand Down Expand Up @@ -101,13 +115,56 @@ describe('pool error handling', function () {
client.release()
yield new Promise((resolve, reject) => {
process.nextTick(() => {
let poolError
pool.once('error', (err) => {
poolError = err
})

let clientError
client.once('error', (err) => {
clientError = err
})

client.emit('error', new Error('expected'))

expect(clientError.message).to.equal('expected')
expect(poolError.message).to.equal('expected')
expect(pool.idleCount).to.equal(0)
expect(pool.totalCount).to.equal(0)
pool.end().then(resolve, reject)
})
})
}))
})

describe('error from in-use client', () => {
it('keeps the client in the pool', co.wrap(function * () {
const pool = new Pool()
const client = yield pool.connect()
expect(pool.totalCount).to.equal(1)
expect(pool.waitingCount).to.equal(0)
expect(pool.idleCount).to.equal(0)

yield new Promise((resolve, reject) => {
process.nextTick(() => {
let poolError
pool.once('error', (err) => {
expect(err.message).to.equal('expected')
expect(pool.idleCount).to.equal(0)
expect(pool.totalCount).to.equal(0)
pool.end().then(resolve, reject)
poolError = err
})

let clientError
client.once('error', (err) => {
clientError = err
})

client.emit('error', new Error('expected'))

expect(clientError.message).to.equal('expected')
expect(poolError).not.to.be.ok()
expect(pool.idleCount).to.equal(0)
expect(pool.totalCount).to.equal(1)
client.release()
pool.end().then(resolve, reject)
})
})
}))
Expand Down

0 comments on commit f9fc232

Please sign in to comment.