diff --git a/package.json b/package.json index 03e3827e1..160180777 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ ], "scripts": { "test": "yarn lerna exec yarn test", - "build": "yarn lerna exec --scope pg-packet-stream yarn build", + "build": "yarn lerna exec --scope pg-protocol yarn build", "pretest": "yarn build", "lint": "yarn lerna exec --parallel yarn lint" }, diff --git a/packages/pg-packet-stream/package.json b/packages/pg-protocol/package.json similarity index 81% rename from packages/pg-packet-stream/package.json rename to packages/pg-protocol/package.json index bf9c13e84..e3e5640cd 100644 --- a/packages/pg-packet-stream/package.json +++ b/packages/pg-protocol/package.json @@ -1,6 +1,7 @@ { - "name": "pg-packet-stream", + "name": "pg-protocol", "version": "1.1.0", + "description": "The postgres client/server binary protocol, implemented in TypeScript", "main": "dist/index.js", "types": "dist/index.d.ts", "license": "MIT", diff --git a/packages/pg-protocol/src/b.ts b/packages/pg-protocol/src/b.ts new file mode 100644 index 000000000..267d211c4 --- /dev/null +++ b/packages/pg-protocol/src/b.ts @@ -0,0 +1,24 @@ +// file for microbenchmarking + +import { Writer } from './buffer-writer' +import { serialize } from './index' + +const LOOPS = 1000 +let count = 0 +let start = Date.now() +const writer = new Writer() + +const run = () => { + if (count > LOOPS) { + console.log(Date.now() - start) + return; + } + count++ + for(let i = 0; i < LOOPS; i++) { + serialize.describe({ type: 'P'}) + serialize.describe({ type: 'S'}) + } + setImmediate(run) +} + +run() diff --git a/packages/pg-packet-stream/src/BufferReader.ts b/packages/pg-protocol/src/buffer-reader.ts similarity index 96% rename from packages/pg-packet-stream/src/BufferReader.ts rename to packages/pg-protocol/src/buffer-reader.ts index 9729d919f..68dc89cae 100644 --- a/packages/pg-packet-stream/src/BufferReader.ts +++ b/packages/pg-protocol/src/buffer-reader.ts @@ -2,8 +2,10 @@ const emptyBuffer = Buffer.allocUnsafe(0); export class BufferReader { private buffer: Buffer = emptyBuffer; - // TODO(bmc): support non-utf8 encoding + + // TODO(bmc): support non-utf8 encoding? private encoding: string = 'utf-8'; + constructor(private offset: number = 0) { } public setBuffer(offset: number, buffer: Buffer): void { diff --git a/packages/pg-protocol/src/buffer-writer.ts b/packages/pg-protocol/src/buffer-writer.ts new file mode 100644 index 000000000..2299070d1 --- /dev/null +++ b/packages/pg-protocol/src/buffer-writer.ts @@ -0,0 +1,87 @@ +//binary data writer tuned for encoding binary specific to the postgres binary protocol + +export class Writer { + private buffer: Buffer; + private offset: number = 5; + private headerPosition: number = 0; + constructor(private size = 256) { + this.buffer = Buffer.alloc(size) + } + + private ensure(size: number): void { + var remaining = this.buffer.length - this.offset; + if (remaining < size) { + var oldBuffer = this.buffer; + // exponential growth factor of around ~ 1.5 + // https://stackoverflow.com/questions/2269063/buffer-growth-strategy + var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size; + this.buffer = Buffer.alloc(newSize); + oldBuffer.copy(this.buffer); + } + } + + public addInt32(num: number): Writer { + this.ensure(4); + this.buffer[this.offset++] = (num >>> 24 & 0xFF); + this.buffer[this.offset++] = (num >>> 16 & 0xFF); + this.buffer[this.offset++] = (num >>> 8 & 0xFF); + this.buffer[this.offset++] = (num >>> 0 & 0xFF); + return this; + } + + public addInt16(num: number): Writer { + this.ensure(2); + this.buffer[this.offset++] = (num >>> 8 & 0xFF); + this.buffer[this.offset++] = (num >>> 0 & 0xFF); + return this; + } + + + public addCString(string: string): Writer { + if (!string) { + this.ensure(1); + } else { + var len = Buffer.byteLength(string); + this.ensure(len + 1); // +1 for null terminator + this.buffer.write(string, this.offset, 'utf-8') + this.offset += len; + } + + this.buffer[this.offset++] = 0; // null terminator + return this; + } + + public addString(string: string = ""): Writer { + var len = Buffer.byteLength(string); + this.ensure(len); + this.buffer.write(string, this.offset); + this.offset += len; + return this; + } + + public add(otherBuffer: Buffer): Writer { + this.ensure(otherBuffer.length); + otherBuffer.copy(this.buffer, this.offset); + this.offset += otherBuffer.length; + return this; + } + + private join(code?: number): Buffer { + if (code) { + this.buffer[this.headerPosition] = code; + //length is everything in this packet minus the code + const length = this.offset - (this.headerPosition + 1) + this.buffer.writeInt32BE(length, this.headerPosition + 1) + } + return this.buffer.slice(code ? 0 : 5, this.offset); + } + + public flush(code?: number): Buffer { + var result = this.join(code); + this.offset = 5; + this.headerPosition = 0; + this.buffer = Buffer.allocUnsafe(this.size) + return result; + } +} + diff --git a/packages/pg-packet-stream/src/inbound-parser.test.ts b/packages/pg-protocol/src/inbound-parser.test.ts similarity index 99% rename from packages/pg-packet-stream/src/inbound-parser.test.ts rename to packages/pg-protocol/src/inbound-parser.test.ts index e8619bf83..461ab2628 100644 --- a/packages/pg-packet-stream/src/inbound-parser.test.ts +++ b/packages/pg-protocol/src/inbound-parser.test.ts @@ -1,6 +1,6 @@ import buffers from './testing/test-buffers' import BufferList from './testing/buffer-list' -import { parse } from './' +import { parse } from '.' import assert from 'assert' import { PassThrough } from 'stream' import { BackendMessage } from './messages' diff --git a/packages/pg-protocol/src/index.ts b/packages/pg-protocol/src/index.ts new file mode 100644 index 000000000..f4ade0173 --- /dev/null +++ b/packages/pg-protocol/src/index.ts @@ -0,0 +1,11 @@ +import { BackendMessage } from './messages'; +import { serialize } from './serializer'; +import { Parser, MessageCallback } from './parser' + +export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise { + const parser = new Parser() + stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback)) + return new Promise((resolve) => stream.on('end', () => resolve())) +} + +export { serialize }; diff --git a/packages/pg-packet-stream/src/messages.ts b/packages/pg-protocol/src/messages.ts similarity index 100% rename from packages/pg-packet-stream/src/messages.ts rename to packages/pg-protocol/src/messages.ts diff --git a/packages/pg-protocol/src/outbound-serializer.test.ts b/packages/pg-protocol/src/outbound-serializer.test.ts new file mode 100644 index 000000000..110b932ce --- /dev/null +++ b/packages/pg-protocol/src/outbound-serializer.test.ts @@ -0,0 +1,256 @@ +import assert from 'assert' +import { serialize } from './serializer' +import BufferList from './testing/buffer-list' + +describe('serializer', () => { + it('builds startup message', function () { + const actual = serialize.startup({ + user: 'brian', + database: 'bang' + }) + assert.deepEqual(actual, new BufferList() + .addInt16(3) + .addInt16(0) + .addCString('user') + .addCString('brian') + .addCString('database') + .addCString('bang') + .addCString('client_encoding') + .addCString("'utf-8'") + .addCString('').join(true)) + }) + + it('builds password message', function () { + const actual = serialize.password('!') + assert.deepEqual(actual, new BufferList().addCString('!').join(true, 'p')) + }) + + it('builds request ssl message', function () { + const actual = serialize.requestSsl() + const expected = new BufferList().addInt32(80877103).join(true) + assert.deepEqual(actual, expected); + }) + + it('builds SASLInitialResponseMessage message', function () { + const actual = serialize.sendSASLInitialResponseMessage('mech', 'data') + assert.deepEqual(actual, new BufferList().addCString('mech').addInt32(4).addString('data').join(true, 'p')) + }) + + + it('builds SCRAMClientFinalMessage message', function () { + const actual = serialize.sendSCRAMClientFinalMessage('data') + assert.deepEqual(actual, new BufferList().addString('data').join(true, 'p')) + }) + + + it('builds query message', function () { + var txt = 'select * from boom' + const actual = serialize.query(txt) + assert.deepEqual(actual, new BufferList().addCString(txt).join(true, 'Q')) + }) + + + describe('parse message', () => { + + it('builds parse message', function () { + const actual = serialize.parse({ text: '!' }) + var expected = new BufferList() + .addCString('') + .addCString('!') + .addInt16(0).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + it('builds parse message with named query', function () { + const actual = serialize.parse({ + name: 'boom', + text: 'select * from boom', + types: [] + }) + var expected = new BufferList() + .addCString('boom') + .addCString('select * from boom') + .addInt16(0).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + it('with multiple parameters', function () { + const actual = serialize.parse({ + name: 'force', + text: 'select * from bang where name = $1', + types: [1, 2, 3, 4] + }) + var expected = new BufferList() + .addCString('force') + .addCString('select * from bang where name = $1') + .addInt16(4) + .addInt32(1) + .addInt32(2) + .addInt32(3) + .addInt32(4).join(true, 'P') + assert.deepEqual(actual, expected) + }) + + }) + + + describe('bind messages', function () { + it('with no values', function () { + const actual = serialize.bind() + + var expectedBuffer = new BufferList() + .addCString('') + .addCString('') + .addInt16(0) + .addInt16(0) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + + it('with named statement, portal, and values', function () { + const actual = serialize.bind({ + portal: 'bang', + statement: 'woo', + values: ['1', 'hi', null, 'zing'] + }) + var expectedBuffer = new BufferList() + .addCString('bang') // portal name + .addCString('woo') // statement name + .addInt16(0) + .addInt16(4) + .addInt32(1) + .add(Buffer.from('1')) + .addInt32(2) + .add(Buffer.from('hi')) + .addInt32(-1) + .addInt32(4) + .add(Buffer.from('zing')) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + }) + + it('with named statement, portal, and buffer value', function () { + const actual = serialize.bind({ + portal: 'bang', + statement: 'woo', + values: ['1', 'hi', null, Buffer.from('zing', 'utf8')] + }) + var expectedBuffer = new BufferList() + .addCString('bang') // portal name + .addCString('woo') // statement name + .addInt16(4)// value count + .addInt16(0)// string + .addInt16(0)// string + .addInt16(0)// string + .addInt16(1)// binary + .addInt16(4) + .addInt32(1) + .add(Buffer.from('1')) + .addInt32(2) + .add(Buffer.from('hi')) + .addInt32(-1) + .addInt32(4) + .add(Buffer.from('zing', 'utf-8')) + .addInt16(0) + .join(true, 'B') + assert.deepEqual(actual, expectedBuffer) + }) + + describe('builds execute message', function () { + it('for unamed portal with no row limit', function () { + const actual = serialize.execute() + var expectedBuffer = new BufferList() + .addCString('') + .addInt32(0) + .join(true, 'E') + assert.deepEqual(actual, expectedBuffer) + }) + + it('for named portal with row limit', function () { + const actual = serialize.execute({ + portal: 'my favorite portal', + rows: 100 + }) + var expectedBuffer = new BufferList() + .addCString('my favorite portal') + .addInt32(100) + .join(true, 'E') + assert.deepEqual(actual, expectedBuffer) + }) + }) + + it('builds flush command', function () { + const actual = serialize.flush() + var expected = new BufferList().join(true, 'H') + assert.deepEqual(actual, expected) + }) + + it('builds sync command', function () { + const actual = serialize.sync() + var expected = new BufferList().join(true, 'S') + assert.deepEqual(actual, expected) + }) + + it('builds end command', function () { + const actual = serialize.end() + var expected = Buffer.from([0x58, 0, 0, 0, 4]) + assert.deepEqual(actual, expected) + }) + + describe('builds describe command', function () { + it('describe statement', function () { + const actual = serialize.describe({ type: 'S', name: 'bang' }) + var expected = new BufferList().addChar('S').addCString('bang').join(true, 'D') + assert.deepEqual(actual, expected) + }) + + it('describe unnamed portal', function () { + const actual = serialize.describe({ type: 'P' }) + var expected = new BufferList().addChar('P').addCString('').join(true, 'D') + assert.deepEqual(actual, expected) + }) + }) + + describe('builds close command', function () { + it('describe statement', function () { + const actual = serialize.close({ type: 'S', name: 'bang' }) + var expected = new BufferList().addChar('S').addCString('bang').join(true, 'C') + assert.deepEqual(actual, expected) + }) + + it('describe unnamed portal', function () { + const actual = serialize.close({ type: 'P' }) + var expected = new BufferList().addChar('P').addCString('').join(true, 'C') + assert.deepEqual(actual, expected) + }) + }) + + describe('copy messages', function () { + it('builds copyFromChunk', () => { + const actual = serialize.copyData(Buffer.from([1, 2, 3])) + const expected = new BufferList().add(Buffer.from([1, 2,3 ])).join(true, 'd') + assert.deepEqual(actual, expected) + }) + + it('builds copy fail', () => { + const actual = serialize.copyFail('err!') + const expected = new BufferList().addCString('err!').join(true, 'f') + assert.deepEqual(actual, expected) + }) + + it('builds copy done', () => { + const actual = serialize.copyDone() + const expected = new BufferList().join(true, 'c') + assert.deepEqual(actual, expected) + }) + }) + + it('builds cancel message', () => { + const actual = serialize.cancel(3, 4) + const expected = new BufferList().addInt16(1234).addInt16(5678).addInt32(3).addInt32(4).join(true) + assert.deepEqual(actual, expected) + }) +}) diff --git a/packages/pg-packet-stream/src/index.ts b/packages/pg-protocol/src/parser.ts similarity index 96% rename from packages/pg-packet-stream/src/index.ts rename to packages/pg-protocol/src/parser.ts index 3ebe5e847..69a9c28b2 100644 --- a/packages/pg-packet-stream/src/index.ts +++ b/packages/pg-protocol/src/parser.ts @@ -1,6 +1,6 @@ import { TransformOptions } from 'stream'; import { Mode, bindComplete, parseComplete, closeComplete, noData, portalSuspended, copyDone, replicationStart, emptyQuery, ReadyForQueryMessage, CommandCompleteMessage, CopyDataMessage, CopyResponse, NotificationResponseMessage, RowDescriptionMessage, Field, DataRowMessage, ParameterStatusMessage, BackendKeyDataMessage, DatabaseError, BackendMessage, MessageName, AuthenticationMD5Password, NoticeMessage } from './messages'; -import { BufferReader } from './BufferReader'; +import { BufferReader } from './buffer-reader'; import assert from 'assert' // every message is prefixed with a single bye @@ -46,15 +46,9 @@ const enum MessageCodes { CopyData = 0x64, // d } -type MessageCallback = (msg: BackendMessage) => void; +export type MessageCallback = (msg: BackendMessage) => void; -export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise { - const parser = new PgPacketParser() - stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback)) - return new Promise((resolve) => stream.on('end', () => resolve())) -} - -class PgPacketParser { +export class Parser { private remainingBuffer: Buffer = emptyBuffer; private reader = new BufferReader(); private mode: Mode; diff --git a/packages/pg-protocol/src/serializer.ts b/packages/pg-protocol/src/serializer.ts new file mode 100644 index 000000000..71ac3c878 --- /dev/null +++ b/packages/pg-protocol/src/serializer.ts @@ -0,0 +1,272 @@ +import { Writer } from './buffer-writer' + +const enum code { + startup = 0x70, + query = 0x51, + parse = 0x50, + bind = 0x42, + execute = 0x45, + flush = 0x48, + sync = 0x53, + end = 0x58, + close = 0x43, + describe = 0x44, + copyFromChunk = 0x64, + copyDone = 0x63, + copyFail = 0x66 +} + +const writer = new Writer() + +const startup = (opts: Record): Buffer => { + // protocol version + writer.addInt16(3).addInt16(0) + for (const key of Object.keys(opts)) { + writer.addCString(key).addCString(opts[key]) + } + + writer.addCString('client_encoding').addCString("'utf-8'") + + var bodyBuffer = writer.addCString('').flush() + // this message is sent without a code + + var length = bodyBuffer.length + 4 + + return new Writer() + .addInt32(length) + .add(bodyBuffer) + .flush() +} + +const requestSsl = (): Buffer => { + const response = Buffer.allocUnsafe(8) + response.writeInt32BE(8, 0); + response.writeInt32BE(80877103, 4) + return response +} + +const password = (password: string): Buffer => { + return writer.addCString(password).flush(code.startup) +} + +const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer { + // 0x70 = 'p' + writer + .addCString(mechanism) + .addInt32(Buffer.byteLength(initialResponse)) + .addString(initialResponse) + + return writer.flush(code.startup) +} + +const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer { + return writer.addString(additionalData).flush(code.startup) +} + +const query = (text: string): Buffer => { + return writer.addCString(text).flush(code.query) +} + +type ParseOpts = { + name?: string; + types?: number[]; + text: string; +} + +const emptyArray: any[] = [] + +const parse = (query: ParseOpts): Buffer => { + // expect something like this: + // { name: 'queryName', + // text: 'select * from blah', + // types: ['int8', 'bool'] } + + // normalize missing query names to allow for null + const name = query.name || '' + if (name.length > 63) { + /* eslint-disable no-console */ + console.error('Warning! Postgres only supports 63 characters for query names.') + console.error('You supplied %s (%s)', name, name.length) + console.error('This can cause conflicts and silent errors executing queries') + /* eslint-enable no-console */ + } + + const types = query.types || emptyArray + + var len = types.length + + var buffer = writer + .addCString(name) // name of query + .addCString(query.text) // actual query text + .addInt16(len) + + for (var i = 0; i < len; i++) { + buffer.addInt32(types[i]) + } + + return writer.flush(code.parse) +} + +type BindOpts = { + portal?: string; + binary?: boolean; + statement?: string; + values?: any[]; +} + +const bind = (config: BindOpts = {}): Buffer => { + // normalize config + const portal = config.portal || '' + const statement = config.statement || '' + const binary = config.binary || false + var values = config.values || emptyArray + var len = values.length + + var useBinary = false + // TODO(bmc): all the loops in here aren't nice, we can do better + for (var j = 0; j < len; j++) { + useBinary = useBinary || values[j] instanceof Buffer + } + + var buffer = writer + .addCString(portal) + .addCString(statement) + if (!useBinary) { + buffer.addInt16(0) + } else { + buffer.addInt16(len) + for (j = 0; j < len; j++) { + buffer.addInt16(values[j] instanceof Buffer ? 1 : 0) + } + } + buffer.addInt16(len) + for (var i = 0; i < len; i++) { + var val = values[i] + if (val === null || typeof val === 'undefined') { + buffer.addInt32(-1) + } else if (val instanceof Buffer) { + buffer.addInt32(val.length) + buffer.add(val) + } else { + buffer.addInt32(Buffer.byteLength(val)) + buffer.addString(val) + } + } + + if (binary) { + buffer.addInt16(1) // format codes to use binary + buffer.addInt16(1) + } else { + buffer.addInt16(0) // format codes to use text + } + return writer.flush(code.bind) +} + +type ExecOpts = { + portal?: string; + rows?: number; +} + +const emptyExecute = Buffer.from([code.execute, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00]) + +const execute = (config?: ExecOpts): Buffer => { + // this is the happy path for most queries + if (!config || !config.portal && !config.rows) { + return emptyExecute; + } + + const portal = config.portal || '' + const rows = config.rows || 0 + + const portalLength = Buffer.byteLength(portal) + const len = 4 + portalLength + 1 + 4 + // one extra bit for code + const buff = Buffer.allocUnsafe(1 + len) + buff[0] = code.execute + buff.writeInt32BE(len, 1) + buff.write(portal, 5, 'utf-8') + buff[portalLength + 5] = 0; // null terminate portal cString + buff.writeUInt32BE(rows, buff.length - 4) + return buff; +} + +const cancel = (processID: number, secretKey: number): Buffer => { + const buffer = Buffer.allocUnsafe(16) + buffer.writeInt32BE(16, 0) + buffer.writeInt16BE(1234, 4) + buffer.writeInt16BE(5678, 6) + buffer.writeInt32BE(processID, 8) + buffer.writeInt32BE(secretKey, 12) + return buffer; +} + +type PortalOpts = { + type: 'S' | 'P', + name?: string; +} + +const cstringMessage = (code: code, string: string): Buffer => { + const stringLen = Buffer.byteLength(string) + const len = 4 + stringLen + 1 + // one extra bit for code + const buffer = Buffer.allocUnsafe(1 + len) + buffer[0] = code + buffer.writeInt32BE(len, 1) + buffer.write(string, 5, 'utf-8') + buffer[len] = 0 // null terminate cString + return buffer +} + +const emptyDescribePortal = writer.addCString('P').flush(code.describe) +const emptyDescribeStatement = writer.addCString('S').flush(code.describe) + +const describe = (msg: PortalOpts): Buffer => { + return msg.name ? + cstringMessage(code.describe,`${msg.type}${msg.name || ''}`) : + msg.type === 'P' ? + emptyDescribePortal : + emptyDescribeStatement; +} + +const close = (msg: PortalOpts): Buffer => { + const text = `${msg.type}${msg.name || ''}` + return cstringMessage(code.close, text) +} + +const copyData = (chunk: Buffer): Buffer => { + return writer.add(chunk).flush(code.copyFromChunk) +} + +const copyFail = (message: string): Buffer => { + return cstringMessage(code.copyFail, message); +} + +const codeOnlyBuffer = (code: code): Buffer => Buffer.from([code, 0x00, 0x00, 0x00, 0x04]) + +const flushBuffer = codeOnlyBuffer(code.flush) +const syncBuffer = codeOnlyBuffer(code.sync) +const endBuffer = codeOnlyBuffer(code.end) +const copyDoneBuffer = codeOnlyBuffer(code.copyDone) + +const serialize = { + startup, + password, + requestSsl, + sendSASLInitialResponseMessage, + sendSCRAMClientFinalMessage, + query, + parse, + bind, + execute, + describe, + close, + flush: () => flushBuffer, + sync: () => syncBuffer, + end: () => endBuffer, + copyData, + copyDone: () => copyDoneBuffer, + copyFail, + cancel +} + +export { serialize } diff --git a/packages/pg-packet-stream/src/testing/buffer-list.ts b/packages/pg-protocol/src/testing/buffer-list.ts similarity index 100% rename from packages/pg-packet-stream/src/testing/buffer-list.ts rename to packages/pg-protocol/src/testing/buffer-list.ts diff --git a/packages/pg-packet-stream/src/testing/test-buffers.ts b/packages/pg-protocol/src/testing/test-buffers.ts similarity index 100% rename from packages/pg-packet-stream/src/testing/test-buffers.ts rename to packages/pg-protocol/src/testing/test-buffers.ts diff --git a/packages/pg-packet-stream/src/types/chunky.d.ts b/packages/pg-protocol/src/types/chunky.d.ts similarity index 100% rename from packages/pg-packet-stream/src/types/chunky.d.ts rename to packages/pg-protocol/src/types/chunky.d.ts diff --git a/packages/pg-packet-stream/tsconfig.json b/packages/pg-protocol/tsconfig.json similarity index 100% rename from packages/pg-packet-stream/tsconfig.json rename to packages/pg-protocol/tsconfig.json diff --git a/packages/pg/lib/connection-fast.js b/packages/pg/lib/connection-fast.js index ecbb362c9..71ef63ba6 100644 --- a/packages/pg/lib/connection-fast.js +++ b/packages/pg/lib/connection-fast.js @@ -11,11 +11,8 @@ var net = require('net') var EventEmitter = require('events').EventEmitter var util = require('util') -var Writer = require('buffer-writer') // eslint-disable-next-line -const { parse } = require('pg-packet-stream') - -var TEXT_MODE = 0 +const { parse, serialize } = require('../../pg-protocol/dist') // TODO(bmc) support binary mode here // var BINARY_MODE = 1 @@ -28,15 +25,9 @@ var Connection = function (config) { this._keepAlive = config.keepAlive this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis this.lastBuffer = false - this.lastOffset = 0 - this.buffer = null - this.offset = null - this.encoding = config.encoding || 'utf8' this.parsedStatements = {} - this.writer = new Writer() this.ssl = config.ssl || false this._ending = false - this._mode = TEXT_MODE this._emitMessage = false var self = this this.on('newListener', function (eventName) { @@ -122,244 +113,103 @@ Connection.prototype.attachListeners = function (stream) { } Connection.prototype.requestSsl = function () { - var bodyBuffer = this.writer - .addInt16(0x04d2) - .addInt16(0x162f) - .flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this.stream.write(serialize.requestSsl()) } Connection.prototype.startup = function (config) { - var writer = this.writer.addInt16(3).addInt16(0) - - Object.keys(config).forEach(function (key) { - var val = config[key] - writer.addCString(key).addCString(val) - }) - - writer.addCString('client_encoding').addCString("'utf-8'") - - var bodyBuffer = writer.addCString('').flush() - // this message is sent without a code - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this.stream.write(serialize.startup(config)) } Connection.prototype.cancel = function (processID, secretKey) { - var bodyBuffer = this.writer - .addInt16(1234) - .addInt16(5678) - .addInt32(processID) - .addInt32(secretKey) - .flush() - - var length = bodyBuffer.length + 4 - - var buffer = new Writer() - .addInt32(length) - .add(bodyBuffer) - .join() - this.stream.write(buffer) + this._send(serialize.cancel(processID, secretKey)) } Connection.prototype.password = function (password) { - // 0x70 = 'p' - this._send(0x70, this.writer.addCString(password)) + this._send(serialize.password(password)) } Connection.prototype.sendSASLInitialResponseMessage = function (mechanism, initialResponse) { - // 0x70 = 'p' - this.writer - .addCString(mechanism) - .addInt32(Buffer.byteLength(initialResponse)) - .addString(initialResponse) - - this._send(0x70) + this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse)) } Connection.prototype.sendSCRAMClientFinalMessage = function (additionalData) { - // 0x70 = 'p' - this.writer.addString(additionalData) - - this._send(0x70) + this._send(serialize.sendSCRAMClientFinalMessage(additionalData)) } -Connection.prototype._send = function (code, more) { +Connection.prototype._send = function (buffer) { if (!this.stream.writable) { return false } - return this.stream.write(this.writer.flush(code)) + return this.stream.write(buffer) } Connection.prototype.query = function (text) { - // 0x51 = Q - this.stream.write(this.writer.addCString(text).flush(0x51)) + this._send(serialize.query(text)) } // send parse message Connection.prototype.parse = function (query) { - // expect something like this: - // { name: 'queryName', - // text: 'select * from blah', - // types: ['int8', 'bool'] } - - // normalize missing query names to allow for null - query.name = query.name || '' - if (query.name.length > 63) { - /* eslint-disable no-console */ - console.error('Warning! Postgres only supports 63 characters for query names.') - console.error('You supplied %s (%s)', query.name, query.name.length) - console.error('This can cause conflicts and silent errors executing queries') - /* eslint-enable no-console */ - } - // normalize null type array - query.types = query.types || [] - var len = query.types.length - var buffer = this.writer - .addCString(query.name) // name of query - .addCString(query.text) // actual query text - .addInt16(len) - for (var i = 0; i < len; i++) { - buffer.addInt32(query.types[i]) - } - - var code = 0x50 - this._send(code) - this.flush() + this._send(serialize.parse(query)) } // send bind message // "more" === true to buffer the message until flush() is called Connection.prototype.bind = function (config) { - // normalize config - config = config || {} - config.portal = config.portal || '' - config.statement = config.statement || '' - config.binary = config.binary || false - var values = config.values || [] - var len = values.length - var useBinary = false - for (var j = 0; j < len; j++) { - useBinary |= values[j] instanceof Buffer - } - var buffer = this.writer.addCString(config.portal).addCString(config.statement) - if (!useBinary) { - buffer.addInt16(0) - } else { - buffer.addInt16(len) - for (j = 0; j < len; j++) { - buffer.addInt16(values[j] instanceof Buffer) - } - } - buffer.addInt16(len) - for (var i = 0; i < len; i++) { - var val = values[i] - if (val === null || typeof val === 'undefined') { - buffer.addInt32(-1) - } else if (val instanceof Buffer) { - buffer.addInt32(val.length) - buffer.add(val) - } else { - buffer.addInt32(Buffer.byteLength(val)) - buffer.addString(val) - } - } - - if (config.binary) { - buffer.addInt16(1) // format codes to use binary - buffer.addInt16(1) - } else { - buffer.addInt16(0) // format codes to use text - } - // 0x42 = 'B' - this._send(0x42) - this.flush() + this._send(serialize.bind(config)) } // send execute message // "more" === true to buffer the message until flush() is called Connection.prototype.execute = function (config) { - config = config || {} - config.portal = config.portal || '' - config.rows = config.rows || '' - this.writer.addCString(config.portal).addInt32(config.rows) - - // 0x45 = 'E' - this._send(0x45) - this.flush() + this._send(serialize.execute(config)) } -var emptyBuffer = Buffer.alloc(0) - -const flushBuffer = Buffer.from([0x48, 0x00, 0x00, 0x00, 0x04]) +const flushBuffer = serialize.flush() Connection.prototype.flush = function () { if (this.stream.writable) { this.stream.write(flushBuffer) } } -const syncBuffer = Buffer.from([0x53, 0x00, 0x00, 0x00, 0x04]) +const syncBuffer = serialize.sync() Connection.prototype.sync = function () { this._ending = true - // clear out any pending data in the writer - this.writer.clear() - if (this.stream.writable) { - this.stream.write(syncBuffer) - this.stream.write(flushBuffer) - } + this._send(syncBuffer) + this._send(flushBuffer) } -const END_BUFFER = Buffer.from([0x58, 0x00, 0x00, 0x00, 0x04]) +const endBuffer = serialize.end() Connection.prototype.end = function () { // 0x58 = 'X' - this.writer.clear() this._ending = true if (!this.stream.writable) { this.stream.end() return } - return this.stream.write(END_BUFFER, () => { + return this.stream.write(endBuffer, () => { this.stream.end() }) } Connection.prototype.close = function (msg) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x43) + this._send(serialize.close(msg)) } Connection.prototype.describe = function (msg) { - this.writer.addCString(msg.type + (msg.name || '')) - this._send(0x44) - this.flush() + this._send(serialize.describe(msg)) } Connection.prototype.sendCopyFromChunk = function (chunk) { - this.stream.write(this.writer.add(chunk).flush(0x64)) + this._send(serialize.copyData(chunk)) } Connection.prototype.endCopyFrom = function () { - this.stream.write(this.writer.add(emptyBuffer).flush(0x63)) + this._send(serialize.copyDone()) } Connection.prototype.sendCopyFail = function (msg) { - // this.stream.write(this.writer.add(emptyBuffer).flush(0x66)); - this.writer.addCString(msg) - this._send(0x66) + this._send(serialize.copyFail(msg)) } module.exports = Connection diff --git a/packages/pg/package.json b/packages/pg/package.json index edd24337b..b0bd735f5 100644 --- a/packages/pg/package.json +++ b/packages/pg/package.json @@ -22,7 +22,7 @@ "buffer-writer": "2.0.0", "packet-reader": "1.0.0", "pg-connection-string": "0.1.3", - "pg-packet-stream": "^1.1.0", + "pg-protocol": "^1.1.0", "pg-pool": "^3.0.0", "pg-types": "^2.1.0", "pgpass": "1.x",