Skip to content

Commit

Permalink
[Flight Reply] Encode binary streams as a single collapsed Blob (#28986)
Browse files Browse the repository at this point in the history
Based on #28893.

For other streams we encode each chunk as a separate form field which is
a bit bloated. Especially for binary chunks since they also have an
indirection. We need some way to encode the chunks as separate anyway.
This way the streaming using busboy actually allows each chunk to stream
in over the network one at a time.

For binary streams the actual chunking is not important. The chunks can
be split and recombined in whatever size chunk makes sense.

Since we buffer the entire content anyway we can combine the chunks to
be consecutive. This PR does that with binary streams and also combine
them into a single Blob. That way there's no extra overhead when passing
through a binary stream.

Ideally, we'd be able to just use the stream from that one Blob but
Node.js doesn't return byob streams from Blob. Additionally, we don't
actually stream the content of Blobs due to the layering with busboy
atm. We could do that for binary streams in particular by replacing the
File layering with a stream and resolving each chunk as it comes in.
That could be a follow up.

If we stop buffering in the future, this set up still allows us to split
them and send other form fields in between while blocked since the
protocol is still the same.
  • Loading branch information
sebmarkbage committed May 8, 2024
1 parent e7d213d commit 826bf4e
Showing 1 changed file with 49 additions and 16 deletions.
65 changes: 49 additions & 16 deletions packages/react-client/src/ReactFlightReplyClient.js
Expand Up @@ -208,7 +208,7 @@ export function processReply(
return '$' + tag + blobId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
function serializeBinaryReader(reader: any): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
Expand All @@ -218,23 +218,43 @@ export function processReply(
pendingParts++;
const streamId = nextPartId++;

// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. It also implies that different chunks can be split up or merged as opposed
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
// received in the same slices.
// $FlowFixMe: This is a Node.js extension.
let supportsBYOB: void | boolean = stream.supportsBYOB;
if (supportsBYOB === undefined) {
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
stream.getReader({mode: 'byob'}).releaseLock();
supportsBYOB = true;
} catch (x) {
supportsBYOB = false;
const buffer = [];

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
const blobId = nextPartId++;
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + blobId, new Blob(buffer));
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(
formFieldPrefix + streamId,
'"$o' + blobId.toString(16) + '"',
);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
buffer.push(entry.value);
reader.read(new Uint8Array(1024)).then(progress, reject);
}
}
reader.read(new Uint8Array(1024)).then(progress, reject);

const reader = stream.getReader();
return '$r' + streamId.toString(16);
}

function serializeReader(reader: ReadableStreamReader): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
Expand All @@ -258,7 +278,20 @@ export function processReply(
}
reader.read().then(progress, reject);

return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
return '$R' + streamId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. For binary streams, we serialize them as plain Blobs.
let binaryReader;
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
binaryReader = stream.getReader({mode: 'byob'});
} catch (x) {
return serializeReader(stream.getReader());
}
return serializeBinaryReader(binaryReader);
}

function serializeAsyncIterable(
Expand Down

0 comments on commit 826bf4e

Please sign in to comment.