// NOTE(review): this file was restored from a whitespace-collapsed copy in
// which every `<...>` span (generic type arguments and HTML string literals)
// had been stripped. Type arguments and the literals marked "reconstructed"
// below were inferred from the surrounding code; confirm against version
// control before relying on them.
import type { FlightRouterState } from './app-render'
import { nonNullable } from '../lib/non-nullable'

/**
 * A byte stream that may carry an `allReady` promise.
 * `continueFromInitialStream` awaits `allReady` before piping when static
 * HTML is being generated.
 */
export type ReactReadableStream = ReadableStream<Uint8Array> & {
  allReady?: Promise<void> | undefined
}

/** Encodes a string as UTF-8 bytes. */
export function encodeText(input: string) {
  return new TextEncoder().encode(input)
}

/**
 * Decodes bytes to a string.
 *
 * @param input Bytes to decode.
 * @param textDecoder When given, it is used in streaming mode so a multi-byte
 *   sequence split across calls is decoded correctly. Otherwise a fresh,
 *   non-streaming decoder is used.
 */
export function decodeText(input?: Uint8Array, textDecoder?: TextDecoder) {
  return textDecoder
    ? textDecoder.decode(input, { stream: true })
    : new TextDecoder().decode(input)
}

/**
 * Splits one stream into two streams that each receive every chunk.
 *
 * If reading from `readable` fails, both returned streams are errored with
 * the same reason instead of being left open forever.
 */
export function readableStreamTee<T = any>(
  readable: ReadableStream<T>
): [ReadableStream<T>, ReadableStream<T>] {
  const transformStream = new TransformStream()
  const transformStream2 = new TransformStream()
  const writer = transformStream.writable.getWriter()
  const writer2 = transformStream2.writable.getWriter()

  const reader = readable.getReader()
  function read() {
    reader.read().then(
      ({ done, value }) => {
        if (done) {
          writer.close()
          writer2.close()
          return
        }
        writer.write(value)
        writer2.write(value)
        read()
      },
      (err) => {
        // Aborting the writable side of a TransformStream errors its readable
        // side, so both consumers observe the source failure.
        writer.abort(err).catch(() => {})
        writer2.abort(err).catch(() => {})
      }
    )
  }
  read()

  return [transformStream.readable, transformStream2.readable]
}

/**
 * Concatenates streams: the returned stream yields every chunk of the first
 * stream, then the second, and so on, and closes after the last one.
 * An empty list yields an already-finished stream.
 */
export function chainStreams<T>(
  streams: ReadableStream<T>[]
): ReadableStream<T> {
  const { readable, writable } = new TransformStream<T, T>()

  let promise: Promise<void> = Promise.resolve()
  streams.forEach((stream, index) => {
    // Keep the shared writable open until the last stream has been piped.
    const preventClose = index + 1 < streams.length
    promise = promise.then(() => stream.pipeTo(writable, { preventClose }))
  })

  if (streams.length === 0) {
    // Nothing will ever close the writable, so close it here.
    promise = promise.then(() => writable.close())
  }

  // A failing pipeTo already aborts `writable`, which errors `readable` for
  // the consumer. Swallow the rejection here only to avoid an unhandled
  // rejection on this internal chain.
  promise.catch(() => {})

  return readable
}

/** Creates a byte stream that emits each string (UTF-8 encoded) and closes. */
export function streamFromArray(strings: string[]): ReadableStream<Uint8Array> {
  // Note: we use a TransformStream here instead of instantiating a ReadableStream
  // because the built-in ReadableStream polyfill runs strings through TextEncoder.
  const { readable, writable } = new TransformStream()

  const writer = writable.getWriter()
  strings.forEach((str) => writer.write(encodeText(str)))
  writer.close()

  return readable
}

/** Reads a byte stream to completion and returns its contents as a string. */
export async function streamToString(
  stream: ReadableStream<Uint8Array>
): Promise<string> {
  const reader = stream.getReader()
  const textDecoder = new TextDecoder()

  let bufferedString = ''

  while (true) {
    const { done, value } = await reader.read()

    if (done) {
      // Flush the streaming decoder so a trailing incomplete byte sequence is
      // reported (as U+FFFD) rather than silently dropped. For well-formed
      // input this appends an empty string.
      return bufferedString + textDecoder.decode()
    }

    bufferedString += decodeText(value, textDecoder)
  }
}

/**
 * Creates a transform that coalesces all chunks written within one timer
 * tick into a single output chunk.
 *
 * @param transform Optional (possibly async) mapping applied to the buffered
 *   text right before it is emitted. Defaults to the identity function.
 */
