Skip to content

Commit

Permalink
Support Streams/Async Iterable in Flight Reply
Browse files Browse the repository at this point in the history
  • Loading branch information
sebmarkbage committed May 3, 2024
1 parent 5fcfd71 commit 2f246f4
Show file tree
Hide file tree
Showing 4 changed files with 722 additions and 14 deletions.
146 changes: 141 additions & 5 deletions packages/react-client/src/ReactFlightReplyClient.js
Expand Up @@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
import {
enableRenderableContext,
enableBinaryFlight,
enableFlightReadableStream,
} from 'shared/ReactFeatureFlags';

import {
Expand All @@ -28,6 +29,7 @@ import {
REACT_CONTEXT_TYPE,
REACT_PROVIDER_TYPE,
getIteratorFn,
ASYNC_ITERATOR,
} from 'shared/ReactSymbols';

import {
Expand Down Expand Up @@ -206,6 +208,123 @@ export function processReply(
return '$' + tag + blobId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. It also implies that different chunks can be split up or merged as opposed
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
// received in the same slices.
// $FlowFixMe: This is a Node.js extension.
let supportsBYOB: void | boolean = stream.supportsBYOB;
if (supportsBYOB === undefined) {
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
stream.getReader({mode: 'byob'}).releaseLock();
supportsBYOB = true;
} catch (x) {
supportsBYOB = false;
}
}

const reader = stream.getReader();

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
reader.read().then(progress, reject);
} catch (x) {
reject(x);
}
}
}
reader.read().then(progress, reject);

return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
}

function serializeAsyncIterable(
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

// Generators/Iterators are Iterables but they're also their own iterator
// functions. If that's the case, we treat them as single-shot. Otherwise,
// we assume that this iterable might be a multi-shot and allow it to be
// iterated more than once on the client.
const isIterator = iterable === iterator;

// There's a race condition between when the stream is aborted and when the promise
// resolves so we track whether we already aborted it to avoid writing twice.
function progress(
entry:
| {done: false, +value: ReactServerValue, ...}
| {done: true, +value: ReactServerValue, ...},
) {
if (entry.done) {
if (entry.value === undefined) {
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
} else {
// Unlike streams, the last value may not be undefined. If it's not
// we outline it and encode a reference to it in the closing instruction.
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
} catch (x) {
reject(x);
return;
}
}
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
try {
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, partJSON);
iterator.next().then(progress, reject);
} catch (x) {
reject(x);
return;
}
}
}

iterator.next().then(progress, reject);
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
}

function resolveToJSON(
this:
| {+[key: string | number]: ReactServerValue}
Expand Down Expand Up @@ -349,11 +468,9 @@ export function processReply(
reject(reason);
}
},
reason => {
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject(reason);
},
// In the future we could consider serializing this as an error
// that throws on the server instead.
reject,
);
return serializePromiseID(promiseId);
}
Expand Down Expand Up @@ -486,6 +603,25 @@ export function processReply(
return Array.from((iterator: any));
}

if (enableFlightReadableStream) {
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
if (
typeof ReadableStream === 'function' &&
value instanceof ReadableStream
) {
return serializeReadableStream(value);
}
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
(value: any)[ASYNC_ITERATOR];
if (typeof getAsyncIterator === 'function') {
// We treat AsyncIterables as a Fragment and as such we might need to key them.
return serializeAsyncIterable(
(value: any),
getAsyncIterator.call((value: any)),
);
}
}

// Verify that this is a simple plain object.
const proto = getPrototypeOf(value);
if (
Expand Down
Expand Up @@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
// This should've been the same reference that we already saw.
expect(response.children).toBe(children);
});

// @gate enableFlightReadableStream
it('should supports streaming ReadableStream with objects', async () => {
let controller1;
let controller2;
const s1 = new ReadableStream({
start(c) {
controller1 = c;
},
});
const s2 = new ReadableStream({
start(c) {
controller2 = c;
},
});

const promise = ReactServerDOMClient.encodeReply({s1, s2});

controller1.enqueue({hello: 'world'});
controller2.enqueue({hi: 'there'});

controller1.enqueue('text1');
controller2.enqueue('text2');

controller1.close();
controller2.close();

const body = await promise;

const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);
const reader1 = result.s1.getReader();
const reader2 = result.s2.getReader();

expect(await reader1.read()).toEqual({
value: {hello: 'world'},
done: false,
});
expect(await reader2.read()).toEqual({
value: {hi: 'there'},
done: false,
});

expect(await reader1.read()).toEqual({
value: 'text1',
done: false,
});
expect(await reader1.read()).toEqual({
value: undefined,
done: true,
});
expect(await reader2.read()).toEqual({
value: 'text2',
done: false,
});
expect(await reader2.read()).toEqual({
value: undefined,
done: true,
});
});

// @gate enableFlightReadableStream
it('should supports streaming AsyncIterables with objects', async () => {
let resolve;
const wait = new Promise(r => (resolve = r));
const multiShotIterable = {
async *[Symbol.asyncIterator]() {
const next = yield {hello: 'A'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'B'};
return 'C';
},
};
const singleShotIterator = (async function* () {
const next = yield {hello: 'D'};
expect(next).toBe(undefined);
await wait;
yield {hi: 'E'};
return 'F';
})();

await resolve();

const body = await ReactServerDOMClient.encodeReply({
multiShotIterable,
singleShotIterator,
});
const result = await ReactServerDOMServer.decodeReply(
body,
webpackServerMap,
);

const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();

expect(iterator1).not.toBe(result.multiShotIterable);
expect(iterator2).toBe(result.singleShotIterator);

expect(await iterator1.next()).toEqual({
value: {hello: 'A'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hello: 'D'},
done: false,
});

expect(await iterator1.next()).toEqual({
value: {hi: 'B'},
done: false,
});
expect(await iterator2.next()).toEqual({
value: {hi: 'E'},
done: false,
});
expect(await iterator1.next()).toEqual({
value: 'C', // Return value
done: true,
});
expect(await iterator1.next()).toEqual({
value: undefined,
done: true,
});

expect(await iterator2.next()).toEqual({
value: 'F', // Return value
done: true,
});

// Multi-shot iterables should be able to do the same thing again
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();

expect(iterator3).not.toBe(iterator1);

// We should be able to iterate over the iterable again and it should be
// synchronously available using instrumented promises so that React can
// rerender it synchronously.
expect(iterator3.next().value).toEqual({
value: {hello: 'A'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: {hi: 'B'},
done: false,
});
expect(iterator3.next().value).toEqual({
value: 'C', // Return value
done: true,
});
expect(iterator3.next().value).toEqual({
value: undefined,
done: true,
});

expect(() => iterator3.next('this is not allowed')).toThrow(
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
);
});
});

0 comments on commit 2f246f4

Please sign in to comment.