Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
31 changed files
with
2,440 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# These are supported funding model platforms | ||
|
||
github: [brianc] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
node_modules |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
The MIT License (MIT) | ||
|
||
Copyright (c) 2013 Brian M. Carlson | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# pg-query-stream | ||
|
||
[![Build Status](https://travis-ci.org/brianc/node-pg-query-stream.svg)](https://travis-ci.org/brianc/node-pg-query-stream) | ||
|
||
Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream. | ||
|
||
|
||
## installation | ||
|
||
```bash | ||
$ npm install pg --save | ||
$ npm install pg-query-stream --save | ||
``` | ||
|
||
_requires pg>=2.8.1_ | ||
|
||
|
||
## use | ||
|
||
```js | ||
const pg = require('pg') | ||
const QueryStream = require('pg-query-stream') | ||
const JSONStream = require('JSONStream') | ||
|
||
//pipe 1,000,000 rows to stdout without blowing up your memory usage | ||
pg.connect((err, client, done) => { | ||
if (err) throw err; | ||
const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000]) | ||
const stream = client.query(query) | ||
//release the client when the stream is finished | ||
stream.on('end', done) | ||
stream.pipe(JSONStream.stringify()).pipe(process.stdout) | ||
}) | ||
``` | ||
|
||
The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory. | ||
|
||
This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async itteration through your data is cumbersome, and _way way way_ slower than using a cursor. | ||
|
||
_note: this module only works with the JavaScript client, and does not work with the native bindings. libpq doesn't expose the protocol at a level where a cursor can be manipulated directly_ | ||
|
||
## contribution | ||
|
||
I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome? | ||
|
||
## license | ||
|
||
The MIT License (MIT) | ||
|
||
Copyright (c) 2013 Brian M. Carlson | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in | ||
all copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
THE SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
'use strict' | ||
var Cursor = require('pg-cursor') | ||
var Readable = require('stream').Readable | ||
|
||
class PgQueryStream extends Readable { | ||
constructor (text, values, options) { | ||
super(Object.assign({ objectMode: true }, options)) | ||
this.cursor = new Cursor(text, values, options) | ||
this._reading = false | ||
this._closed = false | ||
this.batchSize = (options || {}).batchSize || 100 | ||
|
||
// delegate Submittable callbacks to cursor | ||
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) | ||
this.handleDataRow = this.cursor.handleDataRow.bind(this.cursor) | ||
this.handlePortalSuspended = this.cursor.handlePortalSuspended.bind(this.cursor) | ||
this.handleCommandComplete = this.cursor.handleCommandComplete.bind(this.cursor) | ||
this.handleReadyForQuery = this.cursor.handleReadyForQuery.bind(this.cursor) | ||
this.handleError = this.cursor.handleError.bind(this.cursor) | ||
} | ||
|
||
submit (connection) { | ||
this.cursor.submit(connection) | ||
} | ||
|
||
close (callback) { | ||
this._closed = true | ||
const cb = callback || (() => this.emit('close')) | ||
this.cursor.close(cb) | ||
} | ||
|
||
_read (size) { | ||
if (this._reading || this._closed) { | ||
return false | ||
} | ||
this._reading = true | ||
const readAmount = Math.max(size, this.batchSize) | ||
this.cursor.read(readAmount, (err, rows) => { | ||
if (this._closed) { | ||
return | ||
} | ||
if (err) { | ||
return this.emit('error', err) | ||
} | ||
// if we get a 0 length array we've read to the end of the cursor | ||
if (!rows.length) { | ||
this._closed = true | ||
setImmediate(() => this.emit('close')) | ||
return this.push(null) | ||
} | ||
|
||
// push each row into the stream | ||
this._reading = false | ||
for (var i = 0; i < rows.length; i++) { | ||
this.push(rows[i]) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
module.exports = PgQueryStream |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
{ | ||
"name": "pg-query-stream", | ||
"version": "2.0.2", | ||
"description": "Postgres query result returned as readable stream", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "mocha", | ||
"lint": "eslint ." | ||
}, | ||
"repository": { | ||
"type": "git", | ||
"url": "git://github.com/brianc/node-postgres.git" | ||
}, | ||
"keywords": [ | ||
"postgres", | ||
"pg", | ||
"query", | ||
"stream" | ||
], | ||
"author": "Brian M. Carlson", | ||
"license": "MIT", | ||
"bugs": { | ||
"url": "https://github.com/brianc/node-postgres/issues" | ||
}, | ||
"devDependencies": { | ||
"JSONStream": "~0.7.1", | ||
"concat-stream": "~1.0.1", | ||
"eslint-plugin-promise": "^3.5.0", | ||
"mocha": "^6.2.2", | ||
"pg": "^7.15.2", | ||
"stream-spec": "~0.3.5", | ||
"stream-tester": "0.0.5", | ||
"through": "~2.3.4" | ||
}, | ||
"dependencies": { | ||
"pg-cursor": "^2.0.3" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
const QueryStream = require('../') | ||
const pg = require('pg') | ||
const assert = require('assert') | ||
|
||
const queryText = 'SELECT * FROM generate_series(0, 200) num' | ||
describe('Async iterator', () => { | ||
it('works', async () => { | ||
const stream = new QueryStream(queryText, []) | ||
const client = new pg.Client() | ||
await client.connect() | ||
const query = client.query(stream) | ||
const rows = [] | ||
for await (const row of query) { | ||
rows.push(row) | ||
} | ||
assert.equal(rows.length, 201) | ||
await client.end() | ||
}) | ||
|
||
it('can async iterate and then do a query afterwards', async () => { | ||
const stream = new QueryStream(queryText, []) | ||
const client = new pg.Client() | ||
await client.connect() | ||
const query = client.query(stream) | ||
const iteratorRows = [] | ||
for await (const row of query) { | ||
iteratorRows.push(row) | ||
} | ||
assert.equal(iteratorRows.length, 201) | ||
const { rows } = await client.query('SELECT NOW()') | ||
assert.equal(rows.length, 1) | ||
await client.end() | ||
}) | ||
|
||
it('can async iterate multiple times with a pool', async () => { | ||
const pool = new pg.Pool({ max: 1 }) | ||
|
||
const allRows = [] | ||
const run = async () => { | ||
// get the client | ||
const client = await pool.connect() | ||
// stream some rows | ||
const stream = new QueryStream(queryText, []) | ||
const iteratorRows = [] | ||
client.query(stream) | ||
for await (const row of stream) { | ||
iteratorRows.push(row) | ||
allRows.push(row) | ||
} | ||
assert.equal(iteratorRows.length, 201) | ||
client.release() | ||
} | ||
await Promise.all([run(), run(), run()]) | ||
assert.equal(allRows.length, 603) | ||
await pool.end() | ||
}) | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
// only newer versions of node support async iterator | ||
if (!process.version.startsWith('v8')) { | ||
require('./async-iterator.es6') | ||
} |
Oops, something went wrong.