From d74a32e1761a4016764b44f617d57180e9e3af5b Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Wed, 25 Jan 2017 09:36:26 +0100 Subject: [PATCH] [fix] Take into account the data queued in the sender (#971) This makes the `bufferedAmount` getter take into account the data queued in the sender. --- lib/Sender.js | 75 +++++++++++++++++++++++++++--------------- lib/WebSocket.js | 4 ++- test/Sender.test.js | 18 ++++++++++ test/WebSocket.test.js | 30 +++++++++++++---- 4 files changed, 94 insertions(+), 33 deletions(-) diff --git a/lib/Sender.js b/lib/Sender.js index 74f6e85d1..3913a8139 100644 --- a/lib/Sender.js +++ b/lib/Sender.js @@ -24,12 +24,16 @@ class Sender { */ constructor (socket, extensions) { this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName]; + this._socket = socket; + this.firstFragment = true; - this.processing = false; this.compress = false; - this._socket = socket; - this.onerror = null; + + this.processing = false; + this.bufferedBytes = 0; this.queue = []; + + this.onerror = null; } /** @@ -86,10 +90,23 @@ class Sender { * @public */ ping (data, mask) { + var readOnly = true; + + if (data && !Buffer.isBuffer(data)) { + if (data instanceof ArrayBuffer) { + data = Buffer.from(data); + } else if (ArrayBuffer.isView(data)) { + data = viewToBuffer(data); + } else { + data = Buffer.from(data); + readOnly = false; + } + } + if (this.perMessageDeflate) { - this.enqueue([this.doPing, data, mask]); + this.enqueue([this.doPing, data, mask, readOnly]); } else { - this.doPing(data, mask); + this.doPing(data, mask, readOnly); } } @@ -98,14 +115,15 @@ class Sender { * * @param {*} data The message to send * @param {Boolean} mask Specifies whether or not to mask `data` + * @param {Boolean} readOnly Specifies whether `data` can be modified * @private */ - doPing (data, mask) { + doPing (data, mask, readOnly) { this.frameAndSend(data, { - readOnly: true, opcode: 0x09, rsv1: false, fin: true, + readOnly, mask }); @@ -120,10 +138,23 @@ class Sender { * @public */ pong (data, mask) { + var readOnly = true; + + if (data && !Buffer.isBuffer(data)) { + if (data instanceof ArrayBuffer) { + data = Buffer.from(data); + } else if (ArrayBuffer.isView(data)) { + data = viewToBuffer(data); + } else { + data = Buffer.from(data); + readOnly = false; + } + } + if (this.perMessageDeflate) { - this.enqueue([this.doPong, data, mask]); + this.enqueue([this.doPong, data, mask, readOnly]); } else { - this.doPong(data, mask); + this.doPong(data, mask, readOnly); } } @@ -132,14 +163,15 @@ class Sender { * * @param {*} data The message to send * @param {Boolean} mask Specifies whether or not to mask `data` + * @param {Boolean} readOnly Specifies whether `data` can be modified * @private */ - doPong (data, mask) { + doPong (data, mask, readOnly) { this.frameAndSend(data, { - readOnly: true, opcode: 0x0a, rsv1: false, fin: true, + readOnly, mask }); @@ -243,7 +275,7 @@ class Sender { /** * Frames and sends a piece of data according to the HyBi WebSocket protocol. * - * @param {*} data The data to send + * @param {Buffer} data The data to send * @param {Object} options Options object * @param {Number} options.opcode The opcode * @param {Boolean} options.readOnly Specifies whether `data` can be modified @@ -267,17 +299,6 @@ class Sender { return; } - if (!Buffer.isBuffer(data)) { - if (data instanceof ArrayBuffer) { - data = Buffer.from(data); - } else if (ArrayBuffer.isView(data)) { - data = viewToBuffer(data); - } else { - data = Buffer.from(data); - options.readOnly = false; - } - } - const mergeBuffers = data.length < 1024 || options.mask && options.readOnly; var dataOffset = options.mask ? 6 : 2; var payloadLength = data.length; @@ -334,12 +355,13 @@ class Sender { dequeue () { if (this.processing) return; - const handler = this.queue.shift(); - if (!handler) return; + const params = this.queue.shift(); + if (!params) return; + if (params[1]) this.bufferedBytes -= params[1].length; this.processing = true; - handler[0].apply(this, handler.slice(1)); + params[0].apply(this, params.slice(1)); } /** @@ -361,6 +383,7 @@ class Sender { * @private */ enqueue (params) { + if (params[1]) this.bufferedBytes += params[1].length; this.queue.push(params); this.dequeue(); } diff --git a/lib/WebSocket.js b/lib/WebSocket.js index ea3513559..ec7490bd0 100644 --- a/lib/WebSocket.js +++ b/lib/WebSocket.js @@ -82,7 +82,9 @@ class WebSocket extends EventEmitter { get bufferedAmount () { var amount = 0; - if (this._socket) amount = this._socket.bufferSize || 0; + if (this._socket) { + amount = this._socket.bufferSize + this._sender.bufferedBytes; + } return amount; } diff --git a/test/Sender.test.js b/test/Sender.test.js index 48c126963..d6454b47a 100644 --- a/test/Sender.test.js +++ b/test/Sender.test.js @@ -73,6 +73,24 @@ describe('Sender', function () { }); }); + describe('#pong', function () { + it('works with multiple types of data', function (done) { + let count = 0; + const sender = new Sender({ + write: (data) => { + assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69]))); + if (++count === 3) done(); + } + }); + + const array = new Uint8Array([0x68, 0x69]); + + sender.pong(array.buffer, false); + sender.pong(array, false); + sender.pong('hi', false); + }); + }); + describe('#send', function () { it('compresses data if compress option is enabled', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); diff --git a/test/WebSocket.test.js b/test/WebSocket.test.js index 7ac77dd96..13b9d4bb4 100644 --- a/test/WebSocket.test.js +++ b/test/WebSocket.test.js @@ -131,19 +131,34 @@ describe('WebSocket', function () { }); it('defaults to zero upon "open"', function (done) { - server.createServer(++port, (srv) => { + const wss = new WebSocketServer({ port: ++port }, () => { const ws = new WebSocket(`ws://localhost:${port}`); ws.onopen = () => { assert.strictEqual(ws.bufferedAmount, 0); - - ws.on('close', () => srv.close(done)); - ws.close(); + wss.close(done); }; }); }); - it('stress kernel write buffer', function (done) { + it('takes into account the data in the sender queue', function (done) { + const wss = new WebSocketServer({ port: ++port }, () => { + const ws = new WebSocket(`ws://localhost:${port}`); + + ws.on('open', () => { + ws.send('foo'); + ws.send('bar', (err) => { + assert.ifError(err); + assert.strictEqual(ws.bufferedAmount, 0); + wss.close(done); + }); + + assert.strictEqual(ws.bufferedAmount, 3); + }); + }); + }); + + it('takes into account the data in the socket queue', function (done) { const wss = new WebSocketServer({ port: ++port }, () => { const ws = new WebSocket(`ws://localhost:${port}`, { perMessageDeflate: false @@ -152,7 +167,10 @@ describe('WebSocket', function () { wss.on('connection', (ws) => { while (true) { - if (ws.bufferedAmount > 0) break; + if (ws._socket.bufferSize > 0) { + assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize); + break; + } ws.send('hello'.repeat(1e4)); } wss.close(done);