From 1cb548469c9018a9d74c32a1e8d8dca3b4158f35 Mon Sep 17 00:00:00 2001 From: Szymon Marczak Date: Tue, 2 Apr 2019 20:21:14 +0200 Subject: [PATCH 1/5] Add Stream property --- source/as-stream.ts | 22 ++++++++++++++++++---- source/utils/types.ts | 1 + test/cache.ts | 38 ++++++++++++++++++++++++++++++++++++++ test/stream.ts | 15 ++++++++------- 4 files changed, 65 insertions(+), 11 deletions(-) diff --git a/source/as-stream.ts b/source/as-stream.ts index e6daf1f31..4e88d67b7 100644 --- a/source/as-stream.ts +++ b/source/as-stream.ts @@ -1,13 +1,17 @@ -import {PassThrough as PassThroughStream} from 'stream'; +import {PassThrough as PassThroughStream, Duplex as DuplexStream} from 'stream'; import duplexer3 from 'duplexer3'; import requestAsEventEmitter from './request-as-event-emitter'; import {HTTPError, ReadError} from './errors'; import {MergedOptions, Response} from './utils/types'; +export class ProxyStream extends DuplexStream { + fromCache?: boolean; +} + export default function asStream(options: MergedOptions) { const input = new PassThroughStream(); const output = new PassThroughStream(); - const proxy = duplexer3(input, output); + const proxy = duplexer3(input, output) as ProxyStream; const piped = new Set(); let isFinished = false; @@ -26,7 +30,8 @@ export default function asStream(options: MergedOptions) { proxy._destroy = emitter.abort; emitter.on('response', (response: Response) => { - const {statusCode} = response; + const {statusCode, fromCache} = response; + proxy.fromCache = fromCache; response.on('error', error => { proxy.emit('error', new ReadError(error, options)); @@ -37,7 +42,14 @@ export default function asStream(options: MergedOptions) { return; } - isFinished = true; + { + const read = proxy._read.bind(proxy); + proxy._read = (...args) => { + isFinished = true; + + return read(...args); + }; + } response.pipe(output); @@ -90,5 +102,7 @@ export default function asStream(options: MergedOptions) { return unpipe(stream); }; + proxy.fromCache = undefined; + return proxy; } diff --git a/source/utils/types.ts b/source/utils/types.ts index abf593542..108c14695 100644 --- a/source/utils/types.ts +++ b/source/utils/types.ts @@ -51,6 +51,7 @@ export type IterateFunction = (options: Options) => void; export interface Response extends IncomingMessage { body: string | Buffer; statusCode: number; + fromCache?: boolean; } export interface Timings { diff --git a/test/cache.ts b/test/cache.ts index 6b6b9ce53..66944a251 100644 --- a/test/cache.ts +++ b/test/cache.ts @@ -1,4 +1,6 @@ import test from 'ava'; +import pEvent from 'p-event'; +import getStream from 'get-stream'; import withServer from './helpers/with-server'; const cacheEndpoint = (_request, response) => { @@ -135,3 +137,39 @@ test('DNS cache works', withServer, async (t, _server, got) => { t.is(map.size, 1); }); + +test('`fromCache` stream property is undefined before the `response` event', withServer, async (t, server, got) => { + server.get('/', cacheEndpoint); + + const cache = new Map(); + const stream = got.stream({cache}); + t.is(stream.fromCache, undefined); + + await getStream(stream); +}); + +test('`fromCache` stream property is false after the `response` event', withServer, async (t, server, got) => { + server.get('/', cacheEndpoint); + + const cache = new Map(); + const stream = got.stream({cache}); + + await pEvent(stream, 'response'); + t.is(stream.fromCache, false); + + await getStream(stream); +}); + +test('`fromCache` stream property is true if the response was cached', withServer, async (t, server, got) => { + server.get('/', cacheEndpoint); + + const cache = new Map(); + + await getStream(got.stream({cache})); + const stream = got.stream({cache}); + + await pEvent(stream, 'response'); + t.is(stream.fromCache, true); + + await getStream(stream); +}); diff --git a/test/stream.ts b/test/stream.ts index 26d5c0ae6..01c24438b 100644 --- a/test/stream.ts +++ b/test/stream.ts @@ -1,8 +1,8 @@ +import {PassThrough} from 'stream'; import test from 'ava'; import toReadableStream from 'to-readable-stream'; import getStream from 'get-stream'; import pEvent from 'p-event'; -import delay from 'delay'; import is from '@sindresorhus/is'; import withServer from './helpers/with-server'; @@ -170,14 +170,15 @@ test('skips proxying headers after server has sent them already', withServer, as test('throws when trying to proxy through a closed stream', withServer, async (t, server, got) => { server.get('/', defaultHandler); - server.get('/proxy', async (_request, response) => { - const stream = got.stream(''); - await delay(1000); - t.throws(() => stream.pipe(response), 'Failed to pipe. The response has been emitted already.'); - response.end(); + + const stream = got.stream(''); + const promise = getStream(stream); + + stream.once('data', () => { + t.throws(() => stream.pipe(new PassThrough()), 'Failed to pipe. The response has been emitted already.'); }); - await got('proxy'); + await promise; }); test('proxies `content-encoding` header when `options.decompress` is false', withServer, async (t, server, got) => { From f1476abea914e287640f6e06caad6befcc125878 Mon Sep 17 00:00:00 2001 From: Szymon Marczak <36894700+szmarczak@users.noreply.github.com> Date: Fri, 5 Apr 2019 16:03:03 +0200 Subject: [PATCH 2/5] Throw on canceled request with incomplete response (#767) --- source/as-promise.ts | 5 +++ source/as-stream.ts | 5 ++- test/cancel.ts | 57 ++++++++++++++++++++++++++++++++ test/helpers/slow-data-stream.ts | 18 ++++++++++ test/timeout.ts | 19 +---------- 5 files changed, 85 insertions(+), 19 deletions(-) create mode 100644 test/helpers/slow-data-stream.ts diff --git a/source/as-promise.ts b/source/as-promise.ts index a619a0734..305aa6d4f 100644 --- a/source/as-promise.ts +++ b/source/as-promise.ts @@ -40,6 +40,11 @@ export default function asPromise(options: Options) { return; } + if (response.req && response.req.aborted) { + // Canceled while downloading - will throw a CancelError or TimeoutError + return; + } + const limitStatusCode = options.followRedirect ? 299 : 399; response.body = data; diff --git a/source/as-stream.ts b/source/as-stream.ts index 4e88d67b7..a428b37be 100644 --- a/source/as-stream.ts +++ b/source/as-stream.ts @@ -27,7 +27,10 @@ export default function asStream(options: MergedOptions) { const emitter = requestAsEventEmitter(options, input); // Cancels the request - proxy._destroy = emitter.abort; + proxy._destroy = (error, callback) => { + callback(error); + emitter.abort(); + }; emitter.on('response', (response: Response) => { const {statusCode, fromCache} = response; diff --git a/test/cancel.ts b/test/cancel.ts index 2e9b1e329..3e0f923b3 100644 --- a/test/cancel.ts +++ b/test/cancel.ts @@ -2,9 +2,11 @@ import {EventEmitter} from 'events'; import {Readable as ReadableStream} from 'stream'; import test from 'ava'; import pEvent from 'p-event'; +import getStream from 'get-stream'; // @ts-ignore import got, {CancelError} from '../source'; import withServer from './helpers/with-server'; +import slowDataStream from './helpers/slow-data-stream'; const prepareServer = server => { const emitter = new EventEmitter(); @@ -34,6 +36,14 @@ const prepareServer = server => { return {emitter, promise}; }; +const downloadHandler = (_request, response) => { + response.writeHead(200, { + 'transfer-encoding': 'chunked' + }); + response.flushHeaders(); + slowDataStream().pipe(response); +}; + test('does not retry after cancelation', withServer, async (t, server, got) => { const {emitter, promise} = prepareServer(server); @@ -145,3 +155,50 @@ test('recover from cancellation using error instance', async t => { await t.notThrowsAsync(recover); }); + +test('throws on incomplete (canceled) response - promise', withServer, async (t, server, got) => { + server.get('/', downloadHandler); + + await t.throwsAsync(got({ + timeout: {request: 500} + }), got.TimeoutError); +}); + +test('throws on incomplete (canceled) response - promise #2', withServer, async (t, server, got) => { + server.get('/', downloadHandler); + + const promise = got('').on('response', () => { + setTimeout(() => promise.cancel(), 500); + }); + + await t.throwsAsync(promise, got.CancelError); +}); + +test('throws on incomplete (canceled) response - stream', withServer, async (t, server, got) => { + server.get('/', downloadHandler); + + const errorString = 'Foobar'; + + const stream = got.stream('').on('response', () => { + setTimeout(() => stream.destroy(new Error(errorString)), 500); + }); + + await t.throwsAsync(getStream(stream), errorString); +}); + +// Note: it will throw, but the response is loaded already. +test('throws when canceling cached request', withServer, async (t, server, got) => { + server.get('/', (_request, response) => { + response.setHeader('Cache-Control', 'public, max-age=60'); + response.end(Date.now().toString()); + }); + + const cache = new Map(); + await got({cache}); + + const promise = got({cache}).on('response', () => { + promise.cancel(); + }); + + await t.throwsAsync(promise, got.CancelError); +}); diff --git a/test/helpers/slow-data-stream.ts b/test/helpers/slow-data-stream.ts new file mode 100644 index 000000000..eb39235f3 --- /dev/null +++ b/test/helpers/slow-data-stream.ts @@ -0,0 +1,18 @@ +import {PassThrough} from 'stream'; + +export default (): PassThrough => { + const slowStream = new PassThrough(); + let count = 0; + + const interval = setInterval(() => { + if (count++ < 10) { + slowStream.push('data\n'.repeat(100)); + return; + } + + clearInterval(interval); + slowStream.push(null); + }, 100); + + return slowStream; +}; diff --git a/test/timeout.ts b/test/timeout.ts index 97feb5ce9..7c8560f79 100644 --- a/test/timeout.ts +++ b/test/timeout.ts @@ -1,29 +1,12 @@ import http from 'http'; import net from 'net'; -import stream from 'stream'; import getStream from 'get-stream'; import test from 'ava'; import pEvent from 'p-event'; import delay from 'delay'; import got from '../source'; import withServer from './helpers/with-server'; - -const slowDataStream = () => { - const slowStream = new stream.PassThrough(); - let count = 0; - - const interval = setInterval(() => { - if (count++ < 10) { - slowStream.push('data\n'.repeat(100)); - return; - } - - clearInterval(interval); - slowStream.push(null); - }, 100); - - return slowStream; -}; +import slowDataStream from './helpers/slow-data-stream'; const requestDelay = 800; From a6813b82541bd638bb0e08a4f90efde6e4051c4b Mon Sep 17 00:00:00 2001 From: Szymon Marczak Date: Tue, 9 Apr 2019 21:28:34 +0200 Subject: [PATCH 3/5] fixes --- readme.md | 12 +++++++----- source/as-stream.ts | 4 ++-- source/request-as-event-emitter.ts | 2 ++ test/cache.ts | 15 +++++++++------ 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/readme.md b/readme.md index 5d267b9d7..d01932002 100644 --- a/readme.md +++ b/readme.md @@ -110,7 +110,7 @@ Properties from `options` will override properties in the parsed `url`. If no protocol is specified, it will throw a `TypeError`. -**Note**: this can also be an option. +**Note:** this can also be an option. ##### options @@ -229,7 +229,7 @@ If set to `true` and `Content-Type` header is not set, it will be set to `applic Type: `string` `Object` [`URLSearchParams`](https://developer.mozilla.org/en-US/docs/Web/API/URLSearchParams) -**Note**: The `query` option was renamed to `searchParams` in Got v10. The `query` option name is still functional, but is being deprecated and will be completely removed in Got v11. +**Note:** The `query` option was renamed to `searchParams` in Got v10. The `query` option name is still functional, but is being deprecated and will be completely removed in Got v11. Query string that will be added to the request URL. This will override the query string in `url`. @@ -576,7 +576,7 @@ The object contains the following properties: **Note:** The time is a `number` representing the milliseconds elapsed since the UNIX epoch. -##### fromCache +##### isFromCache Type: `boolean` @@ -598,6 +598,8 @@ The number of times the request was retried. **Note:** Progress events, redirect events and request/response events can also be used with promises. +**Note:** To access `response.isFromCache` you need to use `got.stream(url, options).isFromCache`. The value will be undefined until the `response` event. + #### got.stream(url, [options]) Sets `options.stream` to `true`. @@ -827,11 +829,11 @@ const map = new Map(); (async () => { let response = await got('https://sindresorhus.com', {cache: map}); - console.log(response.fromCache); + console.log(response.isFromCache); //=> false response = await got('https://sindresorhus.com', {cache: map}); - console.log(response.fromCache); + console.log(response.isFromCache); //=> true })(); ``` diff --git a/source/as-stream.ts b/source/as-stream.ts index a428b37be..b2d43b268 100644 --- a/source/as-stream.ts +++ b/source/as-stream.ts @@ -5,7 +5,7 @@ import {HTTPError, ReadError} from './errors'; import {MergedOptions, Response} from './utils/types'; export class ProxyStream extends DuplexStream { - fromCache?: boolean; + isFromCache?: boolean; } export default function asStream(options: MergedOptions) { @@ -34,7 +34,7 @@ export default function asStream(options: MergedOptions) { emitter.on('response', (response: Response) => { const {statusCode, fromCache} = response; - proxy.fromCache = fromCache; + proxy.isFromCache = fromCache; response.on('error', error => { proxy.emit('error', new ReadError(error, options)); diff --git a/source/request-as-event-emitter.ts b/source/request-as-event-emitter.ts index dcc88d73e..78871bee8 100644 --- a/source/request-as-event-emitter.ts +++ b/source/request-as-event-emitter.ts @@ -119,6 +119,8 @@ export default (options, input?: TransformStream) => { response.request = { gotOptions: options }; + response.isFromCache = response.fromCache || false; + delete response.fromCache; const rawCookies = response.headers['set-cookie']; if (options.cookieJar && rawCookies) { diff --git a/test/cache.ts b/test/cache.ts index 66944a251..42f7d65be 100644 --- a/test/cache.ts +++ b/test/cache.ts @@ -138,29 +138,31 @@ test('DNS cache works', withServer, async (t, _server, got) => { t.is(map.size, 1); }); -test('`fromCache` stream property is undefined before the `response` event', withServer, async (t, server, got) => { +test('`isFromCache` stream property is undefined before the `response` event', withServer, async (t, server, got) => { server.get('/', cacheEndpoint); const cache = new Map(); const stream = got.stream({cache}); - t.is(stream.fromCache, undefined); + t.is(stream.isFromCache + , undefined); await getStream(stream); }); -test('`fromCache` stream property is false after the `response` event', withServer, async (t, server, got) => { +test('`isFromCache` stream property is false after the `response` event', withServer, async (t, server, got) => { server.get('/', cacheEndpoint); const cache = new Map(); const stream = got.stream({cache}); await pEvent(stream, 'response'); - t.is(stream.fromCache, false); + t.is(stream.isFromCache + , false); await getStream(stream); }); -test('`fromCache` stream property is true if the response was cached', withServer, async (t, server, got) => { +test('`isFromCache` stream property is true if the response was cached', withServer, async (t, server, got) => { server.get('/', cacheEndpoint); const cache = new Map(); @@ -169,7 +171,8 @@ test('`fromCache` stream property is true if the response was cached', withServe const stream = got.stream({cache}); await pEvent(stream, 'response'); - t.is(stream.fromCache, true); + t.is(stream.isFromCache + , true); await getStream(stream); }); From ba7dd89796604056e3e4a6ae03584f277a38462e Mon Sep 17 00:00:00 2001 From: Szymon Marczak Date: Tue, 9 Apr 2019 21:47:58 +0200 Subject: [PATCH 4/5] fixes --- source/as-stream.ts | 6 +++--- source/utils/types.ts | 2 +- test/cache.ts | 16 ++++++++-------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/source/as-stream.ts b/source/as-stream.ts index b2d43b268..647e0e41f 100644 --- a/source/as-stream.ts +++ b/source/as-stream.ts @@ -33,8 +33,8 @@ export default function asStream(options: MergedOptions) { }; emitter.on('response', (response: Response) => { - const {statusCode, fromCache} = response; - proxy.isFromCache = fromCache; + const {statusCode, isFromCache} = response; + proxy.isFromCache = isFromCache; response.on('error', error => { proxy.emit('error', new ReadError(error, options)); @@ -105,7 +105,7 @@ export default function asStream(options: MergedOptions) { return unpipe(stream); }; - proxy.fromCache = undefined; + proxy.isFromCache = undefined; return proxy; } diff --git a/source/utils/types.ts b/source/utils/types.ts index 108c14695..ee188640f 100644 --- a/source/utils/types.ts +++ b/source/utils/types.ts @@ -51,7 +51,7 @@ export type IterateFunction = (options: Options) => void; export interface Response extends IncomingMessage { body: string | Buffer; statusCode: number; - fromCache?: boolean; + isFromCache?: boolean; } export interface Timings { diff --git a/test/cache.ts b/test/cache.ts index 42f7d65be..087dbae4d 100644 --- a/test/cache.ts +++ b/test/cache.ts @@ -2,6 +2,7 @@ import test from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; import withServer from './helpers/with-server'; +import {Response} from '../source/utils/types'; const cacheEndpoint = (_request, response) => { response.setHeader('Cache-Control', 'public, max-age=60'); @@ -143,8 +144,7 @@ test('`isFromCache` stream property is undefined before the `response` event', w const cache = new Map(); const stream = got.stream({cache}); - t.is(stream.isFromCache - , undefined); + t.is(stream.isFromCache, undefined); await getStream(stream); }); @@ -155,9 +155,9 @@ test('`isFromCache` stream property is false after the `response` event', withSe const cache = new Map(); const stream = got.stream({cache}); - await pEvent(stream, 'response'); - t.is(stream.isFromCache - , false); + const response = await pEvent(stream, 'response') as Response; + t.is(response.isFromCache, false); + t.is(stream.isFromCache, false); await getStream(stream); }); @@ -170,9 +170,9 @@ test('`isFromCache` stream property is true if the response was cached', withSer await getStream(got.stream({cache})); const stream = got.stream({cache}); - await pEvent(stream, 'response'); - t.is(stream.isFromCache - , true); + const response = await pEvent(stream, 'response') as Response; + t.is(response.isFromCache, true); + t.is(stream.isFromCache, true); await getStream(stream); }); From 1bbff9e5503b7438988d7a084efbf8fa79a8554f Mon Sep 17 00:00:00 2001 From: Szymon Marczak <36894700+szmarczak@users.noreply.github.com> Date: Tue, 9 Apr 2019 23:12:30 +0200 Subject: [PATCH 5/5] Update cache.ts --- test/cache.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cache.ts b/test/cache.ts index 087dbae4d..87a8f04bf 100644 --- a/test/cache.ts +++ b/test/cache.ts @@ -1,8 +1,8 @@ import test from 'ava'; import pEvent from 'p-event'; import getStream from 'get-stream'; -import withServer from './helpers/with-server'; import {Response} from '../source/utils/types'; +import withServer from './helpers/with-server'; const cacheEndpoint = (_request, response) => { response.setHeader('Cache-Control', 'public, max-age=60');