diff --git a/lib/WebSocket.js b/lib/WebSocket.js index 63363cf41..5899c3a1a 100644 --- a/lib/WebSocket.js +++ b/lib/WebSocket.js @@ -8,7 +8,6 @@ const EventEmitter = require('events'); const crypto = require('crypto'); -const stream = require('stream'); const Ultron = require('ultron'); const https = require('https'); const http = require('http'); @@ -211,11 +210,6 @@ WebSocket.prototype.send = function send (data, options, cb) { if (!data) data = ''; - if (this._queue) { - this._queue.push(() => this.send(data, options, cb)); - return; - } - options = options || {}; if (options.fin !== false) options.fin = true; @@ -230,72 +224,7 @@ WebSocket.prototype.send = function send (data, options, cb) { options.compress = false; } - if (data instanceof stream.Readable) { - startQueue(this); - - sendStream(this, data, options, (error) => { - process.nextTick(() => executeQueueSends(this)); - if (cb) cb(error); - }); - } else { - this._sender.send(data, options, cb); - } -}; - -/** - * Streams data through calls to a user supplied function - * - * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean - * @param {function} 'function (error, send)' which is executed on successive - * ticks of which send is 'function (data, final)'. - * @api public - */ -WebSocket.prototype.stream = function stream (options, cb) { - if (typeof options === 'function') { - cb = options; - options = {}; - } - - if (!cb) throw new Error('callback must be provided'); - - if (this.readyState !== WebSocket.OPEN) { - if (cb) cb(new Error('not opened')); - else throw new Error('not opened'); - return; - } - - if (this._queue) { - this._queue.push(() => this.stream(options, cb)); - return; - } - - options = options || {}; - - if (options.mask === undefined) options.mask = !this._isServer; - if (options.compress === undefined) options.compress = true; - if (!this.extensions[PerMessageDeflate.extensionName]) { - options.compress = false; - } - - startQueue(this); - - const send = (data, final) => { - try { - if (this.readyState !== WebSocket.OPEN) throw new Error('not opened'); - options.fin = final === true; - this._sender.send(data, options); - if (!final) process.nextTick(cb, null, send); - else executeQueueSends(this); - } catch (e) { - if (typeof cb === 'function') cb(e); - else { - delete this._queue; - this.emit('error', e); - } - } - }; - - process.nextTick(cb, null, send); + this._sender.send(data, options, cb); }; /** @@ -748,52 +677,6 @@ function establishConnection (socket, upgradeHead) { this.emit('open'); } -function startQueue (instance) { - instance._queue = instance._queue || []; -} - -function executeQueueSends (instance) { - var queue = instance._queue; - if (queue === undefined) return; - - delete instance._queue; - for (var i = 0, l = queue.length; i < l; ++i) { - queue[i](); - } -} - -function sendStream (instance, stream, options, cb) { - stream.on('data', function incoming (data) { - if (instance.readyState !== WebSocket.OPEN) { - if (cb) cb(new Error('not opened')); - else { - delete instance._queue; - instance.emit('error', new Error('not opened')); - } - return; - } - - options.fin = false; - instance._sender.send(data, options); - }); - - stream.on('end', function end () { - if (instance.readyState !== WebSocket.OPEN) { - if (cb) cb(new Error('not opened')); - else { - delete instance._queue; - instance.emit('error', new Error('not opened')); - } - return; - } - - options.fin = true; - instance._sender.send(null, options); - - if (cb) cb(null); - }); -} - function cleanupWebsocketResources (error) { if (this.readyState === WebSocket.CLOSED) return; @@ -844,5 +727,4 @@ function cleanupWebsocketResources (error) { this.removeAllListeners(); this.on('error', function onerror () {}); // catch all errors after this - delete this._queue; } diff --git a/test/WebSocket.test.js b/test/WebSocket.test.js index 3337d4c67..e133ac145 100644 --- a/test/WebSocket.test.js +++ b/test/WebSocket.test.js @@ -792,11 +792,11 @@ describe('WebSocket', function () { }); }); - it('with unencoded message is successfully transmitted to the server', function (done) { + it('with unmasked message is successfully transmitted to the server', function (done) { server.createServer(++port, (srv) => { const ws = new WebSocket(`ws://localhost:${port}`); - ws.on('open', () => ws.send('hi')); + ws.on('open', () => ws.send('hi', { mask: false })); srv.on('message', (message, flags) => { assert.strictEqual(message, 'hi'); @@ -806,7 +806,7 @@ describe('WebSocket', function () { }); }); - it('with encoded message is successfully transmitted to the server', function (done) { + it('with masked message is successfully transmitted to the server', function (done) { server.createServer(++port, (srv) => { const ws = new WebSocket(`ws://localhost:${port}`); @@ -821,7 +821,7 @@ describe('WebSocket', function () { }); }); - it('with unencoded binary message is successfully transmitted to the server', function (done) { + it('with unmasked binary message is successfully transmitted to the server', function (done) { server.createServer(++port, (srv) => { const array = new Float32Array(5); @@ -831,7 +831,7 @@ describe('WebSocket', function () { const ws = new WebSocket(`ws://localhost:${port}`); - ws.on('open', () => ws.send(array, { binary: true })); + ws.on('open', () => ws.send(array, { mask: false, binary: true })); srv.on('message', (message, flags) => { assert.ok(flags.binary); @@ -842,7 +842,7 @@ describe('WebSocket', function () { }); }); - it('with encoded binary message is successfully transmitted to the server', function (done) { + it('with masked binary message is successfully transmitted to the server', function (done) { server.createServer(++port, (srv) => { const array = new Float32Array(5); @@ -863,561 +863,9 @@ describe('WebSocket', function () { }); }); }); - - it('with binary stream will send fragmented data', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - let callbackFired = false; - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100 - }); - - ws.send(fileStream, { binary: true }, (error) => { - assert.ifError(error); - callbackFired = true; - }); - }); - - ws.on('close', () => { - assert.ok(callbackFired); - srv.close(done); - }); - - srv.on('message', (data, flags) => { - assert.ok(flags.binary); - assert.ok(data.equals(fs.readFileSync('test/fixtures/textfile'))); - - ws.close(); - }); - }); - }); - - it('with text stream will send fragmented data', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - let callbackFired = false; - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream, { binary: false }, (error) => { - assert.ifError(error); - callbackFired = true; - }); - }); - - ws.on('close', () => { - assert.ok(callbackFired); - srv.close(done); - }); - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - - ws.close(); - }); - }); - }); - - it('will cause intermittent send to be delayed in order', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream); - ws.send('foobar'); - ws.send('baz'); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - if (++receivedIndex === 1) { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - } else if (receivedIndex === 2) { - assert.ok(!flags.binary); - assert.strictEqual(data, 'foobar'); - } else { - assert.ok(!flags.binary); - assert.strictEqual(data, 'baz'); - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent stream to be delayed in order', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream); - - let i = 0; - ws.stream((error, send) => { - assert.ifError(error); - - if (++i === 1) send('foo'); - else send('bar', true); - }); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - if (++receivedIndex === 1) { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - } else if (receivedIndex === 2) { - assert.ok(!flags.binary); - assert.strictEqual(data, 'foobar'); - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent ping to be delivered', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream); - ws.ping('foobar'); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - - srv.on('ping', (data) => { - assert.strictEqual(data.toString(), 'foobar'); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent pong to be delivered', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream); - ws.pong('foobar'); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - if (++receivedIndex === 2) { - srv.close(done); - ws.close(); - } - }); - - srv.on('pong', (data) => { - assert.strictEqual(data.toString(), 'foobar'); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent close to be delivered', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - ws.send(fileStream); - ws.close(1000, 'foobar'); - }); - - ws.on('close', () => srv.close(done)); - ws.on('error', () => { - // That's quite alright -- a send was attempted after close - }); - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual( - data, - fs.readFileSync('test/fixtures/textfile', { encoding: 'utf8' }) - ); - }); - - srv.on('close', (code, data) => { - assert.strictEqual(code, 1000); - assert.strictEqual(data, 'foobar'); - }); - }); - }); - }); - - describe('#stream', function () { - it('very long binary data can be streamed', function (done) { - server.createServer(++port, (srv) => { - const buffer = new Buffer(10 * 1024); - - for (let i = 0; i < buffer.length; ++i) { - buffer[i] = i % 0xff; - } - - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - const bufLen = buffer.length; - const blockSize = 800; - let i = 0; - - ws.stream({ binary: true }, (error, send) => { - assert.ifError(error); - - const start = i * blockSize; - const toSend = Math.min(blockSize, bufLen - (i * blockSize)); - const end = start + toSend; - const isFinal = toSend < blockSize; - - send(buffer.slice(start, end), isFinal); - i += 1; - }); - }); - - srv.on('message', (data, flags) => { - assert.ok(flags.binary); - assert.ok(data.equals(buffer)); - srv.close(done); - ws.terminate(); - }); - }); - }); - - it('before connect should pass error through callback', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.stream((error) => { - assert.ok(error instanceof Error); - srv.close(done); - ws.terminate(); - }); - }); - }); - - it('without callback should fail', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - try { - ws.stream(); - } catch (e) { - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent send to be delayed in order', function (done) { - server.createServer(++port, (srv) => { - const payload = 'HelloWorld'; - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - let i = 0; - - ws.stream((error, send) => { - assert.ifError(error); - if (++i === 1) { - send(payload.substr(0, 5)); - ws.send('foobar'); - ws.send('baz'); - } else { - send(payload.substr(5, 5), true); - } - }); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - if (++receivedIndex === 1) { - assert.ok(!flags.binary); - assert.strictEqual(data, payload); - } else if (receivedIndex === 2) { - assert.ok(!flags.binary); - assert.strictEqual(data, 'foobar'); - } else { - assert.ok(!flags.binary); - assert.strictEqual(data, 'baz'); - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent stream to be delayed in order', function (done) { - server.createServer(++port, (srv) => { - const payload = 'HelloWorld'; - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - let i = 0; - - ws.stream((error, send) => { - assert.ifError(error); - if (++i === 1) { - send(payload.substr(0, 5)); - - let i2 = 0; - - ws.stream((error, send) => { - assert.ifError(error); - if (++i2 === 1) send('foo'); - else send('bar', true); - }); - - ws.send('baz'); - } else { - send(payload.substr(5, 5), true); - } - }); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - if (++receivedIndex === 1) { - assert.ok(!flags.binary); - assert.strictEqual(data, payload); - } else if (receivedIndex === 2) { - assert.ok(!flags.binary); - assert.strictEqual(data, 'foobar'); - } else if (receivedIndex === 3) { - assert.ok(!flags.binary); - assert.strictEqual(data, 'baz'); - setTimeout(() => { - srv.close(done); - ws.terminate(); - }, 1000); - } else { - throw new Error('more messages than we actually sent just arrived'); - } - }); - }); - }); - - it('will cause intermittent ping to be delivered', function (done) { - server.createServer(++port, (srv) => { - const payload = 'HelloWorld'; - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - let i = 0; - - ws.stream((error, send) => { - assert.ifError(error); - if (++i === 1) { - send(payload.substr(0, 5)); - ws.ping('foobar'); - } else { - send(payload.substr(5, 5), true); - } - }); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual(data, payload); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - - srv.on('ping', (data) => { - assert.strictEqual(data.toString(), 'foobar'); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent pong to be delivered', function (done) { - server.createServer(++port, (srv) => { - const payload = 'HelloWorld'; - const ws = new WebSocket(`ws://localhost:${port}`); - - ws.on('open', () => { - let i = 0; - - ws.stream((error, send) => { - assert.ifError(error); - if (++i === 1) { - send(payload.substr(0, 5)); - ws.pong('foobar'); - } else { - send(payload.substr(5, 5), true); - } - }); - }); - - let receivedIndex = 0; - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual(data, payload); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - - srv.on('pong', (data) => { - assert.strictEqual(data.toString(), 'foobar'); - if (++receivedIndex === 2) { - srv.close(done); - ws.terminate(); - } - }); - }); - }); - - it('will cause intermittent close to be delivered', function (done) { - server.createServer(++port, (srv) => { - const payload = 'HelloWorld'; - const ws = new WebSocket(`ws://localhost:${port}`); - let errorGiven = false; - - ws.on('open', () => { - let i = 0; - - ws.stream((error, send) => { - if (++i === 1) { - send(payload.substr(0, 5)); - ws.close(1000, 'foobar'); - } else if (i === 2) { - send(payload.substr(5, 5), true); - } else if (i === 3) { - assert.ok(error); - errorGiven = true; - } - }); - }); - - ws.on('close', () => { - assert.ok(errorGiven); - srv.close(done); - ws.terminate(); - }); - - srv.on('message', (data, flags) => { - assert.ok(!flags.binary); - assert.strictEqual(data, payload); - }); - - srv.on('close', (code, data) => { - assert.strictEqual(code, 1000); - assert.strictEqual(data.toString(), 'foobar'); - }); - }); - }); }); describe('#close', function () { - it('will raise error callback, if any, if called during send stream', function (done) { - server.createServer(++port, (srv) => { - const ws = new WebSocket(`ws://localhost:${port}`); - let errorGiven = false; - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100, - encoding: 'utf8' - }); - - ws.send(fileStream, (error) => { - errorGiven = !!error; - }); - ws.close(1000, 'foobar'); - }); - - ws.on('close', () => { - setTimeout(() => { - assert.ok(errorGiven); - srv.close(done); - }, 1000); - }); - }); - }); - it('without invalid first argument throws exception', function (done) { server.createServer(++port, (srv) => { const ws = new WebSocket(`ws://localhost:${port}`); @@ -2128,44 +1576,6 @@ describe('WebSocket', function () { }); }); - it('with binary stream will send fragmented data', function (done) { - const wss = new WebSocketServer({ - perMessageDeflate: true, - port: ++port - }, () => { - const ws = new WebSocket(`ws://localhost:${port}`, { - perMessageDeflate: true - }); - - let callbackFired = false; - - ws.on('open', () => { - const fileStream = fs.createReadStream('test/fixtures/textfile', { - highWaterMark: 100 - }); - - ws.send(fileStream, { binary: true, compress: true }, (error) => { - assert.ifError(error); - callbackFired = true; - }); - }); - - ws.on('close', () => { - assert.ok(callbackFired); - wss.close(); - done(); - }); - }); - - wss.on('connection', (ws) => { - ws.on('message', (data, flags) => { - assert.ok(flags.binary); - assert.ok(data.equals(fs.readFileSync('test/fixtures/textfile'))); - ws.close(); - }); - }); - }); - describe('#send', function () { it('can set the compress option true when perMessageDeflate is disabled', function (done) { const wss = new WebSocketServer({ port: ++port }, () => {