export function createBufferedTransformStream(
  transform: (v: string) => string | Promise<string> = (v) => v
): TransformStream<Uint8Array, Uint8Array> {
  let bufferedString = ''
  let pendingFlush: Promise<void> | null = null

  // Schedules at most one flush at a time; calls made while a flush is
  // pending share that flush.
  const flushBuffer = (controller: TransformStreamDefaultController) => {
    if (!pendingFlush) {
      pendingFlush = new Promise((resolve) => {
        setTimeout(async () => {
          const buffered = await transform(bufferedString)
          controller.enqueue(encodeText(buffered))
          bufferedString = ''
          pendingFlush = null
          resolve()
        }, 0)
      })
    }
    return pendingFlush
  }

  const textDecoder = new TextDecoder()

  return new TransformStream({
    transform(chunk, controller) {
      bufferedString += decodeText(chunk, textDecoder)
      flushBuffer(controller)
    },

    flush() {
      // Do not let the stream close before the scheduled flush has emitted.
      if (pendingFlush) {
        return pendingFlush
      }
    },
  })
}

/**
 * Creates a transform that, for every chunk, first emits whatever
 * `getServerInsertedHTML` currently returns and then the chunk itself.
 */
export function createInsertedHTMLStream(
  getServerInsertedHTML: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
  return new TransformStream({
    async transform(chunk, controller) {
      const insertedHTMLChunk = encodeText(await getServerInsertedHTML())

      controller.enqueue(insertedHTMLChunk)
      controller.enqueue(chunk)
    },
  })
}

/**
 * Starts rendering by delegating to
 * `ReactDOMServer.renderToReadableStream(element, streamOptions)`.
 */
export function renderToInitialStream({
  ReactDOMServer,
  element,
  streamOptions,
}: {
  ReactDOMServer: any
  element: React.ReactElement
  streamOptions?: any
}): Promise<ReactReadableStream> {
  return ReactDOMServer.renderToReadableStream(element, streamOptions)
}

/**
 * Creates a transform that splices the result of `inject()` into the first
 * chunk containing the closing head tag, immediately before that tag. All
 * other chunks pass through untouched.
 *
 * NOTE(review): the search literal and everything after it in this function
 * were stripped from the damaged source and are reconstructed; confirm.
 */
export function createHeadInjectionTransformStream(
  inject: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
  let injected = false
  return new TransformStream({
    async transform(chunk, controller) {
      const content = decodeText(chunk)
      let index
      if (!injected && (index = content.indexOf('</head')) !== -1) {
        injected = true
        const injectedContent =
          content.slice(0, index) + (await inject()) + content.slice(index)
        controller.enqueue(encodeText(injectedContent))
      } else {
        controller.enqueue(chunk)
      }
    },
  })
}

// Suffix after main body content - scripts before </body>,
// but wait for the major chunks to be enqueued.
/**
 * Creates a transform that emits `suffix` exactly once: one timer tick after
 * the first chunk passes through, or at flush time if no chunk ever did.
 * An empty `suffix` is never emitted.
 */
export function createDeferredSuffixStream(
  suffix: string
): TransformStream<Uint8Array, Uint8Array> {
  let suffixFlushed = false
  let suffixFlushTask: Promise<void> | null = null

  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk)

      if (!suffixFlushed && suffix) {
        suffixFlushed = true
        suffixFlushTask = new Promise((res) => {
          // NOTE: streaming flush
          // Enqueue suffix part before the major chunks are enqueued so that
          // suffix won't be flushed too early to interrupt the data stream
          setTimeout(() => {
            controller.enqueue(encodeText(suffix))
            res()
          })
        })
      }
    },
    flush(controller) {
      // A scheduled suffix must land before the stream is allowed to close.
      if (suffixFlushTask) return suffixFlushTask
      if (!suffixFlushed && suffix) {
        suffixFlushed = true
        controller.enqueue(encodeText(suffix))
      }
    },
  })
}

/**
 * Creates a transform that passes chunks through and, one timer tick after
 * the first chunk, starts copying every chunk of `dataStream` into the
 * output. The output does not close until `dataStream` has been drained.
 * If no chunk ever passes through, `dataStream` is never read.
 */
export function createInlineDataStream(
  dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
  let dataStreamFinished: Promise<void> | null = null
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk)

      if (!dataStreamFinished) {
        const dataStreamReader = dataStream.getReader()

        // NOTE: streaming flush
        // We are buffering here for the inlined data stream because the
        // "shell" stream might be chunkenized again by the underlying stream
        // implementation, e.g. with a specific high-water mark. To ensure it's
        // the safe timing to pipe the data stream, this extra tick is
        // necessary.
        dataStreamFinished = new Promise((res) =>
          setTimeout(async () => {
            try {
              while (true) {
                const { done, value } = await dataStreamReader.read()
                if (done) {
                  return res()
                }
                controller.enqueue(value)
              }
            } catch (err) {
              controller.error(err)
            }
            res()
          }, 0)
        )
      }
    },
    flush() {
      if (dataStreamFinished) {
        return dataStreamFinished
      }
    },
  })
}

/**
 * Creates a transform that appends `suffix` (when non-empty) after the last
 * chunk, at flush time.
 */
export function createSuffixStream(
  suffix: string
): TransformStream<Uint8Array, Uint8Array> {
  return new TransformStream({
    flush(controller) {
      if (suffix) {
        controller.enqueue(encodeText(suffix))
      }
    },
  })
}

