Add support for API routes (vercel/turbo#163)

This reworks our Node.js rendering logic to allow for:
* passing binary request bodies in and out of Node.js: API routes can
accept request bodies in POST, and can reply with any content type.
* content source results that are recomputed every time: we don't want
API routes to be cached (at least not by turbopack for now).

It also reworks the `END_OF_OPERATION` logic to avoid hard coding a
single end of operation marker (they're now unique per pool), and allow
multiple kinds of operation events (`Step`, `Success`, `Error`).

This is not a particularly good implementation for this. A better
implementation would proxy the request from the client to the Node.js
server in a more direct manner. We also need a way to tell turbo tasks
"don't bother ever caching this", as right now every single API request
and result will still be cached, even if we know they will never be used
again. Finally, we should communicate with Node.js processes via a
better mechanism than stdout. I made some progress on a prototype for
using https://github.com/servo/ipc-channel with NAPI a while back, which
I'd like to pick up some time after conf.

However, it seems to work well enough for now, so yay.

Co-authored-by: Tobias Koppers <tobias.koppers@googlemail.com>
This commit is contained in:
Alex Kirszenberg 2022-10-21 21:03:52 +02:00 committed by GitHub
parent 06cc52ecbc
commit 243bc9fbc0
10 changed files with 897 additions and 120 deletions

View file

@ -13,6 +13,7 @@ anyhow = "1.0.47"
futures = "0.3.21"
indexmap = { workspace = true, features = ["serde"] }
mime = "0.3.16"
rand = "0.8.5"
regex = "1.6.0"
serde = "1.0.136"
serde_json = "1.0.85"
@ -21,6 +22,7 @@ tokio = { version = "1.11.0", features = ["full"] }
turbo-tasks = { path = "../turbo-tasks" }
turbo-tasks-env = { path = "../turbo-tasks-env" }
turbo-tasks-fs = { path = "../turbo-tasks-fs" }
turbo-tasks-hash = { path = "../turbo-tasks-hash" }
turbopack = { path = "../turbopack" }
turbopack-core = { path = "../turbopack-core" }
turbopack-dev-server = { path = "../turbopack-dev-server" }

View file

@ -0,0 +1,337 @@
import type { ClientRequest, IncomingMessage, Server } from "node:http";
import http, { ServerResponse } from "node:http";
import type { AddressInfo, Socket } from "node:net";
import { Buffer } from "node:buffer";
import "next/dist/server/node-polyfill-fetch.js";
import * as allExports from ".";
import { NextParsedUrlQuery } from "next/dist/server/request-meta";
import { apiResolver } from "next/dist/server/api-utils/node";
// The per-pool marker and the three event kind names arrive as the first four
// script arguments.
const [MARKER, OPERATION_STEP, OPERATION_SUCCESS, OPERATION_ERROR] =
  process.argv.slice(2, 6).map((arg) => Buffer.from(arg, "utf8"));

// Byte value of "\n", used to split the stdin stream into lines.
const NEW_LINE = "\n".charCodeAt(0);

/** Builds the `<kind> <marker>` line that terminates an event of `kind`. */
const eventMarkerFor = (kind: Buffer): Buffer =>
  Buffer.concat([kind, Buffer.from(" ", "utf8"), MARKER]);

const OPERATION_STEP_MARKER = eventMarkerFor(OPERATION_STEP);
const OPERATION_SUCCESS_MARKER = eventMarkerFor(OPERATION_SUCCESS);
const OPERATION_ERROR_MARKER = eventMarkerFor(OPERATION_ERROR);

// Announce on stdout that the process has started.
process.stdout.write("READY\n");
/**
 * Tells whether the trailing bytes of `buffer` are exactly `suffix`.
 *
 * @param buffer The buffer whose end is inspected.
 * @param suffix The bytes expected at the end of `buffer`.
 * @returns `true` when `buffer` ends with `suffix`; an empty suffix always matches.
 */
function bufferEndsWith(buffer: Buffer, suffix: Buffer): boolean {
  // A negative offset means the suffix is longer than the buffer itself.
  const offset = buffer.length - suffix.length;
  return offset >= 0 && buffer.subarray(offset).equals(suffix);
}
/**
 * Looks for the first line of `buffer` that is exactly the step marker.
 *
 * @param buffer Bytes accumulated so far.
 * @returns `null` when no newline-terminated marker line is present yet.
 *   Otherwise `data` holds everything before the marker line (minus the
 *   newline that precedes it) and `remaining` holds everything after the
 *   marker line's own newline.
 */
function readStep(buffer: Buffer): { data: Buffer; remaining: Buffer } | null {
  let lineStart = 0;
  for (;;) {
    const lineEnd = buffer.indexOf(NEW_LINE, lineStart);
    if (lineEnd === -1) {
      // No complete line left to inspect.
      return null;
    }
    if (buffer.subarray(lineStart, lineEnd).equals(OPERATION_STEP_MARKER)) {
      // Drop the newline character right before the marker, if there is one.
      const dataEnd = lineStart === 0 ? 0 : lineStart - 1;
      return {
        data: buffer.subarray(0, dataEnd),
        remaining: buffer.subarray(lineEnd + 1),
      };
    }
    // Move on to the next line.
    lineStart = lineEnd + 1;
  }
}
/**
 * Which part of an operation the next step read from stdin carries.
 * Only "headers" and "body" are ever assigned by the handler below.
 */
type State = "headers" | "body" | "done";
// The first step received is treated as the JSON-encoded render data.
let readState: State = "headers";
// Bytes received on stdin that have not been consumed as a complete step yet.
let buffer: Buffer = Buffer.from([]);
// Operation started by the latest "headers" step; completed by the next "body" step.
let operationPromise: Promise<Operation> | null = null;
// Appends each stdin chunk to `buffer` and processes every complete step it
// now contains, alternating between the "headers" and "body" states.
process.stdin.on("data", async (chunk) => {
buffer = Buffer.concat([buffer, chunk]);
let step = readStep(buffer);
while (step != null) {
switch (readState) {
case "headers": {
// The step data is the render data as JSON: start the operation right away.
readState = "body";
const renderData = JSON.parse(step.data.toString("utf-8"));
operationPromise = createOperation(renderData);
break;
}
case "body": {
// The step data is the raw request body: send it and finish the operation.
// The returned promise is intentionally not awaited here.
readState = "headers";
const body = step.data;
endOperation(operationPromise!, body);
break;
}
}
// Keep only the bytes that follow the consumed step and look for another one.
buffer = step.remaining;
step = readStep(step.remaining);
}
});
/**
 * Description of the request to run, parsed from the JSON payload of a
 * "headers" step.
 */
type RenderData = {
// HTTP method used for the request made to the local server.
method: string;
// Merged over `query` before being handed to the API resolver.
params: Record<string, string>;
// Request path used for the request made to the local server.
path: string;
// Parsed query; entries in `params` take precedence on key conflicts.
query: NextParsedUrlQuery;
};
/**
 * Status and headers of the response, written to stdout as JSON in the
 * first step of the reply.
 */
type ResponseHeaders = {
// Taken from the response's `statusCode`.
status: number;
// Taken from the response's `rawHeaders`.
headers: string[];
};
/** Writes `eventMarker` to stdout on a line of its own. */
function writeEventMarker(eventMarker: Buffer) {
  for (const part of ["\n", eventMarker, "\n"]) {
    process.stdout.write(part);
  }
}

/** Writes `payload` to stdout, then terminates it with `eventMarker`. */
function emitEvent(payload: Buffer | string, eventMarker: Buffer) {
  process.stdout.write(payload);
  writeEventMarker(eventMarker);
}

/** Emits `data` as an intermediate step of the current operation. */
function writeStep(data: Buffer | string) {
  emitEvent(data, OPERATION_STEP_MARKER);
}

/** Emits `data` as the successful end of the current operation. */
function writeSuccess(data: Buffer | string) {
  emitEvent(data, OPERATION_SUCCESS_MARKER);
}

/** Emits `error` as the failed end of the current operation. */
function writeError(error: string) {
  emitEvent(error, OPERATION_ERROR_MARKER);
}
/** Handles needed to complete an operation once its request body arrives. */
type Operation = {
// Client side of the request to the local server; the request body is sent through it.
clientRequest: ClientRequest;
// Promise returned by the Next.js API resolver for this request.
apiOperation: Promise<void>;
// The one-off local server created for this operation.
server: Server;
};
/**
 * Starts an operation: creates a one-off local server, opens a request to it
 * and hands the server-side request/response pair to the Next.js API
 * resolver.
 *
 * The client-side response is forwarded to stdout as soon as it arrives. If
 * the client request fails instead, an error event is written and the server
 * is closed, so that the failure is reported rather than surfacing as an
 * unhandled promise rejection.
 *
 * @param renderData Method, path, query and params of the request to run.
 * @returns The handles required by `endOperation` to finish the operation.
 */
async function createOperation(renderData: RenderData): Promise<Operation> {
  const server = await createServer();
  const {
    clientRequest,
    clientResponsePromise,
    serverRequest,
    serverResponse,
  } = await makeRequest(server, renderData.method, renderData.path);

  // Params win over query entries with the same key.
  const query = { ...renderData.query, ...renderData.params };

  // Fire-and-forget: the response is streamed out independently of the API
  // resolver promise returned below.
  void clientResponsePromise
    .then((clientResponse) => handleClientResponse(server, clientResponse))
    .catch((err: unknown) => {
      const stack = err instanceof Error ? err.stack : undefined;
      writeError(stack ?? "an unknown error occurred");
      server.close();
    });

  return {
    clientRequest,
    server,
    apiOperation: apiResolver(
      serverRequest,
      serverResponse,
      query,
      allExports,
      {
        previewModeId: "",
        previewModeEncryptionKey: "",
        previewModeSigningKey: "",
      },
      false,
      true,
      renderData.path
    ),
  };
}
/**
 * Forwards a response received from the local server to stdout: status and
 * headers as a JSON step first, then the whole body as the success event
 * (or the error stack as the error event). The server is closed either way.
 */
function handleClientResponse(server: Server, clientResponse: IncomingMessage) {
  const { statusCode, rawHeaders } = clientResponse;
  const head: ResponseHeaders = { status: statusCode!, headers: rawHeaders };
  writeStep(JSON.stringify(head));

  // Body chunks are buffered and emitted in one piece when the response ends.
  const chunks: Buffer[] = [];
  clientResponse.on("data", (chunk) => {
    chunks.push(chunk);
  });

  clientResponse.once("end", () => {
    const body = Buffer.concat(chunks);
    writeSuccess(body);
    server.close();
  });

  clientResponse.once("error", (err) => {
    // TODO(alexkirsz) We need to ensure that we haven't already written an error in `endOperation`.
    const message = err.stack ?? "an unknown error occurred";
    writeError(message);
    server.close();
  });
}
/**
 * Ends an operation by sending the request body to the local server and
 * waiting for the Next.js API resolver to finish.
 *
 * Any failure (the operation could not be created, or the resolver rejected)
 * is always reported with an error event, so the reader of stdout is never
 * left waiting for an event that will not come.
 *
 * @param operationPromise The operation started for the matching headers step.
 * @param body The raw request body.
 */
async function endOperation(
  operationPromise: Promise<Operation>,
  body: Buffer
) {
  let operation: Operation | null = null;
  try {
    operation = await operationPromise;
    operation.clientRequest.end(body);
    await operation.apiOperation;
  } catch (error: unknown) {
    // Thrown values are not guaranteed to be `Error`s, and even an `Error`
    // may lack a usable stack: fall back to a generic message in that case.
    const stack =
      error != null ? (error as { stack?: unknown }).stack : undefined;
    writeError(typeof stack === "string" ? stack : "an unknown error occurred");
    // The server only exists if the operation itself was created.
    operation?.server.close();
  }
}
/**
 * Creates a server that listens on a random port.
 *
 * @returns A promise resolved with the server once it is listening, or
 *   rejected with the error emitted by the server if it fails to start
 *   listening.
 */
function createServer(): Promise<Server> {
  return new Promise((resolve, reject) => {
    const server = http.createServer();
    // Without this, a listen failure would leave the promise pending forever.
    const onListenError = (err: Error) => {
      reject(err);
    };
    server.once("error", onListenError);
    server.listen(0, () => {
      // Later errors are the concern of whoever uses the server.
      server.removeListener("error", onListenError);
      resolve(server);
    });
  });
}
/**
 * Creates a request to a server, and returns the (req, res) pairs from both
 * the client's and server's perspective.
 *
 * The returned promise resolves once the server has received the request.
 * The client-side response is exposed separately as `clientResponsePromise`,
 * which settles when the client request emits "response" or "error".
 *
 * NOTE(review): the outer promise is only rejected by a server "error"
 * event. If the client request fails before the server sees it, only
 * `clientResponsePromise` is rejected and the outer promise appears to stay
 * pending — confirm whether that can happen in practice.
 */
function makeRequest(
server: Server,
method: string,
path: string
): Promise<{
clientRequest: ClientRequest;
clientResponsePromise: Promise<IncomingMessage>;
serverRequest: IncomingMessage;
serverResponse: ServerResponse<IncomingMessage>;
}> {
return new Promise((resolve, reject) => {
let clientRequest: ClientRequest | null = null;
// Resolver functions of `clientResponsePromise`, captured so the listeners
// defined below can settle it.
let clientResponseResolve: (value: IncomingMessage) => void;
let clientResponseReject: (error: Error) => void;
let clientResponsePromise = new Promise<IncomingMessage>(
(resolve, reject) => {
clientResponseResolve = resolve;
clientResponseReject = reject;
}
);
let serverRequest: IncomingMessage | null = null;
let serverResponse: ServerResponse<IncomingMessage> | null = null;
// Resolves the outer promise once the client request and the server-side
// (req, res) pair are all available.
const maybeResolve = () => {
if (
clientRequest != null &&
serverRequest != null &&
serverResponse != null
) {
cleanup();
resolve({
clientRequest,
clientResponsePromise,
serverRequest,
serverResponse,
});
}
};
// Detaches the listeners this function registered on the server.
const cleanup = () => {
server.removeListener("error", errorListener);
server.removeListener("request", requestListener);
};
const errorListener = (err: Error) => {
cleanup();
reject(err);
};
const requestListener = (
req: IncomingMessage,
res: ServerResponse<IncomingMessage>
) => {
serverRequest = req;
serverResponse = res;
maybeResolve();
};
// Detaches the listeners this function registered on the client request.
const cleanupClientResponse = () => {
if (clientRequest != null) {
clientRequest.removeListener("response", responseListener);
clientRequest.removeListener("error", clientResponseErrorListener);
}
};
const clientResponseErrorListener = (err: Error) => {
cleanupClientResponse();
clientResponseReject(err);
};
const responseListener = (res: IncomingMessage) => {
cleanupClientResponse();
clientResponseResolve(res);
};
// Register the server listeners before the request is sent.
server.once("request", requestListener);
server.once("error", errorListener);
// The server is expected to be listening already, so `address()` yields its port.
const address = server.address() as AddressInfo;
clientRequest = http.request({
method,
path,
host: "localhost",
port: address.port,
});
// Otherwise Node.js waits for the first chunk of data to be written before sending the request.
clientRequest.flushHeaders();
clientRequest.once("response", responseListener);
clientRequest.once("error", clientResponseErrorListener);
});
}

View file

@ -16,8 +16,15 @@ import chunkGroup from ".";
import type { BuildManifest } from "next/dist/server/get-page-files";
import type { ChunkGroup } from "types/next";
const END_OF_OPERATION = process.argv[2];
const [MARKER, _OPERATION_STEP, OPERATION_SUCCESS, _OPERATION_ERROR] =
process.argv.slice(2, 6).map((arg) => Buffer.from(arg, "utf8"));
const NEW_LINE = "\n".charCodeAt(0);
const OPERATION_SUCCESS_MARKER = Buffer.concat([
OPERATION_SUCCESS,
Buffer.from(" ", "utf8"),
MARKER,
]);
process.stdout.write("READY\n");
@ -34,7 +41,7 @@ process.stdin.on("data", async (data) => {
} catch (e: any) {
console.log(`ERROR=${JSON.stringify(e.stack)}`);
}
console.log(END_OF_OPERATION);
console.log(OPERATION_SUCCESS_MARKER.toString("utf8"));
data = data.slice(idx + 1);
idx = data.indexOf(NEW_LINE);
}

View file

@ -49,7 +49,7 @@ use crate::{
get_server_environment, get_server_module_options_context,
get_server_resolve_options_context, ServerContextType,
},
nodejs::{create_node_rendered_source, NodeRenderer, NodeRendererVc},
nodejs::{create_node_rendered_source, NodeEntry, NodeEntryVc},
util::regular_expression_for_path,
};
@ -328,9 +328,9 @@ struct AppRenderer {
}
#[turbo_tasks::value_impl]
impl NodeRenderer for AppRenderer {
impl NodeEntry for AppRenderer {
#[turbo_tasks::function]
async fn module(&self) -> Result<EcmascriptModuleAssetVc> {
async fn entry(&self) -> Result<EcmascriptModuleAssetVc> {
let layout_path = self.layout_path.await?;
let page = layout_path
.last()

View file

@ -1,18 +1,19 @@
use std::{
collections::{BTreeMap, HashMap, HashSet},
io::Write,
path::PathBuf,
};
use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use futures::{stream::FuturesUnordered, TryStreamExt};
use indexmap::{IndexMap, IndexSet};
use mime::TEXT_HTML_UTF_8;
pub use node_rendered_source::{create_node_rendered_source, NodeRenderer, NodeRendererVc};
pub use node_api_source::create_node_api_source;
pub use node_entry::{NodeEntry, NodeEntryVc};
pub use node_rendered_source::create_node_rendered_source;
use serde::Deserialize;
use serde_json::Value as JsonValue;
use turbo_tasks::{
primitives::StringVc, spawn_blocking, CompletionVc, CompletionsVc, TryJoinIterExt,
};
use turbo_tasks::{primitives::StringVc, CompletionVc, CompletionsVc, TryJoinIterExt};
use turbo_tasks_fs::{DiskFileSystemVc, File, FileContent, FileSystemPathVc};
use turbopack::ecmascript::EcmascriptModuleAssetVc;
use turbopack_core::{
@ -22,7 +23,7 @@ use turbopack_core::{
};
use turbopack_dev_server::{
html::DevHtmlAssetVc,
source::{query::Query, HeaderValue},
source::{query::Query, BodyVc, HeaderValue, ProxyResult, ProxyResultVc},
};
use turbopack_ecmascript::chunk::EcmascriptChunkPlaceablesVc;
@ -31,9 +32,12 @@ use self::{
issue::RenderingIssue,
pool::{NodeJsPool, NodeJsPoolVc},
};
use crate::nodejs::pool::OperationEvent;
pub(crate) mod bootstrap;
pub(crate) mod issue;
pub(crate) mod node_api_source;
pub(crate) mod node_entry;
pub(crate) mod node_rendered_source;
pub(crate) mod pool;
@ -269,15 +273,30 @@ async fn render_static(
// Read this strongly consistent, since we don't want to run inconsistent
// node.js code.
let pool = renderer_pool.strongly_consistent().await?;
let mut op = pool
.run(serde_json::to_string(&*data.await?)?.as_bytes())
.await?;
let lines = spawn_blocking(move || {
let lines = op.read_lines()?;
drop(op);
Ok::<_, anyhow::Error>(lines)
})
.await?;
let mut operation = pool.operation().await?;
let data = data.await?;
// First, write the render data to the process as a JSON string.
let data = serde_json::to_string(&*data)?;
operation.write_all(data.as_bytes())?;
operation.write_all(&[b'\n'])?;
let mut buffer = Vec::new();
// Read the result headers as a UTF8 string.
let (_, event) = operation.read_event(&mut buffer)?;
let result = match event {
OperationEvent::Success => String::from_utf8(buffer)?,
event => {
bail!(
"unexpected event from Node.js rendering process: {:?}",
event
);
}
};
// Parse the result.
let lines: Vec<_> = result.lines().collect();
let issue = if let Some(last_line) = lines.last() {
if let Some(data) = last_line.strip_prefix("RESULT=") {
let result: serde_json::Result<RenderResult> = serde_json::from_str(data);
@ -352,3 +371,106 @@ async fn render_static(
Ok(html.content())
}
/// Status and headers of a proxied response. `render_proxy` deserializes this
/// from the JSON payload of the first `Step` event it reads back from the
/// Node.js process.
#[turbo_tasks::value(shared)]
pub(super) struct ResponseHeaders {
/// HTTP status code of the response.
status: u16,
/// Response headers as a flat list of strings.
/// NOTE(review): presumably alternating names and values (Node.js
/// `rawHeaders`) — confirm against the consumer of `ProxyResult`.
headers: Vec<String>,
}
/// Proxies a request to a Node.js process running `module` and returns the
/// status, headers and body it replies with.
///
/// The render data is written as JSON followed by a step marker, then the
/// request body followed by another step marker. The reply is read back as
/// two events: a `Step` carrying the JSON-encoded [ResponseHeaders], then a
/// `Success` carrying the raw response body. An `Error` event at either
/// stage is turned into a 500 response by `proxy_error`; any other event is
/// reported as an error.
#[turbo_tasks::function]
async fn render_proxy(
path: FileSystemPathVc,
module: EcmascriptModuleAssetVc,
runtime_entries: EcmascriptChunkPlaceablesVc,
chunking_context: ChunkingContextVc,
intermediate_output_path: FileSystemPathVc,
data: RenderDataVc,
body: BodyVc,
) -> Result<ProxyResultVc> {
let renderer_pool = get_renderer_pool(
get_intermediate_asset(
module,
runtime_entries,
chunking_context,
intermediate_output_path,
),
intermediate_output_path,
);
// NOTE(review): `render_static` reads the pool with `strongly_consistent()`;
// confirm that a plain read is intended here.
let pool = renderer_pool.await?;
let mut operation = pool.operation().await?;
let data = data.await?;
// First, write the render data to the process as a JSON string.
let data = serde_json::to_string(&*data)?;
operation.write_all(data.as_bytes())?;
operation.write_step()?;
// Then, write the binary body.
for chunk in body.await?.chunks() {
operation.write_all(chunk.as_bytes())?;
}
operation.write_step()?;
let mut buffer = Vec::new();
// Read the response headers as a JSON string.
let (_, event) = operation.read_event(&mut buffer)?;
let headers: ResponseHeaders = match event {
OperationEvent::Step => serde_json::from_slice(&buffer)?,
OperationEvent::Error => return proxy_error(path, buffer),
event => {
bail!(
"unexpected event from Node.js rendering process: {:?}",
event
);
}
};
// Reuse the buffer.
buffer.truncate(0);
// Read the response body as a binary blob.
let (_, event) = operation.read_event(&mut buffer)?;
let body = match event {
OperationEvent::Success => buffer,
OperationEvent::Error => return proxy_error(path, buffer),
event => {
bail!(
"unexpected event from Node.js rendering process: {:?}",
event
);
}
};
Ok(ProxyResult {
status: headers.status,
headers: headers.headers,
body,
}
.cell())
}
/// Reports an error payload received from the Node.js process.
///
/// Emits a [RenderingIssue] for `path` whose message is the payload decoded
/// as text, and returns a 500 response whose body is the payload itself.
///
/// The payload is decoded lossily: invalid UTF-8 sequences are replaced
/// rather than turning the error report itself into a failure, and the
/// buffer is no longer cloned just to be decoded.
fn proxy_error(path: FileSystemPathVc, buffer: Vec<u8>) -> Result<ProxyResultVc> {
    let error_message = String::from_utf8_lossy(&buffer).into_owned();

    RenderingIssue {
        context: path,
        message: StringVc::cell(error_message),
        logs: StringVc::cell("".to_string()),
    }
    .cell()
    .as_issue()
    .emit();

    Ok(ProxyResult {
        status: 500,
        headers: vec![
            "content-type".to_string(),
            "text/html; charset=utf-8".to_string(),
        ],
        body: buffer,
    }
    .cell())
}

View file

@ -0,0 +1,169 @@
use std::collections::HashSet;
use anyhow::Result;
use indexmap::IndexMap;
use turbo_tasks::{primitives::StringVc, ValueToString};
use turbo_tasks_fs::FileSystemPathVc;
use turbopack_core::{
chunk::ChunkingContextVc,
introspect::{
asset::IntrospectableAssetVc, Introspectable, IntrospectableChildrenVc, IntrospectableVc,
},
};
use turbopack_dev_server::source::{
ContentSource, ContentSourceData, ContentSourceDataFilter, ContentSourceDataVary,
ContentSourceResult, ContentSourceResultVc, ContentSourceVc,
};
use turbopack_ecmascript::chunk::EcmascriptChunkPlaceablesVc;
use super::{get_intermediate_asset, render_proxy, NodeEntryVc, RenderData};
use crate::path_regex::PathRegexVc;
/// Builds a [NodeApiContentSource] from its parts and returns it as a generic
/// content source.
#[turbo_tasks::function]
pub fn create_node_api_source(
    server_root: FileSystemPathVc,
    path_regex: PathRegexVc,
    entry: NodeEntryVc,
    chunking_context: ChunkingContextVc,
    runtime_entries: EcmascriptChunkPlaceablesVc,
    intermediate_output_path: FileSystemPathVc,
) -> ContentSourceVc {
    let source = NodeApiContentSource {
        server_root,
        path_regex,
        entry,
        chunking_context,
        runtime_entries,
        intermediate_output_path,
    };
    source.cell().into()
}
/// A content source that proxies API requests to one-off Node.js
/// servers running the passed `entry` when it matches a `path_regex`.
///
/// It needs a temporary directory (`intermediate_output_path`) to place files
/// for Node.js execution during rendering. The `chunking_context` should emit
/// to this directory.
#[turbo_tasks::value]
struct NodeApiContentSource {
/// Root that the requested path is joined onto when proxying.
server_root: FileSystemPathVc,
/// Decides which request paths this source handles and extracts their params.
path_regex: PathRegexVc,
/// Provides the entry module to run in Node.js.
entry: NodeEntryVc,
/// Chunking context passed along when building the intermediate asset.
chunking_context: ChunkingContextVc,
/// Runtime entries passed along when building the intermediate asset.
runtime_entries: EcmascriptChunkPlaceablesVc,
/// Temporary directory for the files executed by Node.js.
intermediate_output_path: FileSystemPathVc,
}
impl NodeApiContentSource {
    /// Tells whether `path` is matched by this source's regular expression.
    async fn is_matching_path(&self, path: &str) -> Result<bool> {
        let regex = self.path_regex.await?;
        Ok(regex.is_match(path))
    }

    /// Runs the regular expression against `path` and returns the named
    /// captures, if any.
    async fn get_matches(&self, path: &str) -> Result<Option<IndexMap<String, String>>> {
        let regex = self.path_regex.await?;
        Ok(regex.get_matches(path))
    }
}
#[turbo_tasks::value_impl]
impl ContentSource for NodeApiContentSource {
    /// Answers a request for `path`.
    ///
    /// Paths that do not match the regular expression yield `NotFound`. For
    /// matching paths, the request is proxied to Node.js once headers, method,
    /// url, query and body are all available; until then `NeedData` is
    /// returned, asking for exactly those pieces.
    #[turbo_tasks::function]
    async fn get(
        self_vc: NodeApiContentSourceVc,
        path: &str,
        data: turbo_tasks::Value<ContentSourceData>,
    ) -> Result<ContentSourceResultVc> {
        let this = self_vc.await?;

        if !this.is_matching_path(path).await? {
            return Ok(ContentSourceResult::NotFound.cell());
        }
        let params = match this.get_matches(path).await? {
            Some(params) => params,
            None => return Ok(ContentSourceResult::NotFound.cell()),
        };

        let (headers, method, url, query, body) = match &*data {
            ContentSourceData {
                headers: Some(headers),
                method: Some(method),
                url: Some(url),
                query: Some(query),
                body: Some(body),
                ..
            } => (headers, method, url, query, body),
            _ => {
                // Some of the request data is missing: ask for all of it.
                return Ok(ContentSourceResult::NeedData {
                    source: self_vc.into(),
                    path: path.to_string(),
                    vary: ContentSourceDataVary {
                        method: true,
                        url: true,
                        headers: Some(ContentSourceDataFilter::All),
                        query: Some(ContentSourceDataFilter::All),
                        body: true,
                        cache_buster: true,
                        ..Default::default()
                    },
                }
                .cell());
            }
        };

        let render_data = RenderData {
            params,
            method: method.clone(),
            url: url.clone(),
            query: query.clone(),
            headers: headers.clone(),
            path: format!("/{path}"),
        };
        let proxy_result = render_proxy(
            this.server_root.join(path),
            this.entry.entry(),
            this.runtime_entries,
            this.chunking_context,
            this.intermediate_output_path,
            render_data.cell(),
            *body,
        );
        Ok(ContentSourceResult::HttpProxy(proxy_result).cell().into())
    }
}
/// Type label reported by [NodeApiContentSource] for introspection.
#[turbo_tasks::function]
fn introspectable_type() -> StringVc {
    let label = "node api content source".to_string();
    StringVc::cell(label)
}
#[turbo_tasks::value_impl]
impl Introspectable for NodeApiContentSource {
    /// The introspection type label of this source.
    #[turbo_tasks::function]
    fn ty(&self) -> StringVc {
        introspectable_type()
    }

    /// The path regular expression, rendered as a string.
    #[turbo_tasks::function]
    fn title(&self) -> StringVc {
        self.path_regex.to_string()
    }

    /// Exposes the entry module and the intermediate asset built from it.
    #[turbo_tasks::function]
    fn children(&self) -> IntrospectableChildrenVc {
        let module_key = StringVc::cell("module".to_string());
        let module_asset = IntrospectableAssetVc::new(self.entry.entry().into());

        let intermediate_key = StringVc::cell("intermediate asset".to_string());
        let intermediate_asset = IntrospectableAssetVc::new(get_intermediate_asset(
            self.entry.entry(),
            self.runtime_entries,
            self.chunking_context,
            self.intermediate_output_path,
        ));

        IntrospectableChildrenVc::cell(HashSet::from([
            (module_key, module_asset),
            (intermediate_key, intermediate_asset),
        ]))
    }
}

View file

@ -0,0 +1,8 @@
use anyhow::Result;
use turbopack_ecmascript::EcmascriptModuleAssetVc;
/// Trait for values that can provide the entry module used to render
/// something in Node.js.
#[turbo_tasks::value_trait]
pub trait NodeEntry {
/// Returns the entry module to run in Node.js.
fn entry(&self) -> EcmascriptModuleAssetVc;
}

View file

@ -20,20 +20,16 @@ use turbopack_dev_server::{
ContentSourceResult, ContentSourceResultVc, ContentSourceVc,
},
};
use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc};
use turbopack_ecmascript::chunk::EcmascriptChunkPlaceablesVc;
use super::{external_asset_entrypoints, get_intermediate_asset, render_static, RenderData};
use super::{
external_asset_entrypoints, get_intermediate_asset, render_static, NodeEntryVc, RenderData,
};
use crate::path_regex::PathRegexVc;
/// Trait that allows to get the entry module for rendering something in Node.js
#[turbo_tasks::value_trait]
pub trait NodeRenderer {
fn module(&self) -> EcmascriptModuleAssetVc;
}
/// Creates a content source that renders something in Node.js with the passed
/// `renderer` when it matches a `path_regex`. Once rendered it serves
/// all assets referenced by the `renderer` that are within the `server_root`.
/// `entry` when it matches a `path_regex`. Once rendered it serves
/// all assets referenced by the `entry` that are within the `server_root`.
/// It needs a temporary directory (`intermediate_output_path`) to place file
/// for Node.js execution during rendering. The `chunking_context` should emit
/// to this directory.
@ -41,7 +37,7 @@ pub trait NodeRenderer {
pub fn create_node_rendered_source(
server_root: FileSystemPathVc,
path_regex: PathRegexVc,
renderer: NodeRendererVc,
entry: NodeEntryVc,
chunking_context: ChunkingContextVc,
runtime_entries: EcmascriptChunkPlaceablesVc,
fallback_page: DevHtmlAssetVc,
@ -50,7 +46,7 @@ pub fn create_node_rendered_source(
let source = NodeRenderContentSource {
server_root,
path_regex,
renderer,
entry,
chunking_context,
runtime_entries,
fallback_page,
@ -73,7 +69,7 @@ pub fn create_node_rendered_source(
struct NodeRenderContentSource {
server_root: FileSystemPathVc,
path_regex: PathRegexVc,
renderer: NodeRendererVc,
entry: NodeEntryVc,
chunking_context: ChunkingContextVc,
runtime_entries: EcmascriptChunkPlaceablesVc,
fallback_page: DevHtmlAssetVc,
@ -110,7 +106,7 @@ impl GetContentSource for NodeRenderContentSource {
AssetGraphContentSourceVc::new_lazy_multiple(
self.server_root,
external_asset_entrypoints(
self.renderer.module(),
self.entry.entry(),
self.runtime_entries,
self.chunking_context,
self.intermediate_output_path,
@ -143,7 +139,7 @@ impl ContentSource for NodeRenderContentSource {
return Ok(ContentSourceResult::Static(
render_static(
this.server_root.join(path),
this.renderer.module(),
this.entry.entry(),
this.runtime_entries,
this.fallback_page,
this.chunking_context,
@ -222,12 +218,12 @@ impl Introspectable for NodeRenderContentSource {
IntrospectableChildrenVc::cell(HashSet::from([
(
StringVc::cell("module".to_string()),
IntrospectableAssetVc::new(self.renderer.module().into()),
IntrospectableAssetVc::new(self.entry.entry().into()),
),
(
StringVc::cell("intermediate asset".to_string()),
IntrospectableAssetVc::new(get_intermediate_asset(
self.renderer.module(),
self.entry.entry(),
self.runtime_entries,
self.chunking_context,
self.intermediate_output_path,

View file

@ -1,23 +1,22 @@
use std::{
collections::HashMap,
io::{BufRead, BufReader, Write},
mem::transmute,
path::{Path, PathBuf},
process::{Child, ChildStdin, ChildStdout, Command, Stdio},
sync::{Arc, Mutex},
};
use anyhow::{bail, Context, Result};
use rand::Rng;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use turbo_tasks::spawn_blocking;
const END_OF_OPERATION: &str =
"END_OF_OPERATION 4329g8b57hnz349bo58tzuasgnhv9o8e4zo6gvj END_OF_OPERATION\n";
use turbo_tasks_hash::encode_hex_string;
struct NodeJsPoolProcess {
child: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
marker: Arc<OperationMarker>,
}
impl Drop for NodeJsPoolProcess {
@ -27,12 +26,96 @@ impl Drop for NodeJsPoolProcess {
}
}
/// A marker used to detect the limits of a single operation within the output
/// of a Node.js process.
struct OperationMarker {
/// Hex encoding of 16 random bytes, generated when the marker is created.
marker: String,
}
impl OperationMarker {
const STEP: &str = "OPERATION_STEP";
const SUCCESS: &str = "OPERATION_END";
const ERROR: &str = "OPERATION_ERROR";
fn new() -> Self {
Self {
marker: encode_hex_string(&rand::thread_rng().gen::<[u8; 16]>()),
}
}
fn read_event(&self, orig_buffer: &[u8]) -> Option<(usize, OperationEvent)> {
let buffer = orig_buffer;
let buffer = buffer.strip_suffix(&[b'\n'])?;
let buffer = buffer.strip_suffix(self.marker.as_bytes())?;
let buffer = buffer.strip_suffix(&[b' '])?;
if let Some(buffer) = buffer
.strip_suffix(Self::STEP.as_bytes())
.and_then(|buffer| buffer.strip_suffix(&[b'\n']))
{
Some((orig_buffer.len() - buffer.len(), OperationEvent::Step))
} else if let Some(buffer) = buffer
.strip_suffix(Self::SUCCESS.as_bytes())
.and_then(|buffer| buffer.strip_suffix(&[b'\n']))
{
Some((orig_buffer.len() - buffer.len(), OperationEvent::Success))
} else if let Some(buffer) = buffer
.strip_suffix(Self::ERROR.as_bytes())
.and_then(|buffer| buffer.strip_suffix(&[b'\n']))
{
Some((orig_buffer.len() - buffer.len(), OperationEvent::Error))
} else {
None
}
}
fn write<W>(&self, mut writer: W, kind: &str) -> std::io::Result<()>
where
W: Write,
{
writer.write_all(&[b'\n'])?;
writer.write_all(kind.as_bytes())?;
writer.write_all(&[b' '])?;
writer.write_all(self.marker.as_bytes())?;
writer.write_all(&[b'\n'])?;
Ok(())
}
fn write_step<W>(&self, writer: W) -> std::io::Result<()>
where
W: Write,
{
self.write(writer, Self::STEP)
}
}
impl Default for OperationMarker {
fn default() -> Self {
Self::new()
}
}
/// The kind of event found at the end of a chunk of Node.js process output.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(super) enum OperationEvent {
/// The output ended with an `OPERATION_STEP` marker line.
Step,
/// The output ended with an `OPERATION_END` marker line.
Success,
/// The output ended with an `OPERATION_ERROR` marker line.
Error,
}
impl NodeJsPoolProcess {
fn prepare(cwd: &Path, env: &HashMap<String, String>, entrypoint: &Path) -> Command {
fn prepare(
cwd: &Path,
env: &HashMap<String, String>,
entrypoint: &Path,
marker: &OperationMarker,
) -> Command {
let mut cmd = Command::new("node");
cmd.current_dir(cwd);
cmd.arg(entrypoint);
cmd.arg(&END_OF_OPERATION[..END_OF_OPERATION.len() - 1]);
cmd.arg(&marker.marker);
cmd.arg(&OperationMarker::STEP);
cmd.arg(&OperationMarker::SUCCESS);
cmd.arg(&OperationMarker::ERROR);
cmd.env_clear();
cmd.env(
"PATH",
@ -49,7 +132,7 @@ impl NodeJsPoolProcess {
cmd
}
fn start(mut cmd: Command) -> Result<Self> {
fn start(mut cmd: Command, marker: Arc<OperationMarker>) -> Result<Self> {
let mut child = cmd.spawn().context("spawning node pool")?;
let stdin = child.stdin.take().unwrap();
let mut stdout = BufReader::new(child.stdout.take().unwrap());
@ -68,15 +151,26 @@ impl NodeJsPoolProcess {
child,
stdin,
stdout,
marker,
})
}
fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
self.stdout.read_line(buf)
fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
self.stdout.read_until(byte, buf)
}
pub(super) fn write(&mut self, buf: &[u8]) -> std::io::Result<()> {
self.stdin.write_all(buf)
fn write_step(&mut self) -> std::io::Result<()> {
self.marker.write_step(&mut self.stdin)
}
}
/// Writing to a pool process writes to the stdin of its child process.
impl Write for NodeJsPoolProcess {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let stdin = &mut self.stdin;
        stdin.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let stdin = &mut self.stdin;
        stdin.flush()
    }
}
@ -98,6 +192,8 @@ pub(super) struct NodeJsPool {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
semaphore: Arc<Semaphore>,
#[turbo_tasks(trace_ignore, debug_ignore)]
marker: Arc<OperationMarker>,
}
impl NodeJsPool {
@ -113,6 +209,7 @@ impl NodeJsPool {
env,
processes: Arc::new(Mutex::new(Vec::new())),
semaphore: Arc::new(Semaphore::new(concurrency)),
marker: Arc::new(OperationMarker::default()),
}
}
@ -125,38 +222,30 @@ impl NodeJsPool {
Ok(if let Some(child) = popped {
(child, permit)
} else {
let marker = Arc::clone(&self.marker);
let cmd = NodeJsPoolProcess::prepare(
self.cwd.as_path(),
&self.env,
self.entrypoint.as_path(),
&*marker,
);
let fresh = spawn_blocking(move || NodeJsPoolProcess::start(cmd)).await?;
let fresh = spawn_blocking(move || NodeJsPoolProcess::start(cmd, marker)).await?;
(fresh, permit)
})
}
pub(super) async fn run(&self, input: &[u8]) -> Result<NodeJsOperationResult> {
let (mut child, permit) = self.acquire_child().await?;
// SAFETY we await spawn blocking so we stay within the lifetime of input
let static_input: &'static [u8] = unsafe { transmute(input) };
let child = spawn_blocking(move || {
child.write(static_input)?;
child.write(b"\n")?;
Ok::<_, anyhow::Error>(child)
})
.await?;
pub(super) async fn operation(&self) -> Result<NodeJsOperation> {
let (child, permit) = self.acquire_child().await?;
Ok(NodeJsOperationResult {
Ok(NodeJsOperation {
child: Some(child),
child_ended: false,
permit,
processes: self.processes.clone(),
})
}
}
pub(super) struct NodeJsOperationResult {
child_ended: bool,
pub(super) struct NodeJsOperation {
child: Option<NodeJsPoolProcess>,
// This is used for drop
#[allow(dead_code)]
@ -164,46 +253,61 @@ pub(super) struct NodeJsOperationResult {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
}
impl NodeJsOperationResult {
pub(super) fn read_line(&mut self, buf: &mut String) -> Result<usize, std::io::Error> {
if let Some(ref mut child) = self.child {
if self.child_ended {
return Ok(0);
}
let len = child.read_line(buf)?;
if len == 0 {
self.child = None;
return Ok(0);
}
if buf.ends_with(END_OF_OPERATION) {
buf.truncate(buf.len() - END_OF_OPERATION.len());
self.child_ended = true;
Ok(0)
} else {
Ok(len)
}
} else {
Ok(0)
}
impl NodeJsOperation {
fn expect_child_mut(&mut self) -> &mut NodeJsPoolProcess {
self.child
.as_mut()
.expect("child must be present while operation is live")
}
pub(super) fn read_lines(&mut self) -> Result<Vec<String>, std::io::Error> {
let mut lines = Vec::new();
fn take_child(&mut self) -> NodeJsPoolProcess {
self.child
.take()
.expect("child must be present while operation is live")
}
/// Writes the step event end marker to the child process.
pub(super) fn write_step(&mut self) -> std::io::Result<()> {
self.expect_child_mut().write_step()
}
/// Reads a completed event in the child process output. Blocks while
/// waiting for more output.
pub(super) fn read_event(
&mut self,
buf: &mut Vec<u8>,
) -> std::io::Result<(usize, OperationEvent)> {
let child = self.expect_child_mut();
let mut total_read = 0;
loop {
let mut line = String::new();
if self.read_line(&mut line)? == 0 {
return Ok(lines);
total_read += child.read_until(b'\n', buf)?;
match child.marker.read_event(&buf) {
Some((read, event)) => {
buf.truncate(buf.len() - read);
break Ok((total_read - read, event));
}
None => {}
}
line.pop();
lines.push(line);
}
}
}
impl Drop for NodeJsOperationResult {
fn drop(&mut self) {
if let Some(child) = self.child.take() {
self.processes.lock().unwrap().push(child)
}
/// Writing to an operation writes to the child process it holds.
impl Write for NodeJsOperation {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.expect_child_mut().write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.expect_child_mut().flush()
    }
}
/// Dropping an operation hands its child process back to the shared list of
/// processes.
impl Drop for NodeJsOperation {
    fn drop(&mut self) {
        let child = self.take_child();
        let mut processes = self.processes.lock().unwrap();
        processes.push(child);
    }
}

View file

@ -1,7 +1,7 @@
use std::collections::HashMap;
use anyhow::Result;
use turbo_tasks::Value;
use turbo_tasks::{primitives::BoolVc, Value};
use turbo_tasks_env::ProcessEnvVc;
use turbo_tasks_fs::{DirectoryContent, DirectoryEntry, FileSystemEntryType, FileSystemPathVc};
use turbopack::{transition::TransitionsByNameVc, ModuleAssetContextVc};
@ -38,7 +38,7 @@ use crate::{
get_server_environment, get_server_module_options_context,
get_server_resolve_options_context, ServerContextType,
},
nodejs::node_rendered_source::{create_node_rendered_source, NodeRenderer, NodeRendererVc},
nodejs::{create_node_api_source, create_node_rendered_source, NodeEntry, NodeEntryVc},
util::regular_expression_for_path,
};
@ -111,6 +111,7 @@ pub async fn create_server_rendered_source(
fallback_page,
server_root,
server_root,
server_root.join("api"),
output_path,
);
let fallback_source =
@ -125,7 +126,7 @@ pub async fn create_server_rendered_source(
/// Handles a single page file in the pages directory
#[turbo_tasks::function]
fn create_server_rendered_source_for_file(
async fn create_server_rendered_source_for_file(
context_path: FileSystemPathVc,
context: AssetContextVc,
pages_dir: FileSystemPathVc,
@ -134,8 +135,9 @@ fn create_server_rendered_source_for_file(
fallback_page: DevHtmlAssetVc,
server_root: FileSystemPathVc,
server_path: FileSystemPathVc,
is_api_path: BoolVc,
intermediate_output_path: FileSystemPathVc,
) -> ContentSourceVc {
) -> Result<ContentSourceVc> {
let source_asset = SourceAssetVc::new(page_file).into();
let entry_asset = context.process(source_asset);
@ -148,20 +150,38 @@ fn create_server_rendered_source_for_file(
)
.into();
create_node_rendered_source(
server_root,
regular_expression_for_path(server_root, server_path, true),
SsrRenderer {
context,
entry_asset,
}
.cell()
.into(),
chunking_context,
runtime_entries,
fallback_page,
intermediate_output_path,
)
Ok(if *is_api_path.await? {
create_node_api_source(
server_root,
regular_expression_for_path(server_root, server_path, true),
SsrEntry {
context,
entry_asset,
is_api_path,
}
.cell()
.into(),
chunking_context,
runtime_entries,
intermediate_output_path,
)
} else {
create_node_rendered_source(
server_root,
regular_expression_for_path(server_root, server_path, true),
SsrEntry {
context,
entry_asset,
is_api_path,
}
.cell()
.into(),
chunking_context,
runtime_entries,
fallback_page,
intermediate_output_path,
)
})
}
/// Handles a directory in the pages directory (or the pages directory itself).
@ -177,6 +197,7 @@ async fn create_server_rendered_source_for_directory(
fallback_page: DevHtmlAssetVc,
server_root: FileSystemPathVc,
server_path: FileSystemPathVc,
server_api_path: FileSystemPathVc,
intermediate_output_path: FileSystemPathVc,
) -> Result<CombinedContentSourceVc> {
let mut predefined_sources = vec![];
@ -219,6 +240,7 @@ async fn create_server_rendered_source_for_directory(
fallback_page,
server_root,
dev_server_path,
dev_server_path.is_inside(server_api_path),
intermediate_output_path,
),
));
@ -239,6 +261,7 @@ async fn create_server_rendered_source_for_directory(
fallback_page,
server_root,
server_path.join(name),
server_api_path,
intermediate_output_path.join(name),
)
.into(),
@ -265,21 +288,30 @@ async fn create_server_rendered_source_for_directory(
/// The node.js renderer for SSR of pages.
#[turbo_tasks::value]
struct SsrRenderer {
struct SsrEntry {
context: AssetContextVc,
entry_asset: AssetVc,
is_api_path: BoolVc,
}
#[turbo_tasks::value_impl]
impl NodeRenderer for SsrRenderer {
impl NodeEntry for SsrEntry {
#[turbo_tasks::function]
fn module(&self) -> EcmascriptModuleAssetVc {
EcmascriptModuleAssetVc::new(
async fn entry(&self) -> Result<EcmascriptModuleAssetVc> {
let virtual_asset = if *self.is_api_path.await? {
VirtualAssetVc::new(
self.entry_asset.path().join("server-api.tsx"),
next_js_file("entry/server-api.tsx").into(),
)
} else {
VirtualAssetVc::new(
self.entry_asset.path().join("server-renderer.tsx"),
next_js_file("entry/server-renderer.tsx").into(),
)
.into(),
};
Ok(EcmascriptModuleAssetVc::new(
virtual_asset.into(),
self.context,
Value::new(EcmascriptModuleAssetType::Typescript),
EcmascriptInputTransformsVc::cell(vec![
@ -287,6 +319,6 @@ impl NodeRenderer for SsrRenderer {
EcmascriptInputTransform::React { refresh: false },
]),
self.context.environment(),
)
))
}
}