-
-
Notifications
You must be signed in to change notification settings - Fork 916
/
as-stream.ts
121 lines (96 loc) 路 2.9 KB
/
as-stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import {PassThrough as PassThroughStream, Duplex as DuplexStream} from 'stream';
import {IncomingMessage} from 'http';
import duplexer3 from 'duplexer3';
import requestAsEventEmitter from './request-as-event-emitter';
import {HTTPError, ReadError} from './errors';
import {MergedOptions, Response} from './utils/types';
export class ProxyStream extends DuplexStream {
isFromCache?: boolean;
}
export default function asStream(options: MergedOptions) {
const input = new PassThroughStream();
const output = new PassThroughStream();
const proxy = duplexer3(input, output) as ProxyStream;
const piped = new Set();
let isFinished = false;
options.retry.retries = () => 0;
if (options.body) {
proxy.write = () => {
proxy.destroy();
throw new Error('Got\'s stream is not writable when the `body` option is used');
};
}
const emitter = requestAsEventEmitter(options, input);
// Cancels the request
proxy._destroy = (error, callback) => {
callback(error);
emitter.abort();
};
emitter.on('response', (response: Response) => {
const {statusCode, isFromCache} = response;
proxy.isFromCache = isFromCache;
response.on('error', error => {
proxy.emit('error', new ReadError(error, options));
});
if (options.throwHttpErrors && statusCode !== 304 && (statusCode < 200 || statusCode > 299)) {
proxy.emit('error', new HTTPError(response, options), null, response);
return;
}
{
const read = proxy._read.bind(proxy);
proxy._read = (...args) => {
isFinished = true;
return read(...args);
};
}
response.pipe(output);
for (const destination of piped) {
if (destination.headersSent) {
continue;
}
for (const [key, value] of Object.entries(response.headers)) {
// Got gives *decompressed* data. Overriding `content-encoding` header would result in an error.
// It's not possible to decompress already decompressed data, is it?
const allowed = options.decompress ? key !== 'content-encoding' : true;
if (allowed) {
destination.setHeader(key, value);
}
}
destination.statusCode = response.statusCode;
}
proxy.emit('response', response);
});
[
'error',
'request',
'redirect',
'uploadProgress',
'downloadProgress'
].forEach(event => emitter.on(event, (...args) => proxy.emit(event, ...args)));
const pipe = proxy.pipe.bind(proxy);
const unpipe = proxy.unpipe.bind(proxy);
proxy.pipe = (destination, options) => {
if (isFinished) {
throw new Error('Failed to pipe. The response has been emitted already.');
}
pipe(destination, options);
if (Reflect.has(destination, 'setHeader')) {
piped.add(destination);
}
return destination;
};
proxy.unpipe = stream => {
piped.delete(stream);
return unpipe(stream);
};
proxy.on('pipe', source => {
if (source instanceof IncomingMessage) {
options.headers = {
...source.headers,
...options.headers
};
}
});
proxy.isFromCache = undefined;
return proxy;
}