Skip to content

Commit

Permalink
Parser speed improvements (#2151)
Browse files Browse the repository at this point in the history
* Change from transform stream

* Yeah a thing

* Make tests pass, add new code to travis

* Update 'best' benchmarks and include tsc in pretest script

* Need to add build early so we can create test tables

* logging
  • Loading branch information
brianc committed Apr 2, 2020
1 parent 90c6d13 commit 2013d77
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 77 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Expand Up @@ -2,10 +2,13 @@ language: node_js
dist: bionic

before_script: |
yarn build
node packages/pg/script/create-test-tables.js postgresql:///
env:
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres
# test w/ new faster parsing code
- CC=clang CXX=clang++ npm_config_clang=1 PGUSER=postgres PGDATABASE=postgres PG_FAST_CONNECTION=true

node_js:
- lts/dubnium
Expand All @@ -30,6 +33,7 @@ matrix:
-e '/^host/ s/trust$/md5/' \
/etc/postgresql/10/main/pg_hba.conf
sudo -u postgres psql -c "ALTER ROLE postgres PASSWORD 'test-password'; SELECT pg_reload_conf()"
yarn build
node packages/pg/script/create-test-tables.js postgresql:///
- node_js: lts/carbon
Expand Down
2 changes: 2 additions & 0 deletions package.json
Expand Up @@ -11,6 +11,8 @@
],
"scripts": {
"test": "yarn lerna exec yarn test",
"build": "yarn lerna exec --scope pg-packet-stream yarn build",
"pretest": "yarn build",
"lint": "yarn lerna exec --parallel yarn lint"
},
"devDependencies": {
Expand Down
6 changes: 4 additions & 2 deletions packages/pg-packet-stream/package.json
Expand Up @@ -16,7 +16,9 @@
},
"scripts": {
"test": "mocha dist/**/*.test.js",
"prepublish": "tsc",
"pretest": "tsc"
"build": "tsc",
"build:watch": "tsc --watch",
"prepublish": "yarn build",
"pretest": "yarn build"
}
}
64 changes: 25 additions & 39 deletions packages/pg-packet-stream/src/inbound-parser.test.ts
@@ -1,8 +1,9 @@
import buffers from './testing/test-buffers'
import BufferList from './testing/buffer-list'
import { PgPacketStream } from './'
import { parse } from './'
import assert from 'assert'
import { Readable } from 'stream'
import { PassThrough } from 'stream'
import { BackendMessage } from './messages'

var authOkBuffer = buffers.authenticationOk()
var paramStatusBuffer = buffers.parameterStatus('client_encoding', 'UTF8')
Expand Down Expand Up @@ -137,25 +138,14 @@ var expectedTwoRowMessage = {
}]
}

const concat = (stream: Readable): Promise<any[]> => {
return new Promise((resolve) => {
const results: any[] = []
stream.on('data', item => results.push(item))
stream.on('end', () => resolve(results))
})
}

var testForMessage = function (buffer: Buffer, expectedMessage: any) {
it('recieves and parses ' + expectedMessage.name, async () => {
const parser = new PgPacketStream();
parser.write(buffer);
parser.end();
const [lastMessage] = await concat(parser);
const messages = await parseBuffers([buffer])
const [lastMessage] = messages;

for (const key in expectedMessage) {
assert.deepEqual(lastMessage[key], expectedMessage[key])
assert.deepEqual((lastMessage as any)[key], expectedMessage[key])
}

})
}

Expand Down Expand Up @@ -197,6 +187,19 @@ var expectedNotificationResponseMessage = {
payload: 'boom'
}



const parseBuffers = async (buffers: Buffer[]): Promise<BackendMessage[]> => {
const stream = new PassThrough();
for (const buffer of buffers) {
stream.write(buffer);
}
stream.end()
const msgs: BackendMessage[] = []
await parse(stream, (msg) => msgs.push(msg))
return msgs
}