/**
 * Creates a pass-through transform that watches for the opening html, head
 * and body tags. If any were never seen by the time the stream ends, it
 * appends an inline script reporting the missing tags.
 *
 * NOTE(review): the three search literals, the `missingTags` computation and
 * the emitted script (including the global name it assigns) were stripped
 * from the damaged source and are reconstructed; confirm against the client
 * code that reads this value.
 *
 * @param assetPrefix Included in the reported payload.
 * @param getTree Called at flush time; its result is included in the payload.
 */
export function createRootLayoutValidatorStream(
  assetPrefix = '',
  getTree: () => FlightRouterState
): TransformStream<Uint8Array, Uint8Array> {
  let foundHtml = false
  let foundHead = false
  let foundBody = false

  return new TransformStream({
    async transform(chunk, controller) {
      // Stop decoding once all three tags have been seen.
      if (!foundHtml || !foundHead || !foundBody) {
        const content = decodeText(chunk)
        if (!foundHtml && content.includes('<html')) {
          foundHtml = true
        }
        if (!foundHead && content.includes('<head')) {
          foundHead = true
        }
        if (!foundBody && content.includes('<body')) {
          foundBody = true
        }
      }
      controller.enqueue(chunk)
    },
    flush(controller) {
      const missingTags = [
        foundHtml ? null : 'html',
        foundHead ? null : 'head',
        foundBody ? null : 'body',
      ].filter(nonNullable)

      if (missingTags.length > 0) {
        controller.enqueue(
          encodeText(
            `<script>self.__next_root_layout_missing_tags_error=${JSON.stringify(
              { missingTags, assetPrefix: assetPrefix ?? '', tree: getTree() }
            )}</script>`
          )
        )
      }
    },
  })
}

/**
 * Pipes the initial render stream through the full chain of transforms, in
 * this order: buffering, server-inserted HTML (per chunk, unless it goes to
 * the head), the suffix without its closing tags, inlined data, the closing
 * tags, head injection, and optional root layout validation.
 *
 * @param renderStream Stream produced by the initial render.
 * @param options.suffix Markup to append after the main content. Anything
 *   from the closing tags onwards is dropped and re-emitted after the data
 *   stream.
 * @param options.dataStream Extra stream inlined after the first chunk.
 * @param options.generateStaticHTML When true, waits for
 *   `renderStream.allReady` before piping.
 * @param options.getServerInsertedHTML Supplies HTML collected during render.
 * @param options.serverInsertedHTMLToHead When true, that HTML is injected
 *   before the closing head tag instead of before every chunk.
 * @param options.validateRootLayout Enables the root layout validator.
 * @returns The fully transformed stream.
 */
export async function continueFromInitialStream(
  renderStream: ReactReadableStream,
  {
    suffix,
    dataStream,
    generateStaticHTML,
    getServerInsertedHTML,
    serverInsertedHTMLToHead,
    validateRootLayout,
  }: {
    suffix?: string
    dataStream?: ReadableStream<Uint8Array>
    generateStaticHTML: boolean
    getServerInsertedHTML?: () => Promise<string>
    serverInsertedHTMLToHead: boolean
    validateRootLayout?: {
      assetPrefix?: string
      getTree: () => FlightRouterState
    }
  }
): Promise<ReadableStream<Uint8Array>> {
  // NOTE(review): literal reconstructed (it was stripped to '' in the damaged
  // source, which would make `split` return only the first character).
  const closeTag = '</body></html>'
  const suffixUnclosed = suffix ? suffix.split(closeTag)[0] : null

  if (generateStaticHTML) {
    await renderStream.allReady
  }

  const transforms: Array<TransformStream<Uint8Array, Uint8Array>> = [
    createBufferedTransformStream(),
    getServerInsertedHTML && !serverInsertedHTMLToHead
      ? createInsertedHTMLStream(getServerInsertedHTML)
      : null,
    suffixUnclosed != null ? createDeferredSuffixStream(suffixUnclosed) : null,
    dataStream ? createInlineDataStream(dataStream) : null,
    // The closing tags removed from the suffix go last, after the data stream.
    suffixUnclosed != null ? createSuffixStream(closeTag) : null,
    createHeadInjectionTransformStream(async () => {
      // TODO-APP: Insert server side html to end of head in app layout rendering, to avoid
      // hydration errors. Remove this once it's ready to be handled by react itself.
      const serverInsertedHTML =
        getServerInsertedHTML && serverInsertedHTMLToHead
          ? await getServerInsertedHTML()
          : ''
      return serverInsertedHTML
    }),
    validateRootLayout
      ? createRootLayoutValidatorStream(
          validateRootLayout.assetPrefix,
          validateRootLayout.getTree
        )
      : null,
  ].filter(nonNullable)

  return transforms.reduce(
    (readable, transform) => readable.pipeThrough(transform),
    renderStream
  )
}