Skip to content

Commit

Permalink
Make the stream catch errors in the query (#2638)
Browse files Browse the repository at this point in the history
* Make the stream catch errors in the query

* Fix another case in which stream doesnt emits error

* Linter stuff

* Remove setTimeout in tests

* Make a test not to check the MySQL error code

* Fix stream error catching for MariaDB and PostgreSQL

* Fix stream error catching in Oracle

* Throw the error after emitting it to the stream

* Throw the error without instantiating a new Error
  • Loading branch information
fcmatteo authored and elhigu committed Jun 27, 2018
1 parent 933d570 commit ec85502
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 24 deletions.
5 changes: 4 additions & 1 deletion src/dialects/maria/index.js
Expand Up @@ -76,7 +76,10 @@ assign(Client_MariaSQL.prototype, {
connection.query(sql.sql, sql.bindings)
.on('result', function(res) {
res
.on('error', rejecter)
.on('error', (err) => {
rejecter(err)
stream.emit('error', err)
})
.on('end', function() {
resolver(res.info);
})
Expand Down
9 changes: 8 additions & 1 deletion src/dialects/mysql/index.js
Expand Up @@ -104,7 +104,14 @@ assign(Client_MySQL.prototype, {
return new Promise((resolver, rejecter) => {
stream.on('error', rejecter)
stream.on('end', resolver)
connection.query(queryOptions, obj.bindings).stream(options).pipe(stream)
const queryStream = connection.query(queryOptions, obj.bindings).stream(options)

queryStream.on('error', (err) => {
rejecter(err);
stream.emit('error', err)
})

queryStream.pipe(stream)
})
},

Expand Down
4 changes: 4 additions & 0 deletions src/dialects/oracle/index.js
Expand Up @@ -125,6 +125,10 @@ assign(Client_Oracle.prototype, {
stream.on('end', resolver);
const queryStream = connection.queryStream(obj.sql, obj.bindings, options);
queryStream.pipe(stream);
queryStream.on('error', function(error) {
rejecter(error);
stream.emit('error', error);
});
})
},

Expand Down
10 changes: 5 additions & 5 deletions src/dialects/postgres/index.js
Expand Up @@ -199,15 +199,15 @@ assign(Client_PG.prototype, {
_stream(connection, obj, stream, options) {
const PGQueryStream = process.browser ? undefined : require('pg-query-stream');
const sql = obj.sql;

return new Promise(function(resolver, rejecter) {
const queryStream = connection.query(new PGQueryStream(sql, obj.bindings, options));
queryStream.on('error', function(error) { stream.emit('error', error); });
// 'error' is not propagated by .pipe, but it breaks the pipe
stream.on('error', function(error) {
// Ensure the queryStream is closed so the connection can be released.
queryStream.close();

queryStream.on('error', function(error) {
rejecter(error);
stream.emit('error', error);
});

// 'end' IS propagated by .pipe, by default
stream.on('end', resolver);
queryStream.pipe(stream);
Expand Down
17 changes: 11 additions & 6 deletions src/runner.js
Expand Up @@ -84,13 +84,18 @@ assign(Runner.prototype, {
const promise = Promise.using(this.ensureConnection(), function(connection) {
hasConnection = true;
runner.connection = connection;
const sql = runner.builder.toSQL()
const err = new Error('The stream may only be used with a single query statement.');
if (isArray(sql)) {
if (hasHandler) throw err;
stream.emit('error', err);
try {
const sql = runner.builder.toSQL()

if (isArray(sql) && hasHandler) {
throw new Error('The stream may only be used with a single query statement.');
}

return runner.client.stream(runner.connection, sql, stream, options)
} catch (e) {
stream.emit('error', e)
throw e;
}
return runner.client.stream(runner.connection, sql, stream, options);
})

// If a function is passed to handle the stream, send the stream
Expand Down
38 changes: 27 additions & 11 deletions test/integration/builder/selects.js
@@ -1,10 +1,10 @@
/*global describe, expect, it, testPromise, d*/
'use strict';

var _ = require('lodash')
var assert = require('assert')
var Promise = testPromise;
var Runner = require('../../../lib/runner');
const _ = require('lodash')
const assert = require('assert')
const Promise = testPromise;
const Runner = require('../../../lib/runner');

module.exports = function(knex) {

Expand Down Expand Up @@ -212,7 +212,7 @@ module.exports = function(knex) {
return knex('accounts')
.options({
typeCast (field, next) {
var val
let val
if (field.type === 'VAR_STRING') {
val = field.string()
return val == null ? val : val.toUpperCase()
Expand All @@ -235,22 +235,22 @@ module.exports = function(knex) {
})

it('emits error on the stream, if not passed a function, and connecting fails', function() {
var expected = new Error();
var original = Runner.prototype.ensureConnection;
const expected = new Error();
const original = Runner.prototype.ensureConnection;
Runner.prototype.ensureConnection = function() {
return Promise.reject(expected);
};

var restore = () => {
const restore = () => {
Runner.prototype.ensureConnection = original;
};

var promise = new Promise((resolve, reject) => {
var timeout = setTimeout(() => {
const promise = new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Timeout'));
}, 5000);

var stream = knex('accounts').stream();
const stream = knex('accounts').stream();
stream.on('error', function(actual) {
clearTimeout(timeout);

Expand All @@ -266,6 +266,22 @@ module.exports = function(knex) {
return promise;
});

it('emits error on the stream, if not passed a function, and query fails', function(done) {
const stream = knex('accounts').select('invalid_field').stream()
stream.on('error', function(err) {
assert(err instanceof Error)
done()
})
})

it('emits error if not passed a function and the query has wrong bindings', function(done) {
const stream = knex('accounts').whereRaw('id = ? and first_name = ?', ['2']).stream()
stream.on('error', function(err) {
assert(err instanceof Error)
done()
})
})

it('properly escapes postgres queries on streaming', function() {
let count = 0;
return knex('accounts').where('id', 1).stream(function(rowStream) {
Expand Down

0 comments on commit ec85502

Please sign in to comment.