-
-
Notifications
You must be signed in to change notification settings - Fork 23
/
index.js
84 lines (70 loc) · 1.63 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* Module dependencies.
*/
const duplexer = require('duplexer2')
const { PassThrough, Readable } = require('stream')
/**
* Duplexer options.
*/
const defaultOpts = {
bubbleErrors: true,
objectMode: true
}
/**
* Pipe.
*
* @param streams Array[Stream,...]
* @param opts [Object]
* @param cb [Function]
* @return {Stream}
* @api public
*/
const pipe = (...streams) => {
let opts, cb
if (Array.isArray(streams[0])) {
streams = streams[0]
}
if (typeof streams[streams.length - 1] === 'function') {
cb = streams.pop()
}
if (
typeof streams[streams.length - 1] === 'object' &&
typeof streams[streams.length - 1].pipe !== 'function'
) {
opts = streams.pop()
}
const first = streams[0]
const last = streams[streams.length - 1]
let ret
opts = Object.assign({}, defaultOpts, opts)
if (!first) {
if (cb) process.nextTick(cb)
return new PassThrough(opts)
}
if (first.writable && last.readable) ret = duplexer(opts, first, last)
else if (streams.length === 1) ret = new Readable(opts).wrap(streams[0])
else if (first.writable) ret = first
else if (last.readable) ret = last
else ret = new PassThrough(opts)
for (const [i, stream] of streams.entries()) {
const next = streams[i + 1]
if (next) stream.pipe(next)
if (stream !== ret) stream.on('error', err => ret.emit('error', err))
}
if (cb) {
let ended = false
const end = err => {
if (ended) return
ended = true
cb(err)
}
ret.on('error', end)
last.on('finish', () => end())
last.on('close', () => end())
}
return ret
}
/**
* Expose `pipe`.
*/
module.exports = pipe