describe('PgPacketStream', function () {
testForMessage(authOkBuffer, expectedAuthenticationOkayMessage)
testForMessage(plainPasswordBuffer, expectedPlainPasswordMessage)
Expand Down Expand Up @@ -391,18 +394,9 @@ describe('PgPacketStream', function () {
describe('split buffer, single message parsing', function () {
var fullBuffer = buffers.dataRow([null, 'bang', 'zug zug', null, '!'])

const parse = async (buffers: Buffer[]): Promise<any> => {
const parser = new PgPacketStream();
for (const buffer of buffers) {
parser.write(buffer);
}
parser.end()
const [msg] = await concat(parser)
return msg;
}

it('parses when full buffer comes in', async function () {
const message = await parse([fullBuffer]);
const messages = await parseBuffers([fullBuffer]);
const message = messages[0] as any
assert.equal(message.fields.length, 5)
assert.equal(message.fields[0], null)
assert.equal(message.fields[1], 'bang')
Expand All @@ -416,7 +410,8 @@ describe('PgPacketStream', function () {
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
fullBuffer.copy(firstBuffer, 0, 0)
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
const message = await parse([firstBuffer, secondBuffer]);
const messages = await parseBuffers([fullBuffer]);
const message = messages[0] as any
assert.equal(message.fields.length, 5)
assert.equal(message.fields[0], null)
assert.equal(message.fields[1], 'bang')
Expand Down Expand Up @@ -447,15 +442,6 @@ describe('PgPacketStream', function () {
dataRowBuffer.copy(fullBuffer, 0, 0)
readyForQueryBuffer.copy(fullBuffer, dataRowBuffer.length, 0)

const parse = (buffers: Buffer[]): Promise<any[]> => {
const parser = new PgPacketStream();
for (const buffer of buffers) {
parser.write(buffer);
}
parser.end()
return concat(parser)
}

var verifyMessages = function (messages: any[]) {
assert.strictEqual(messages.length, 2)
assert.deepEqual(messages[0], {
Expand All @@ -473,7 +459,7 @@ describe('PgPacketStream', function () {
}
// sanity check
it('recieves both messages when packet is not split', async function () {
const messages = await parse([fullBuffer])
const messages = await parseBuffers([fullBuffer])
verifyMessages(messages)
})

Expand All @@ -482,7 +468,7 @@ describe('PgPacketStream', function () {
var secondBuffer = Buffer.alloc(fullBuffer.length - firstBuffer.length)
fullBuffer.copy(firstBuffer, 0, 0)
fullBuffer.copy(secondBuffer, 0, firstBuffer.length)
const messages = await parse([firstBuffer, secondBuffer])
const messages = await parseBuffers([firstBuffer, secondBuffer])
verifyMessages(messages)
}

Expand Down
32 changes: 16 additions & 16 deletions packages/pg-packet-stream/src/index.ts
@@ -1,5 +1,5 @@
import { Transform, TransformCallback, TransformOptions } from 'stream';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password } from './messages';
import { TransformOptions } from 'stream';
import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages';
import { BufferReader } from './BufferReader';
import assert from 'assert'

Expand Down Expand Up @@ -46,23 +46,27 @@ const enum MessageCodes {
CopyData = 0x64, // d
}

export class PgPacketStream extends Transform {
type MessageCallback = (msg: BackendMessage) => void;

export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
const parser = new PgPacketParser()
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
return new Promise((resolve) => stream.on('end', () => resolve()))
}

class PgPacketParser {
private remainingBuffer: Buffer = emptyBuffer;
private reader = new BufferReader();
private mode: Mode;

constructor(opts?: StreamOptions) {
super({
...opts,
readableObjectMode: true
})
if (opts?.mode === 'binary') {
throw new Error('Binary mode not supported yet')
}
this.mode = opts?.mode || 'text';
}

public _transform(buffer: Buffer, encoding: string, callback: TransformCallback) {
public parse(buffer: Buffer, callback: MessageCallback) {
let combinedBuffer = buffer;
if (this.remainingBuffer.byteLength) {
combinedBuffer = Buffer.allocUnsafe(this.remainingBuffer.byteLength + buffer.byteLength);
Expand All @@ -81,7 +85,7 @@ export class PgPacketStream extends Transform {

if (fullMessageLength + offset <= combinedBuffer.byteLength) {
const message = this.handlePacket(offset + HEADER_LENGTH, code, length, combinedBuffer);
this.push(message)
callback(message)
offset += fullMessageLength;
} else {
break;
Expand All @@ -94,7 +98,6 @@ export class PgPacketStream extends Transform {
this.remainingBuffer = combinedBuffer.slice(offset)
}

callback(null);
}

private handlePacket(offset: number, code: number, length: number, bytes: Buffer): BackendMessage {
Expand Down Expand Up @@ -146,10 +149,6 @@ export class PgPacketStream extends Transform {
}
}

public _flush(callback: TransformCallback) {
this._transform(Buffer.alloc(0), 'utf-8', callback)
}

private parseReadyForQueryMessage(offset: number, length: number, bytes: Buffer) {
this.reader.setBuffer(offset, bytes);
const status = this.reader.string(1);
Expand Down Expand Up @@ -304,8 +303,9 @@ export class PgPacketStream extends Transform {
fieldType = this.reader.string(1)
}

// the msg is an Error instance
var message = new DatabaseError(fields.M, length, name)
const messageValue = fields.M

const message = name === MessageName.notice ? new NoticeMessage(length, messageValue) : new DatabaseError(messageValue, length, name)

message.severity = fields.S
message.code = fields.C
Expand Down
43 changes: 42 additions & 1 deletion packages/pg-packet-stream/src/messages.ts
Expand Up @@ -74,7 +74,27 @@ export const copyDone: BackendMessage = {
length: 4,
}

export class DatabaseError extends Error {
interface NoticeOrError {
message: string | undefined;
severity: string | undefined;
code: string | undefined;
detail: string | undefined;
hint: string | undefined;
position: string | undefined;
internalPosition: string | undefined;
internalQuery: string | undefined;
where: string | undefined;
schema: string | undefined;
table: string | undefined;
column: string | undefined;
dataType: string | undefined;
constraint: string | undefined;
file: string | undefined;
line: string | undefined;
routine: string | undefined;
}

export class DatabaseError extends Error implements NoticeOrError {
public severity: string | undefined;
public code: string | undefined;
public detail: string | undefined;
Expand Down Expand Up @@ -167,3 +187,24 @@ export class DataRowMessage {
this.fieldCount = fields.length;
}
}

export class NoticeMessage implements BackendMessage, NoticeOrError {
constructor(public readonly length: number, public readonly message: string | undefined) {}
public readonly name = MessageName.notice;
public severity: string | undefined;
public code: string | undefined;
public detail: string | undefined;
public hint: string | undefined;
public position: string | undefined;
public internalPosition: string | undefined;
public internalQuery: string | undefined;
public where: string | undefined;
public schema: string | undefined;
public table: string | undefined;
public column: string | undefined;
public dataType: string | undefined;
public constraint: string | undefined;
public file: string | undefined;
public line: string | undefined;
public routine: string | undefined;
}
5 changes: 3 additions & 2 deletions packages/pg/bench.js
Expand Up @@ -54,13 +54,14 @@ const run = async () => {
queries = await bench(client, seq, seconds * 1000);
console.log("sequence queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 1192 qps")
console.log("on my laptop best so far seen 1209 qps")

console.log('')
queries = await bench(client, insert, seconds * 1000);
console.log("insert queries:", queries);
console.log("qps", queries / seconds);
console.log("on my laptop best so far seen 5600 qps")
console.log("on my laptop best so far seen 5799 qps")
console.log()
await client.end();
await client.end();
};
Expand Down
35 changes: 18 additions & 17 deletions packages/pg/lib/connection-fast.js
Expand Up @@ -13,13 +13,13 @@ var util = require('util')

var Writer = require('buffer-writer')
// eslint-disable-next-line
var PacketStream = require('pg-packet-stream')
const { parse } = require('pg-packet-stream')

var TEXT_MODE = 0

// TODO(bmc) support binary mode here
// var BINARY_MODE = 1
console.log('using faster connection')
console.log('***using faster connection***')
var Connection = function (config) {
EventEmitter.call(this)
config = config || {}
Expand Down Expand Up @@ -84,12 +84,13 @@ Connection.prototype.connect = function (port, host) {
this.stream.once('data', function (buffer) {
var responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'N': // Server does not support SSL connections
return self.emit('error', new Error('The server does not support SSL connections'))
case 'S': // Server supports SSL connections, continue with a secure connection
break
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
case 'N': // Server does not support SSL connections
self.stream.end()
return self.emit('error', new Error('The server does not support SSL connections'))
default: // Any other response byte, including 'E' (ErrorResponse) indicating a server error
self.stream.end()
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
var tls = require('tls')
Expand All @@ -108,19 +109,15 @@ Connection.prototype.connect = function (port, host) {
}

Connection.prototype.attachListeners = function (stream) {
var self = this
const mode = this._mode === TEXT_MODE ? 'text' : 'binary'
const packetStream = new PacketStream.PgPacketStream({ mode })
this.stream.pipe(packetStream)
packetStream.on('data', (msg) => {
stream.on('end', () => {
this.emit('end')
})
parse(stream, (msg) => {
var eventName = msg.name === 'error' ? 'errorMessage' : msg.name
if (self._emitMessage) {
self.emit('message', msg)
if (this._emitMessage) {
this.emit('message', msg)
}
self.emit(eventName, msg)
})
stream.on('end', function () {
self.emit('end')
this.emit(eventName, msg)
})
}

Expand Down Expand Up @@ -331,6 +328,10 @@ Connection.prototype.end = function () {
// 0x58 = 'X'
this.writer.clear()
this._ending = true
if (!this.stream.writable) {
this.stream.end()
return
}
return this.stream.write(END_BUFFER, () => {
this.stream.end()
})
Expand Down

0 comments on commit 2013d77

Please sign in to comment.