Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove stream method and ability to send a stream #875

Merged
merged 1 commit into from Dec 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 1 addition & 119 deletions lib/WebSocket.js
Expand Up @@ -11,7 +11,6 @@ const util = require('util');
const http = require('http');
const https = require('https');
const crypto = require('crypto');
const stream = require('stream');
const Ultron = require('ultron');
const Sender = require('./Sender');
const Receiver = require('./Receiver');
Expand Down Expand Up @@ -222,11 +221,6 @@ WebSocket.prototype.send = function send (data, options, cb) {

if (!data) data = '';

if (this._queue) {
this._queue.push(() => this.send(data, options, cb));
return;
}

options = options || {};
if (options.fin !== false) options.fin = true;

Expand All @@ -241,72 +235,7 @@ WebSocket.prototype.send = function send (data, options, cb) {
options.compress = false;
}

if (data instanceof stream.Readable) {
startQueue(this);

sendStream(this, data, options, (error) => {
process.nextTick(() => executeQueueSends(this));
if (cb) cb(error);
});
} else {
this._sender.send(data, options, cb);
}
};

/**
* Streams data through calls to a user supplied function
*
* @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
* @param {function} 'function (error, send)' which is executed on successive
* ticks of which send is 'function (data, final)'.
* @api public
*/
WebSocket.prototype.stream = function stream (options, cb) {
if (typeof options === 'function') {
cb = options;
options = {};
}

if (!cb) throw new Error('callback must be provided');

if (this.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else throw new Error('not opened');
return;
}

if (this._queue) {
this._queue.push(() => this.stream(options, cb));
return;
}

options = options || {};

if (options.mask === undefined) options.mask = !this._isServer;
if (options.compress === undefined) options.compress = true;
if (!this.extensions[PerMessageDeflate.extensionName]) {
options.compress = false;
}

startQueue(this);

const send = (data, final) => {
try {
if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
options.fin = final === true;
this._sender.send(data, options);
if (!final) process.nextTick(cb, null, send);
else executeQueueSends(this);
} catch (e) {
if (typeof cb === 'function') cb(e);
else {
delete this._queue;
this.emit('error', e);
}
}
};

process.nextTick(cb, null, send);
this._sender.send(data, options, cb);
};

/**
Expand Down Expand Up @@ -845,52 +774,6 @@ function establishConnection (socket, upgradeHead) {
this.emit('open');
}

function startQueue (instance) {
instance._queue = instance._queue || [];
}

function executeQueueSends (instance) {
var queue = instance._queue;
if (queue === undefined) return;

delete instance._queue;
for (var i = 0, l = queue.length; i < l; ++i) {
queue[i]();
}
}

function sendStream (instance, stream, options, cb) {
stream.on('data', function incoming (data) {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = false;
instance._sender.send(data, options);
});

stream.on('end', function end () {
if (instance.readyState !== WebSocket.OPEN) {
if (cb) cb(new Error('not opened'));
else {
delete instance._queue;
instance.emit('error', new Error('not opened'));
}
return;
}

options.fin = true;
instance._sender.send(null, options);

if (cb) cb(null);
});
}

function cleanupWebsocketResources (error) {
if (this.readyState === WebSocket.CLOSED) return;

Expand Down Expand Up @@ -941,5 +824,4 @@ function cleanupWebsocketResources (error) {

this.removeAllListeners();
this.on('error', function onerror () {}); // catch all errors after this
delete this._queue;
}