Skip to content

Commit

Permalink
Do not destroy completed requests (#1248)
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak committed May 12, 2020
1 parent 48bbb36 commit b9b6b1e
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 6 deletions.
1 change: 0 additions & 1 deletion package.json
Expand Up @@ -49,7 +49,6 @@
"@types/responselike": "^1.0.0",
"cacheable-lookup": "^5.0.3",
"cacheable-request": "^7.0.1",
"decompress-response": "^5.0.0",
"get-stream": "^5.1.0",
"http2-wrapper": "^1.0.0-beta.4.5",
"lowercase-keys": "^2.0.0",
Expand Down
2 changes: 1 addition & 1 deletion source/core/index.ts
Expand Up @@ -8,7 +8,6 @@ import http = require('http');
import {ClientRequest, RequestOptions, IncomingMessage, ServerResponse, request as httpRequest} from 'http';
import https = require('https');
import timer, {ClientRequestWithTimings, Timings, IncomingMessageWithTimings} from '@szmarczak/http-timer';
import decompressResponse = require('decompress-response');
import CacheableLookup from 'cacheable-lookup';
import CacheableRequest = require('cacheable-request');
// @ts-ignore Missing types
Expand All @@ -24,6 +23,7 @@ import timedOut, {Delays, TimeoutError as TimedOutTimeoutError} from './utils/ti
import urlToOptions from './utils/url-to-options';
import optionsToUrl, {URLOptions} from './utils/options-to-url';
import WeakableMap from './utils/weakable-map';
import decompressResponse from './utils/decompress-response';

type HttpRequestFunction = typeof httpRequest;
type Error = NodeJS.ErrnoException;
Expand Down
99 changes: 99 additions & 0 deletions source/core/utils/decompress-response.ts
@@ -0,0 +1,99 @@
'use strict';
import {IncomingMessage} from 'http';
import {Transform, PassThrough} from 'stream';
import zlib = require('zlib');

const knownProperties = [
'aborted',
'complete',
'headers',
'httpVersion',
'httpVersionMinor',
'httpVersionMajor',
'method',
'rawHeaders',
'rawTrailers',
'setTimeout',
'socket',
'statusCode',
'statusMessage',
'trailers',
'url'
];

// TODO: Switch back to the `decompress-response` package when it's fixed
const decompressResponse = (response: IncomingMessage): IncomingMessage => {
const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase();

if (!['gzip', 'deflate', 'br'].includes(contentEncoding)) {
return response;
}

// TODO: Remove this when targeting Node.js 12.
const isBrotli = contentEncoding === 'br';
if (isBrotli && typeof zlib.createBrotliDecompress !== 'function') {
response.destroy(new Error('Brotli is not supported on Node.js < 12'));
return response;
}

let isEmpty = true;

const checker = new Transform({
transform(data, _encoding, callback) {
isEmpty = false;

callback(null, data);
},

flush(callback) {
callback();
}
});

const stream = new PassThrough({
autoDestroy: false,
destroy(error, callback) {
response.destroy();

callback(error);
}
});

const decompressStream = isBrotli ? zlib.createBrotliDecompress() : zlib.createUnzip();

decompressStream.once('error', (error: Error) => {
if (isEmpty) {
stream.end();
return;
}

stream.destroy(error);
});

response.pipe(checker).pipe(decompressStream).pipe(stream);

response.once('error', error => {
stream.destroy(error);
});

const properties: {[key: string]: any} = {};

for (const property of knownProperties) {
properties[property] = {
get() {
return (response as any)[property];
},
set(value: unknown) {
(response as any)[property] = value;
},
enumerable: true,
configurable: false
};
}

Object.defineProperties(stream, properties);

return stream as unknown as IncomingMessage;
};

export default decompressResponse;
9 changes: 6 additions & 3 deletions test/gzip.ts
Expand Up @@ -3,7 +3,7 @@ import zlib = require('zlib');
import test from 'ava';
import getStream = require('get-stream');
import withServer from './helpers/with-server';
import {HTTPError} from '../source';
import {HTTPError, ReadError} from '../source';

const testContent = 'Compressible response content.\n';
const testContentUncompressed = 'Uncompressed response content.\n';
Expand Down Expand Up @@ -115,13 +115,16 @@ test('does not break HEAD responses', withServer, async (t, server, got) => {
t.is((await got.head('')).body, '');
});

test('ignore missing data', withServer, async (t, server, got) => {
test('does not ignore missing data', withServer, async (t, server, got) => {
server.get('/', (_request, response) => {
response.setHeader('Content-Encoding', 'gzip');
response.end(gzipData.slice(0, -1));
});

t.is((await got('')).body, testContent);
await t.throwsAsync(got(''), {
instanceOf: ReadError,
message: 'unexpected end of file'
});
});

test('response has `url` and `requestUrl` properties', withServer, async (t, server, got) => {
Expand Down
40 changes: 39 additions & 1 deletion test/http.ts
@@ -1,7 +1,8 @@
import {STATUS_CODES} from 'http';
import {STATUS_CODES, Agent} from 'http';
import test from 'ava';
import nock = require('nock');
import getStream = require('get-stream');
import pEvent = require('p-event');
import got, {HTTPError, UnsupportedProtocolError} from '../source';
import withServer from './helpers/with-server';

Expand Down Expand Up @@ -217,3 +218,40 @@ test('statusMessage fallback', async t => {

t.is(statusMessage, STATUS_CODES[503]);
});

test('does not destroy completed requests', withServer, async (t, server, got) => {
server.get('/', (_request, response) => {
response.setHeader('content-encoding', 'gzip');
response.end('');
});

const options = {
agent: {
http: new Agent({keepAlive: true})
},
retry: 0
};

const stream = got.stream(options);
stream.resume();

const endPromise = pEvent(stream, 'end');

const socket = await pEvent(stream, 'socket');

const closeListener = () => {
t.fail('Socket has been destroyed');
};

socket.once('close', closeListener);

await new Promise(resolve => setTimeout(resolve, 10));

socket.off('close', closeListener);

await endPromise;

options.agent.http.destroy();

t.pass();
});

0 comments on commit b9b6b1e

Please sign in to comment.