Skip to content

Commit

Permalink
add standard
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangruber committed Nov 20, 2017
1 parent 9ff06ea commit e044eb2
Show file tree
Hide file tree
Showing 4 changed files with 3,365 additions and 235 deletions.
95 changes: 49 additions & 46 deletions index.js
@@ -1,18 +1,17 @@

/**
* Module dependencies.
*/

var duplexer = require('duplexer2');
var PassThrough = require('stream').PassThrough;
var Readable = require('stream').Readable;
var objectAssign = require('object-assign');
var duplexer = require('duplexer2')
var PassThrough = require('stream').PassThrough
var Readable = require('stream').Readable
var objectAssign = require('object-assign')

/**
* Slice reference.
*/

var slice = [].slice;
var slice = [].slice

/**
* Duplexer options.
Expand All @@ -21,13 +20,13 @@ var slice = [].slice;
var defaultOpts = {
bubbleErrors: false,
objectMode: true
};
}

/**
* Expose `pipe`.
*/

module.exports = pipe;
module.exports = pipe

/**
* Pipe.
Expand All @@ -39,55 +38,59 @@ module.exports = pipe;
* @api public
*/

function pipe(streams, opts, cb){
function pipe (streams, opts, cb) {
if (!Array.isArray(streams)) {
streams = slice.call(arguments);
opts = null;
cb = null;
streams = slice.call(arguments)
opts = null
cb = null
}

var lastArg = streams[streams.length - 1];
if ('function' == typeof lastArg) {
cb = streams.splice(-1)[0];
lastArg = streams[streams.length - 1];
var lastArg = streams[streams.length - 1]
if (typeof lastArg === 'function') {
cb = streams.splice(-1)[0]
lastArg = streams[streams.length - 1]
}
if ('object' == typeof lastArg && typeof lastArg.pipe != 'function') {
opts = streams.splice(-1)[0];
if (typeof lastArg === 'object' && typeof lastArg.pipe !== 'function') {
opts = streams.splice(-1)[0]
}
var first = streams[0];
var last = streams[streams.length - 1];
var ret;
var first = streams[0]
var last = streams[streams.length - 1]
var ret
opts = objectAssign({}, defaultOpts, opts)

if (!first) {
if (cb) process.nextTick(cb);
return new PassThrough(opts);
if (cb) process.nextTick(cb)
return new PassThrough(opts)
}

if (first.writable && last.readable) ret = duplexer(opts, first, last)
else if (streams.length === 1) ret = new Readable(opts).wrap(streams[0])
else if (first.writable) ret = first
else if (last.readable) ret = last
else ret = new PassThrough(opts)

streams.forEach(function (stream, i) {
var next = streams[i + 1]
if (next) stream.pipe(next)
if (stream !== ret) stream.on('error', ret.emit.bind(ret, 'error'))
})

function end (err) {
if (ended) return
ended = true
cb(err)
}

if (first.writable && last.readable) ret = duplexer(opts, first, last);
else if (streams.length == 1) ret = new Readable(opts).wrap(streams[0]);
else if (first.writable) ret = first;
else if (last.readable) ret = last;
else ret = new PassThrough(opts);

streams.forEach(function(stream, i){
var next = streams[i+1];
if (next) stream.pipe(next);
if (stream != ret) stream.on('error', ret.emit.bind(ret, 'error'));
});

if (cb) {
var ended = false;
ret.on('error', end);
last.on('finish', function(){ end() });
last.on('close', function(){ end() });
function end(err){
if (ended) return;
ended = true;
cb(err);
}
var ended = false
ret.on('error', end)
last.on('finish', function () {
end()
})
last.on('close', function () {
end()
})
}

return ret;
return ret
}

0 comments on commit e044eb2

Please sign in to comment.