Skip to content

Commit

Permalink
[fix] Take into account the data queued in the sender (#971)
Browse files Browse the repository at this point in the history
This makes the `bufferedAmount` getter take into account the data
queued in the sender.
  • Loading branch information
lpinca committed Jan 25, 2017
1 parent bc35fa4 commit d74a32e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 33 deletions.
75 changes: 49 additions & 26 deletions lib/Sender.js
Expand Up @@ -24,12 +24,16 @@ class Sender {
*/
constructor (socket, extensions) {
this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName];
this._socket = socket;

this.firstFragment = true;
this.processing = false;
this.compress = false;
this._socket = socket;
this.onerror = null;

this.processing = false;
this.bufferedBytes = 0;
this.queue = [];

this.onerror = null;
}

/**
Expand Down Expand Up @@ -86,10 +90,23 @@ class Sender {
* @public
*/
ping (data, mask) {
var readOnly = true;

if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
readOnly = false;
}
}

if (this.perMessageDeflate) {
this.enqueue([this.doPing, data, mask]);
this.enqueue([this.doPing, data, mask, readOnly]);
} else {
this.doPing(data, mask);
this.doPing(data, mask, readOnly);
}
}

Expand All @@ -98,14 +115,15 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPing (data, mask) {
doPing (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
opcode: 0x09,
rsv1: false,
fin: true,
readOnly,
mask
});

Expand All @@ -120,10 +138,23 @@ class Sender {
* @public
*/
pong (data, mask) {
var readOnly = true;

if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
readOnly = false;
}
}

if (this.perMessageDeflate) {
this.enqueue([this.doPong, data, mask]);
this.enqueue([this.doPong, data, mask, readOnly]);
} else {
this.doPong(data, mask);
this.doPong(data, mask, readOnly);
}
}

Expand All @@ -132,14 +163,15 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPong (data, mask) {
doPong (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
opcode: 0x0a,
rsv1: false,
fin: true,
readOnly,
mask
});

Expand Down Expand Up @@ -243,7 +275,7 @@ class Sender {
/**
* Frames and sends a piece of data according to the HyBi WebSocket protocol.
*
* @param {*} data The data to send
* @param {Buffer} data The data to send
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
Expand All @@ -267,17 +299,6 @@ class Sender {
return;
}

if (!Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
options.readOnly = false;
}
}

const mergeBuffers = data.length < 1024 || options.mask && options.readOnly;
var dataOffset = options.mask ? 6 : 2;
var payloadLength = data.length;
Expand Down Expand Up @@ -334,12 +355,13 @@ class Sender {
dequeue () {
if (this.processing) return;

const handler = this.queue.shift();
if (!handler) return;
const params = this.queue.shift();
if (!params) return;

if (params[1]) this.bufferedBytes -= params[1].length;
this.processing = true;

handler[0].apply(this, handler.slice(1));
params[0].apply(this, params.slice(1));
}

/**
Expand All @@ -361,6 +383,7 @@ class Sender {
* @private
*/
enqueue (params) {
if (params[1]) this.bufferedBytes += params[1].length;
this.queue.push(params);
this.dequeue();
}
Expand Down
4 changes: 3 additions & 1 deletion lib/WebSocket.js
Expand Up @@ -82,7 +82,9 @@ class WebSocket extends EventEmitter {
get bufferedAmount () {
var amount = 0;

if (this._socket) amount = this._socket.bufferSize || 0;
if (this._socket) {
amount = this._socket.bufferSize + this._sender.bufferedBytes;
}
return amount;
}

Expand Down
18 changes: 18 additions & 0 deletions test/Sender.test.js
Expand Up @@ -73,6 +73,24 @@ describe('Sender', function () {
});
});

describe('#pong', function () {
it('works with multiple types of data', function (done) {
let count = 0;
const sender = new Sender({
write: (data) => {
assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (++count === 3) done();
}
});

const array = new Uint8Array([0x68, 0x69]);

sender.pong(array.buffer, false);
sender.pong(array, false);
sender.pong('hi', false);
});
});

describe('#send', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
Expand Down
30 changes: 24 additions & 6 deletions test/WebSocket.test.js
Expand Up @@ -131,19 +131,34 @@ describe('WebSocket', function () {
});

it('defaults to zero upon "open"', function (done) {
server.createServer(++port, (srv) => {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.onopen = () => {
assert.strictEqual(ws.bufferedAmount, 0);

ws.on('close', () => srv.close(done));
ws.close();
wss.close(done);
};
});
});

it('stress kernel write buffer', function (done) {
it('takes into account the data in the sender queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.on('open', () => {
ws.send('foo');
ws.send('bar', (err) => {
assert.ifError(err);
assert.strictEqual(ws.bufferedAmount, 0);
wss.close(done);
});

assert.strictEqual(ws.bufferedAmount, 3);
});
});
});

it('takes into account the data in the socket queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`, {
perMessageDeflate: false
Expand All @@ -152,7 +167,10 @@ describe('WebSocket', function () {

wss.on('connection', (ws) => {
while (true) {
if (ws.bufferedAmount > 0) break;
if (ws._socket.bufferSize > 0) {
assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize);
break;
}
ws.send('hello'.repeat(1e4));
}
wss.close(done);
Expand Down

0 comments on commit d74a32e

Please sign in to comment.