Skip to content

Commit

Permalink
[minor] Do not use Readable.prototype.pipe()
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Mar 1, 2018
1 parent 70fff53 commit aa2c423
Showing 1 changed file with 62 additions and 38 deletions.
100 changes: 62 additions & 38 deletions lib/websocket.js
Expand Up @@ -132,13 +132,9 @@ class WebSocket extends EventEmitter {
receiver[kWebSocket] = this;
socket[kWebSocket] = this;

socket.on('close', socketOnClose);
socket.on('end', socketOnEnd);
socket.on('error', socketOnError);
socket.on('error', NOOP);

receiver.on('message', receiverOnMessage);
receiver.on('close', receiverOnClose);
receiver.on('drain', receiverOnDrain);
receiver.on('error', receiverOnError);
receiver.on('ping', receiverOnPing);
receiver.on('pong', receiverOnPong);
Expand All @@ -147,7 +143,12 @@ class WebSocket extends EventEmitter {
socket.setNoDelay();

if (head.length > 0) socket.unshift(head);
socket.pipe(receiver);

socket.on('close', socketOnClose);
socket.on('data', socketOnData);
socket.on('end', socketOnEnd);
socket.on('error', socketOnError);
socket.on('error', NOOP);

this.readyState = WebSocket.OPEN;
this.emit('open');
Expand Down Expand Up @@ -696,7 +697,7 @@ function abortHandshake (websocket, stream, message) {
function receiverOnClose (code, reason) {
const websocket = this[kWebSocket];

websocket._socket.unpipe(websocket._receiver);
websocket._socket.removeListener('data', socketOnData);
websocket._socket.resume();

websocket._closeFrameReceived = true;
Expand All @@ -707,6 +708,15 @@ function receiverOnClose (code, reason) {
else websocket.close(code, reason);
}

/**
* The listener of the `Receiver` `'drain'` event.
*
* @private
*/
function receiverOnDrain () {
this[kWebSocket]._socket.resume();
}

/**
* The listener of the `Receiver` `'error'` event.
*
Expand All @@ -721,7 +731,6 @@ function receiverOnError (err) {
websocket.readyState = WebSocket.CLOSING;
websocket._socket.removeListener('error', socketOnError);
websocket._closeCode = err[constants.kStatusCode];
websocket._closeMessage = '';
websocket.emit('error', err);
websocket._socket.destroy();
}
Expand Down Expand Up @@ -759,36 +768,6 @@ function receiverOnPong (data) {
this[kWebSocket].emit('pong', data);
}

/**
* The listener of the `net.Socket` `'error'` event.
*
* @param {Error} err The emitted error
* @private
*/
function socketOnError (err) {
const websocket = this[kWebSocket];

websocket.readyState = WebSocket.CLOSING;
this.removeListener('error', socketOnError);

//
// There might be valid buffered data in the socket waiting to be read so we
// can't re-emit this error immediately.
//
websocket._error = err;
}

/**
* The listener of the `net.Socket` `'end'` event.
*
* @private
*/
function socketOnEnd () {
this[kWebSocket].readyState = WebSocket.CLOSING;
this.removeListener('error', socketOnError);
this.end();
}

/**
* The listener of the `net.Socket` `'close'` event.
*
Expand Down Expand Up @@ -829,3 +808,48 @@ function socketOnClose () {
websocket._receiver.on('finish', emitClose);
}
}

/**
* The listener of the `net.Socket` `'data'` event.
*
* @param {Buffer} chunk A chunk of data
* @private
*/
function socketOnData (chunk) {
if (!this[kWebSocket]._receiver.write(chunk)) {
this.pause();
}
}

/**
* The listener of the `net.Socket` `'end'` event.
*
* @private
*/
function socketOnEnd () {
const websocket = this[kWebSocket];

websocket.readyState = WebSocket.CLOSING;
this.removeListener('error', socketOnError);
websocket._receiver.end();
this.end();
}

/**
* The listener of the `net.Socket` `'error'` event.
*
* @param {Error} err The emitted error
* @private
*/
function socketOnError (err) {
const websocket = this[kWebSocket];

websocket.readyState = WebSocket.CLOSING;
this.removeListener('error', socketOnError);

//
// There might be valid buffered data in the socket waiting to be read so we
// can't re-emit this error immediately.
//
websocket._error = err;
}

0 comments on commit aa2c423

Please sign in to comment.