Skip to content

Commit

Permalink
[major] Remove stream method and ability to send a stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Nov 8, 2016
1 parent 5a025dc commit 442ca0a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 715 deletions.
120 changes: 1 addition & 119 deletions lib/WebSocket.js
Expand Up @@ -11,7 +11,6 @@ const util = require('util');
const http = require('http');
const https = require('https');
const crypto = require('crypto');
const stream = require('stream');
const Ultron = require('ultron');
const Sender = require('./Sender');
const Receiver = require('./Receiver');
Expand Down Expand Up @@ -222,11 +221,6 @@ WebSocket.prototype.send = function send (data, options, cb) {

if (!data) data = '';

if (this._queue) {
this._queue.push(() => this.send(data, options, cb));
return;
}

options = options || {};
if (options.fin !== false) options.fin = true;

Expand All @@ -241,72 +235,7 @@ WebSocket.prototype.send = function send (data, options, cb) {
options.compress = false;
}

if (data instanceof stream.Readable) {
startQueue(this);

sendStream(this, data, options, (error) => {
process.nextTick(() => executeQueueSends(this));
if (cb) cb(error);
});
} else {
this._sender.send(data, options, cb);
}
};

/**
* Streams data through calls to a user supplied function
*
* @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
* @param {function} 'function (error, send)' which is executed on successive
* ticks of which send is 'function (data, final)'.
* @api public
*/
WebSocket.prototype.stream = function stream (options, cb) {
if (typeof options === 'function') {
cb = options;
options = {};
}

if (!cb) throw new Error('callback must be provided');

if (this.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else throw new Error('not opened');
return;
}

if (this._queue) {
this._queue.push(() => this.stream(options, cb));
return;
}

options = options || {};

if (options.mask === undefined) options.mask = !this._isServer;
if (options.compress === undefined) options.compress = true;
if (!this.extensions[PerMessageDeflate.extensionName]) {
options.compress = false;
}

startQueue(this);

const send = (data, final) => {
try {
if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
options.fin = final === true;
this._sender.send(data, options);
if (!final) process.nextTick(cb, null, send);
else executeQueueSends(this);
} catch (e) {
if (typeof cb === 'function') cb(e);
else {
delete this._queue;
this.emit('error', e);
}
}
};

process.nextTick(cb, null, send);
this._sender.send(data, options, cb);
};

/**
Expand Down Expand Up @@ -845,52 +774,6 @@ function establishConnection (socket, upgradeHead) {
this.emit('open');
}

function startQueue (instance) {
instance._queue = instance._queue || [];
}

function executeQueueSends (instance) {
var queue = instance._queue;
if (queue === undefined) return;

delete instance._queue;
for (var i = 0, l = queue.length; i < l; ++i) {
queue[i]();
}
}

function sendStream (instance, stream, options, cb) {
stream.on('data', function incoming (data) {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = false;
instance._sender.send(data, options);
});

stream.on('end', function end () {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = true;
instance._sender.send(null, options);

if (cb) cb(null);
});
}

function cleanupWebsocketResources (error) {
if (this.readyState === WebSocket.CLOSED) return;

Expand Down Expand Up @@ -941,5 +824,4 @@ function cleanupWebsocketResources (error) {

this.removeAllListeners();
this.on('error', function onerror () {}); // catch all errors after this
delete this._queue;
}

0 comments on commit 442ca0a

Please sign in to comment.