turbopack: Implement streamed middleware (#47264)

Fun! This depends on https://github.com/vercel/turbo/pull/4251 to
implement streamed Node evaluations, giving us the ability to support
streamed middleware responses.

This is just the first step to supporting RSC streaming in Turbopack. I
chose to start with this because it requires all the same base logic,
and I understand the full router->middleware->HTTP server code path, so
it's a lot easier to work on.

Fixes WEB-738
This commit is contained in:
Justin Ridgewell 2023-03-23 00:04:33 -04:00 committed by GitHub
parent 9150620993
commit a5dfe46cca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 801 additions and 584 deletions

File diff suppressed because it is too large Load diff

View file

@ -46,37 +46,38 @@ swc_emotion = { version = "0.29.10" }
testing = { version = "0.31.31" }
# Turbo crates
auto-hash-map = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
node-file-trace = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
swc-ast-explorer = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-malloc = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2", default-features = false }
turbo-tasks = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-build = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-env = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-fetch = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2", default-features = false }
turbo-tasks-fs = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-hash = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-macros = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-macros-shared = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-memory = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-tasks-testing = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbo-updater = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-cli-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-core = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-create-test-app = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-css = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-dev = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-dev-server = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-ecmascript = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-env = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-json = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-mdx = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-node = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-static = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-swc-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-test-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
turbopack-tests = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230322.2" }
auto-hash-map = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
node-file-trace = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
swc-ast-explorer = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-malloc = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1", default-features = false }
turbo-tasks = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-build = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-bytes = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-env = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-fetch = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1", default-features = false }
turbo-tasks-fs = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-hash = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-macros = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-macros-shared = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-memory = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-tasks-testing = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbo-updater = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-cli-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-core = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-create-test-app = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-css = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-dev = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-dev-server = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-ecmascript = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-env = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-json = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-mdx = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-node = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-static = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-swc-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-test-utils = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
turbopack-tests = { git = "https://github.com/vercel/turbo.git", tag = "turbopack-230323.1" }
# General Deps

View file

@ -11,6 +11,7 @@ bench = false
[dependencies]
anyhow = { workspace = true }
auto-hash-map = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
indoc = { workspace = true }
mime = { workspace = true }
@ -20,6 +21,7 @@ regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
turbo-tasks = { workspace = true }
turbo-tasks-bytes = { workspace = true }
turbo-tasks-env = { workspace = true }
turbo-tasks-fetch = { workspace = true }
turbo-tasks-fs = { workspace = true }

View file

@ -0,0 +1,4 @@
{
"singleQuote": false,
"semi": true
}

View file

@ -9,7 +9,10 @@ import { PHASE_DEVELOPMENT_SERVER } from "next/dist/shared/lib/constants";
import "next/dist/server/node-polyfill-fetch.js";
// @ts-expect-error internal package is injected by Rust
import middlewareChunkGroup from "MIDDLEWARE_CHUNK_GROUP";
// @ts-expect-error internal package is injected by Rust
import middlewareConfig from "MIDDLEWARE_CONFIG";
type RouterRequest = {
@ -30,17 +33,13 @@ type RouteResult =
};
type IpcOutgoingMessage = {
type: "jsonValue";
data: string;
type: "value";
data: string | Buffer;
};
type MessageData =
| { type: "middleware-headers"; data: MiddlewareHeadersResponse }
| { type: "middleware-body"; data: Uint8Array }
| {
type: "full-middleware";
data: { headers: MiddlewareHeadersResponse; body: number[] };
}
| {
type: "rewrite";
data: RewriteResponse;
@ -75,8 +74,10 @@ async function getResolveRoute(
);
return await makeResolver(dir, nextConfig, {
files: middlewareChunkGroup.filter((f) => /\.[mc]?js$/.test(f)),
matcher: middlewareConfig.matcher,
files: (middlewareChunkGroup as string[]).filter((f) =>
/\.[mc]?js$/.test(f)
),
matcher: (middlewareConfig as { matcher: string[] }).matcher,
});
}
@ -136,9 +137,9 @@ export default async function route(
}
async function handleClientResponse(
_ipc: Ipc<RouterRequest, IpcOutgoingMessage>,
ipc: Ipc<RouterRequest, IpcOutgoingMessage>,
clientResponse: IncomingMessage
): Promise<MessageData> {
): Promise<MessageData | void> {
if (clientResponse.headers["x-nextjs-route-result"] === "1") {
clientResponse.setEncoding("utf8");
// We're either a redirect or a rewrite
@ -171,31 +172,21 @@ async function handleClientResponse(
headers: toPairs(clientResponse.rawHeaders),
};
// TODO: support streaming middleware
// ipc.send({
// type: "jsonValue",
// data: JSON.stringify({
// type: "middleware-headers",
// data: responseHeaders,
// }),
// });
// ipc.send({
// type: "jsonValue",
// data: JSON.stringify({
// type: "middleware-body",
// data: chunk as Buffer,
// }),
// });
ipc.send({
type: "value",
data: JSON.stringify({
type: "middleware-headers",
data: responseHeaders,
}),
});
const buffers = [];
for await (const chunk of clientResponse) {
buffers.push(chunk as Buffer);
ipc.send({
type: "value",
data: JSON.stringify({
type: "middleware-body",
data: (chunk as Buffer).toJSON().data,
}),
});
}
return {
type: "full-middleware",
data: {
headers: responseHeaders,
body: Buffer.concat(buffers).toJSON().data,
},
};
}

View file

@ -40,6 +40,7 @@ pub use web_entry_source::create_web_entry_source;
pub fn register() {
turbo_tasks::register();
turbo_tasks_bytes::register();
turbo_tasks_fs::register();
turbo_tasks_fetch::register();
turbopack_dev::register();

View file

@ -7,8 +7,9 @@ use turbo_tasks::{
trace::TraceRawVcs,
CompletionVc, Value,
};
use turbo_tasks_bytes::stream::SingleValue;
use turbo_tasks_env::EnvMapVc;
use turbo_tasks_fs::{json::parse_json_rope_with_source_context, FileSystemPathVc};
use turbo_tasks_fs::{json::parse_json_with_source_context, FileSystemPathVc};
use turbopack::evaluate_context::node_evaluate_asset_context;
use turbopack_core::{
asset::Asset,
@ -29,7 +30,7 @@ use turbopack_ecmascript::{
EcmascriptInputTransformsVc, EcmascriptModuleAssetType, EcmascriptModuleAssetVc,
};
use turbopack_node::{
evaluate::{evaluate, JavaScriptValue},
evaluate::evaluate,
execution_context::{ExecutionContext, ExecutionContextVc},
transforms::webpack::{WebpackLoaderConfigItems, WebpackLoaderConfigItemsVc},
};
@ -602,18 +603,13 @@ pub async fn load_next_config_internal(
/* debug */ false,
)
.await?;
match &*config_value {
JavaScriptValue::Value(val) => {
let next_config: NextConfig = parse_json_rope_with_source_context(val)?;
let next_config = next_config.cell();
Ok(next_config)
}
JavaScriptValue::Error => Ok(NextConfig::default().cell()),
JavaScriptValue::Stream(_) => {
unimplemented!("Stream not supported now");
}
}
let SingleValue::Single(Ok(val)) = config_value.into_single().await else {
return Ok(NextConfig::default().cell());
};
let next_config: NextConfig = parse_json_with_source_context(val.to_str()?)?;
Ok(next_config.cell())
}
#[turbo_tasks::function]

View file

@ -377,7 +377,7 @@ async fn get_mock_stylesheet(
use turbo_tasks::{CompletionVc, Value};
use turbo_tasks_env::{CommandLineProcessEnvVc, ProcessEnv};
use turbo_tasks_fs::{
json::parse_json_rope_with_source_context, DiskFileSystemVc, File, FileSystem,
json::parse_json_with_source_context, DiskFileSystemVc, File, FileSystem,
};
use turbopack::evaluate_context::node_evaluate_asset_context;
use turbopack_core::{context::AssetContext, ident::AssetIdentVc};
@ -452,7 +452,7 @@ async fn get_mock_stylesheet(
match &*val {
JavaScriptValue::Value(val) => {
let mock_map: HashMap<String, Option<String>> =
parse_json_rope_with_source_context(val)?;
parse_json_with_source_context(&val.to_str()?)?;
Ok((mock_map.get(url).context("url not found")?).clone())
}
JavaScriptValue::Error => panic!("Unexpected error evaluating JS"),

View file

@ -1,4 +1,5 @@
use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use futures::StreamExt;
use indexmap::indexmap;
use serde::Deserialize;
use serde_json::json;
@ -6,9 +7,8 @@ use turbo_tasks::{
primitives::{JsonValueVc, StringsVc},
CompletionVc, CompletionsVc, Value,
};
use turbo_tasks_fs::{
json::parse_json_rope_with_source_context, to_sys_path, File, FileSystemPathVc,
};
use turbo_tasks_bytes::{Bytes, Stream};
use turbo_tasks_fs::{json::parse_json_with_source_context, to_sys_path, File, FileSystemPathVc};
use turbopack::{evaluate_context::node_evaluate_asset_context, transition::TransitionsByNameVc};
use turbopack_core::{
asset::AssetVc,
@ -29,7 +29,7 @@ use turbopack_ecmascript::{
EcmascriptModuleAssetVc, InnerAssetsVc, OptionEcmascriptModuleAssetVc,
};
use turbopack_node::{
evaluate::{evaluate, JavaScriptValue},
evaluate::evaluate,
execution_context::{ExecutionContext, ExecutionContextVc},
StructuredError,
};
@ -98,58 +98,36 @@ pub struct MiddlewareHeadersResponse {
#[turbo_tasks::value(shared)]
#[derive(Debug, Clone, Default)]
pub struct MiddlewareBodyResponse(pub Vec<u8>);
pub struct MiddlewareBodyResponse(Bytes);
#[derive(Deserialize, Debug)]
#[serde(tag = "type", rename_all = "kebab-case")]
enum RouterIncomingMessage {
Rewrite { data: RewriteResponse },
MiddlewareHeaders { data: MiddlewareHeadersResponse },
MiddlewareBody { data: Vec<u8> },
None,
Error(StructuredError),
}
#[turbo_tasks::value(shared)]
#[derive(Debug, Clone, Default)]
pub struct FullMiddlewareResponse {
pub headers: MiddlewareHeadersResponse,
pub body: Vec<u8>,
}
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
enum RouterIncomingMessage {
Rewrite {
data: RewriteResponse,
},
// TODO: Implement
#[allow(dead_code)]
MiddlewareHeaders {
data: MiddlewareHeadersResponse,
},
// TODO: Implement
#[allow(dead_code)]
MiddlewareBody {
data: MiddlewareBodyResponse,
},
FullMiddleware {
data: FullMiddlewareResponse,
},
None,
Error(StructuredError),
pub struct MiddlewareResponse {
pub status_code: u16,
pub headers: Vec<(String, String)>,
#[turbo_tasks(trace_ignore)]
pub body: Stream<Result<Bytes, String>>,
}
#[derive(Debug)]
#[turbo_tasks::value]
pub enum RouterResult {
Rewrite(RewriteResponse),
FullMiddleware(FullMiddlewareResponse),
Middleware(MiddlewareResponse),
None,
Error,
}
impl From<RouterIncomingMessage> for RouterResult {
fn from(value: RouterIncomingMessage) -> Self {
match value {
RouterIncomingMessage::Rewrite { data } => Self::Rewrite(data),
RouterIncomingMessage::FullMiddleware { data } => Self::FullMiddleware(data),
RouterIncomingMessage::None => Self::None,
_ => Self::Error,
}
}
}
#[turbo_tasks::function]
async fn get_config(
context: AssetContextVc,
@ -398,14 +376,55 @@ async fn route_internal(
)
.await?;
match &*result {
JavaScriptValue::Value(val) => {
let result: RouterIncomingMessage = parse_json_rope_with_source_context(val)?;
Ok(RouterResult::from(result).cell())
let mut read = result.read();
let Some(Ok(first)) = read.next().await else {
return Ok(RouterResult::Error.cell());
};
let first: RouterIncomingMessage = parse_json_with_source_context(first.to_str()?)?;
let (res, read) = match first {
RouterIncomingMessage::Rewrite { data } => (RouterResult::Rewrite(data), Some(read)),
RouterIncomingMessage::MiddlewareHeaders { data } => {
// The double encoding here is annoying. It'd be a lot nicer if we could embed
// a buffer directly into the IPC message without having to wrap it in an
// object.
let body = read.map(|data| {
let chunk: RouterIncomingMessage = match data?
.to_str()
.context("error decoding string")
.and_then(parse_json_with_source_context)
{
Ok(c) => c,
Err(e) => return Err(e.to_string()),
};
match chunk {
RouterIncomingMessage::MiddlewareBody { data } => Ok(Bytes::from(data)),
m => Err(format!("unexpected message type: {:#?}", m)),
}
});
let middleware = MiddlewareResponse {
status_code: data.status_code,
headers: data.headers,
body: Stream::from(body),
};
(RouterResult::Middleware(middleware), None)
}
JavaScriptValue::Error => Ok(RouterResult::Error.cell()),
JavaScriptValue::Stream(_) => {
unimplemented!("Stream not supported now");
RouterIncomingMessage::None => (RouterResult::None, Some(read)),
_ => (RouterResult::Error, Some(read)),
};
// Middleware will naturally drain the full stream, but the rest only take a
// single item. In order to free the NodeJsOperation, we must pull another
// value out of the stream.
if let Some(mut read) = read {
if let Some(v) = read.next().await {
bail!("unexpected message type: {:#?}", v);
}
}
Ok(res.cell())
}

View file

@ -1,4 +1,5 @@
use anyhow::{anyhow, bail, Context, Result};
use futures::stream::StreamExt;
use indexmap::IndexSet;
use turbo_tasks::{primitives::StringVc, CompletionVc, CompletionsVc, Value};
use turbopack_core::{
@ -6,7 +7,7 @@ use turbopack_core::{
introspect::{Introspectable, IntrospectableChildrenVc, IntrospectableVc},
};
use turbopack_dev_server::source::{
ContentSource, ContentSourceContent, ContentSourceData, ContentSourceDataVary,
Body, BodyError, ContentSource, ContentSourceContent, ContentSourceData, ContentSourceDataVary,
ContentSourceResultVc, ContentSourceVc, HeaderListVc, NeededData, ProxyResult, RewriteBuilder,
};
use turbopack_node::execution_context::ExecutionContextVc;
@ -145,12 +146,19 @@ impl ContentSource for NextRouterContentSource {
ContentSourceContent::Rewrite(rewrite.build()).cell().into(),
)
}
RouterResult::FullMiddleware(data) => ContentSourceResultVc::exact(
RouterResult::Middleware(data) => ContentSourceResultVc::exact(
ContentSourceContent::HttpProxy(
ProxyResult {
status: data.headers.status_code,
headers: data.headers.headers.clone(),
body: data.body.clone().into(),
status: data.status_code,
headers: data.headers.clone(),
body: Body::from_stream(data.body.read().map(|chunk| {
chunk.map_err(|e| {
BodyError::new(format!(
"error streaming proxied contents: {}",
e.as_str()
))
})
})),
}
.cell(),
)

View file

@ -0,0 +1,16 @@
/** @type {import('next').NextConfig} */
module.exports = {
async headers() {
return [
{
source: "/foo",
headers: [
{
key: "x-foo",
value: "bar",
},
],
},
];
},
};

View file

@ -0,0 +1,3 @@
export default function Foo() {
return "check x-foo header";
}

View file

@ -0,0 +1,17 @@
import { useEffect } from "react";
export default function Foo() {
useEffect(() => {
// Only run on client
import("@turbo/pack-test-harness").then(runTests);
});
return "index";
}
function runTests() {
it("should set header onto response", async () => {
const res = await fetch("/foo");
expect(res.headers.get("x-foo")).toBe("bar");
});
}

View file

@ -0,0 +1,35 @@
import { NextResponse } from 'next/server'
import type { NextRequest } from 'next/server'
function iteratorToStream<T>(iterator: AsyncIterator<T>) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
},
})
}
function sleep(ms: number) {
return new Promise((r) => setTimeout(r, ms))
}
async function* count(n: number) {
for (let i = 0; i < n; i++) {
yield String(i)
await sleep(100)
}
}
export function middleware(_request: NextRequest) {
return new NextResponse(iteratorToStream(count(10)))
}
export const config = {
matcher: '/stream',
}

View file

@ -0,0 +1,2 @@
/** @type {import('next').NextConfig} */
module.exports = {}

View file

@ -0,0 +1,46 @@
import { useEffect } from 'react'
export default function Foo() {
useEffect(() => {
// Only run on client
import('@turbo/pack-test-harness').then(runTests)
})
return 'index'
}
function runTests() {
it('should stream middleware response from node', async () => {
let start = Date.now()
const res = await fetch('/stream')
const reader = res.body.getReader()
const decoder = new TextDecoder()
let data = ''
let first = true
while (true) {
// we're only testing the timing
const { value, done } = await reader.read()
console.log({ data })
if (first) {
first = false
// The body still stream for 1 second, we just want the first chunk
// to be delivered within 500ms.
expect(Date.now()).toBeGreaterThan(start + 50)
expect(Date.now()).toBeLessThan(start + 500)
expect(done).toBe(false)
}
if (value) {
data += decoder.decode(value, { stream: !done })
}
if (done) break
}
expect(data).toBe('0123456789')
const second = await fetch('/stream').then((r) => r.text())
expect(data).toBe(second)
})
}