diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js
index 624877680..727fe9081 100644
--- a/packages/pg-cursor/index.js
+++ b/packages/pg-cursor/index.js
@@ -182,7 +182,7 @@ Cursor.prototype.end = util.deprecate(function (cb) {
 }, 'Cursor.end is deprecated. Call end on the client itself to end a connection to the database.')
 
 Cursor.prototype.close = function (cb) {
-  if (this.state === 'done') {
+  if (!this.connection || this.state === 'done') {
     if (cb) {
       return setImmediate(cb)
     } else {
diff --git a/packages/pg-cursor/test/close.js b/packages/pg-cursor/test/close.js
index 785a71098..e63512abd 100644
--- a/packages/pg-cursor/test/close.js
+++ b/packages/pg-cursor/test/close.js
@@ -46,4 +46,9 @@ describe('close', function () {
       })
     })
   })
+
+  it('is a no-op to "close" the cursor before submitting it', function (done) {
+    const cursor = new Cursor(text)
+    cursor.close(done)
+  })
 })
diff --git a/packages/pg-query-stream/README.md b/packages/pg-query-stream/README.md
index d00550aec..312387aa7 100644
--- a/packages/pg-query-stream/README.md
+++ b/packages/pg-query-stream/README.md
@@ -47,7 +47,7 @@ I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it!
 
 The MIT License (MIT)
 
-Copyright (c) 2013 Brian M. Carlson
+Copyright (c) 2013-2019 Brian M. Carlson
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/packages/pg-query-stream/index.js b/packages/pg-query-stream/index.js
index 9c34207ec..4576f5fb5 100644
--- a/packages/pg-query-stream/index.js
+++ b/packages/pg-query-stream/index.js
@@ -1,14 +1,12 @@
-'use strict'
-var Cursor = require('pg-cursor')
-var Readable = require('stream').Readable
+const { Readable } = require('stream')
+const Cursor = require('pg-cursor')
 
 class PgQueryStream extends Readable {
-  constructor (text, values, options) {
-    super(Object.assign({ objectMode: true }, options))
-    this.cursor = new Cursor(text, values, options)
-    this._reading = false
-    this._closed = false
-    this.batchSize = (options || {}).batchSize || 100
+  constructor(text, values, config = {}) {
+    const { batchSize = 100 } = config
+    // https://nodejs.org/api/stream.html#stream_new_stream_readable_options
+    super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
+    this.cursor = new Cursor(text, values, config)
 
     // delegate Submittable callbacks to cursor
     this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
@@ -19,40 +17,25 @@
     this.handleError = this.cursor.handleError.bind(this.cursor)
   }
 
-  submit (connection) {
+  submit(connection) {
     this.cursor.submit(connection)
   }
 
-  close (callback) {
-    this._closed = true
-    const cb = callback || (() => this.emit('close'))
-    this.cursor.close(cb)
+  _destroy(_err, cb) {
+    this.cursor.close((err) => {
+      cb(err || _err)
+    })
   }
 
-  _read (size) {
-    if (this._reading || this._closed) {
-      return false
-    }
-    this._reading = true
-    const readAmount = Math.max(size, this.batchSize)
-    this.cursor.read(readAmount, (err, rows) => {
-      if (this._closed) {
-        return
-      }
+  // https://nodejs.org/api/stream.html#stream_readable_read_size_1
+  _read(size) {
+    this.cursor.read(size, (err, rows, result) => {
       if (err) {
-        return this.emit('error', err)
-      }
-      // if we get a 0 length array we've read to the end of the cursor
-      if (!rows.length) {
-        this._closed = true
-        setImmediate(() => this.emit('close'))
-        return this.push(null)
-      }
-
-      // push each row into the stream
-      this._reading = false
-      for (var i = 0; i < rows.length; i++) {
-        this.push(rows[i])
+        // https://nodejs.org/api/stream.html#stream_errors_while_reading
+        this.destroy(err)
+      } else {
+        for (const row of rows) this.push(row)
+        if (rows.length < size) this.push(null)
       }
     })
  }
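// --- Reviewer note: illustrative sketch, not part of the patch. ---
// How the rewritten stream behaves for a consumer; the pg.Pool setup and
// the query text below are assumptions for the example only. Because the
// Readable is now constructed with emitClose/autoDestroy, 'close' fires
// exactly once whether the cursor drains naturally or destroy() is called
// early, so releasing the client on 'close' covers both paths.
const pg = require('pg')
const QueryStream = require('pg-query-stream')

const pool = new pg.Pool()
pool.connect().then((client) => {
  const query = client.query(new QueryStream('SELECT * FROM generate_series(0, 1000) num'))
  query.on('data', (row) => console.log(row.num))
  query.on('error', (err) => console.error(err))
  query.on('close', () => client.release())
})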
diff --git a/packages/pg-query-stream/test/async-iterator.es6 b/packages/pg-query-stream/test/async-iterator.es6
index e84089b6c..47bda86d2 100644
--- a/packages/pg-query-stream/test/async-iterator.es6
+++ b/packages/pg-query-stream/test/async-iterator.es6
@@ -54,4 +54,59 @@
     assert.equal(allRows.length, 603)
     await pool.end()
   })
+
+  it('can break out of iteration early', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    for await (const row of client.query(new QueryStream(queryText, [], { batchSize: 1 }))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of client.query(new QueryStream(queryText, []))) {
+      rows.push(row)
+      break;
+    }
+    assert.strictEqual(rows.length, 3)
+    client.release()
+    await pool.end()
+  })
+
+  it('only returns rows on first iteration', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, []))
+    for await (const row of stream) {
+      rows.push(row)
+      break;
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    for await (const row of stream) {
+      rows.push(row)
+    }
+    assert.strictEqual(rows.length, 1)
+    client.release()
+    await pool.end()
+  })
+
+  it('can read with delays', async () => {
+    const pool = new pg.Pool({ max: 1 })
+    const client = await pool.connect()
+    const rows = []
+    const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
+    for await (const row of stream) {
+      rows.push(row)
+      await new Promise((resolve) => setTimeout(resolve, 1))
+    }
+    assert.strictEqual(rows.length, 201)
+    client.release()
+    await pool.end()
+  })
 })
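// --- Reviewer note: illustrative sketch, not part of the patch. ---
// The new tests rely on Readable's async-iterator protocol: `break` (or
// `return`) inside for-await invokes the iterator's return(), which calls
// stream.destroy(); _destroy() then closes the cursor, which is why the
// same pooled connection can safely run the next query. Sketch assuming a
// connected `client`, as in the tests:
async function firstRow(client, text) {
  const stream = client.query(new QueryStream(text))
  for await (const row of stream) {
    return row // early exit destroys the stream and closes the cursor
  }
}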
diff --git a/packages/pg-query-stream/test/close.js b/packages/pg-query-stream/test/close.js
index be103c7f6..d7e44b675 100644
--- a/packages/pg-query-stream/test/close.js
+++ b/packages/pg-query-stream/test/close.js
@@ -4,18 +4,22 @@ var concat = require('concat-stream')
 var QueryStream = require('../')
 var helper = require('./helper')
 
+if (process.version.startsWith('v8.')) {
+  return console.error('warning! node versions less than 10 LTS are no longer supported & stream closing semantics may not behave properly')
+}
+
 helper('close', function (client) {
   it('emits close', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
-    query.pipe(concat(function () {}))
+    query.pipe(concat(function () { }))
     query.on('close', done)
   })
 })
 
 helper('early close', function (client) {
   it('can be closed early', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
+    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
     var query = client.query(stream)
     var readCount = 0
     query.on('readable', function () {
@@ -23,30 +27,62 @@
       query.read()
     })
     query.once('readable', function () {
-      query.close()
+      query.destroy()
     })
     query.on('close', function () {
       assert(readCount < 10, 'should not have read more than 10 rows')
       done()
     })
   })
-})
 
-helper('close callback', function (client) {
-  it('notifies an optional callback when the conneciton is closed', function (done) {
-    var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
-    var query = client.query(stream)
-    query.once('readable', function () { // only reading once
-      query.read()
-    })
-    query.once('readable', function () {
-      query.close(function () {
-        // nothing to assert. This test will time out if the callback does not work.
-        done()
+  it('can destroy stream while reading', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy()
+      stream.on('close', done)
+    }, 100)
+  })
+
+  it('emits an error when calling destroy with an error', function (done) {
+    var stream = new QueryStream('SELECT * FROM generate_series(0, 100), pg_sleep(1)')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    setTimeout(() => {
+      stream.destroy(new Error('intentional error'))
+      stream.on('error', (err) => {
+        // make sure there's an error
+        assert(err);
+        assert.strictEqual(err.message, 'intentional error');
+        done();
       })
+    }, 100)
+  })
+
+  it('can destroy stream while reading an error', function (done) {
+    var stream = new QueryStream('SELECT * from pg_sleep(1), basdfasdf;')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.once('error', () => {
+      stream.destroy()
+      // wait a bit to let any other errors shake through
+      setTimeout(done, 100)
     })
-    query.on('close', function () {
-      assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
-    })
+  })
+
+  it('does not crash when destroying the stream immediately after calling read', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    client.query(stream)
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
+  })
+
+  it('does not crash when destroying the stream before it is submitted', function (done) {
+    var stream = new QueryStream('SELECT * from generate_series(0, 100), pg_sleep(1);')
+    stream.on('data', () => done(new Error('stream should not have returned rows')))
+    stream.destroy()
+    stream.on('close', done)
+  })
 })
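// --- Reviewer note: illustrative sketch, not part of the patch. ---
// destroy(err) now follows stock Readable semantics: _destroy() closes the
// cursor, then the error (if any) is emitted, then 'close' fires. A sketch
// of the timeout-style teardown these tests exercise; the query text,
// timings, and `client` are assumptions for the example:
const stream = client.query(new QueryStream('SELECT pg_sleep(10)'))
stream.on('error', (err) => console.error('stream failed:', err.message))
stream.on('close', () => console.log('cursor closed; connection reusable'))
setTimeout(() => stream.destroy(new Error('query took too long')), 1000)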
diff --git a/packages/pg-query-stream/test/config.js b/packages/pg-query-stream/test/config.js
index 4ed5b1b93..78251c894 100644
--- a/packages/pg-query-stream/test/config.js
+++ b/packages/pg-query-stream/test/config.js
@@ -2,9 +2,7 @@ var assert = require('assert')
 var QueryStream = require('../')
 
 var stream = new QueryStream('SELECT NOW()', [], {
-  highWaterMark: 999,
   batchSize: 88
 })
 
-assert.equal(stream._readableState.highWaterMark, 999)
-assert.equal(stream.batchSize, 88)
+assert.equal(stream._readableState.highWaterMark, 88)
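// --- Reviewer note: illustrative sketch, not part of the patch. ---
// The updated config test reflects that batchSize is now passed straight
// through to the Readable as its highWaterMark, so one option controls both
// the cursor fetch size and the stream's buffering; a separate highWaterMark
// option is no longer consulted by the constructor.
const assert = require('assert')
const stream = new QueryStream('SELECT NOW()', [], { batchSize: 88 })
assert.equal(stream._readableState.highWaterMark, 88)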