Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .isFromCache property to the stream API and rename the Promise API property from .fromCache to .isFromCache #768

Merged
merged 5 commits into from Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 18 additions & 4 deletions source/as-stream.ts
@@ -1,13 +1,17 @@
import {PassThrough as PassThroughStream} from 'stream';
import {PassThrough as PassThroughStream, Duplex as DuplexStream} from 'stream';
import duplexer3 from 'duplexer3';
import requestAsEventEmitter from './request-as-event-emitter';
import {HTTPError, ReadError} from './errors';
import {MergedOptions, Response} from './utils/types';

export class ProxyStream extends DuplexStream {
fromCache?: boolean;
}

export default function asStream(options: MergedOptions) {
const input = new PassThroughStream();
const output = new PassThroughStream();
const proxy = duplexer3(input, output);
const proxy = duplexer3(input, output) as ProxyStream;
const piped = new Set();
let isFinished = false;

Expand All @@ -26,7 +30,8 @@ export default function asStream(options: MergedOptions) {
proxy._destroy = emitter.abort;

emitter.on('response', (response: Response) => {
const {statusCode} = response;
const {statusCode, fromCache} = response;
proxy.fromCache = fromCache;

response.on('error', error => {
proxy.emit('error', new ReadError(error, options));
Expand All @@ -37,7 +42,14 @@ export default function asStream(options: MergedOptions) {
return;
}

isFinished = true;
{
const read = proxy._read.bind(proxy);
proxy._read = (...args) => {
isFinished = true;

return read(...args);
};
}
sindresorhus marked this conversation as resolved.
Show resolved Hide resolved

response.pipe(output);

Expand Down Expand Up @@ -90,5 +102,7 @@ export default function asStream(options: MergedOptions) {
return unpipe(stream);
};

proxy.fromCache = undefined;

return proxy;
}
1 change: 1 addition & 0 deletions source/utils/types.ts
Expand Up @@ -51,6 +51,7 @@ export type IterateFunction = (options: Options) => void;
export interface Response extends IncomingMessage {
body: string | Buffer;
statusCode: number;
fromCache?: boolean;
}

export interface Timings {
Expand Down
38 changes: 38 additions & 0 deletions test/cache.ts
@@ -1,4 +1,6 @@
import test from 'ava';
import pEvent from 'p-event';
import getStream from 'get-stream';
import withServer from './helpers/with-server';

const cacheEndpoint = (_request, response) => {
Expand Down Expand Up @@ -135,3 +137,39 @@ test('DNS cache works', withServer, async (t, _server, got) => {

t.is(map.size, 1);
});

test('`fromCache` stream property is undefined before the `response` event', withServer, async (t, server, got) => {
server.get('/', cacheEndpoint);

const cache = new Map();
const stream = got.stream({cache});
t.is(stream.fromCache, undefined);

await getStream(stream);
});

test('`fromCache` stream property is false after the `response` event', withServer, async (t, server, got) => {
server.get('/', cacheEndpoint);

const cache = new Map();
const stream = got.stream({cache});

await pEvent(stream, 'response');
t.is(stream.fromCache, false);

await getStream(stream);
});

test('`fromCache` stream property is true if the response was cached', withServer, async (t, server, got) => {
server.get('/', cacheEndpoint);

const cache = new Map();

await getStream(got.stream({cache}));
const stream = got.stream({cache});

await pEvent(stream, 'response');
t.is(stream.fromCache, true);

await getStream(stream);
});
15 changes: 8 additions & 7 deletions test/stream.ts
@@ -1,8 +1,8 @@
import {PassThrough} from 'stream';
import test from 'ava';
import toReadableStream from 'to-readable-stream';
import getStream from 'get-stream';
import pEvent from 'p-event';
import delay from 'delay';
import is from '@sindresorhus/is';
import withServer from './helpers/with-server';

Expand Down Expand Up @@ -170,14 +170,15 @@ test('skips proxying headers after server has sent them already', withServer, as

test('throws when trying to proxy through a closed stream', withServer, async (t, server, got) => {
server.get('/', defaultHandler);
server.get('/proxy', async (_request, response) => {
const stream = got.stream('');
await delay(1000);
t.throws(() => stream.pipe(response), 'Failed to pipe. The response has been emitted already.');
response.end();

const stream = got.stream('');
const promise = getStream(stream);

stream.once('data', () => {
t.throws(() => stream.pipe(new PassThrough()), 'Failed to pipe. The response has been emitted already.');
});

await got('proxy');
await promise;
});

test('proxies `content-encoding` header when `options.decompress` is false', withServer, async (t, server, got) => {
Expand Down