Initial HMR Nexturbo API implementation (#52950)

This implements a MVP of HMR. HMR works similarly as in turbopack-dev-server, but instead of going through the router to retrieve output assets, output assets are eagerly stored into a global hash map, and retrieved directly from there (see `VersionedContentMap`).

This will require some more glue on the Next.js side in order to handle:
* RSC headers;
* handling Turbopack subscriptiob HMR events from the Next.js WS server, proxying them to `hmr_events`, and sending back the stream of updates.

There's currently no way to evict deleted output assets, nor to communicate these events to the client. @sokra mentioned the `VersionedContentMap` could store a list of assets per entrypoint, instead of having a top-level flat map.

Co-authored-by: Tobias Koppers <1365881+sokra@users.noreply.github.com>
This commit is contained in:
Alex Kirszenberg 2023-08-07 12:37:57 +02:00 committed by GitHub
parent b993afbf7c
commit 9483ff170a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 338 additions and 79 deletions

1
Cargo.lock generated
View file

@ -7608,6 +7608,7 @@ dependencies = [
"turbopack-dev",
"turbopack-dev-server",
"turbopack-ecmascript",
"turbopack-ecmascript-hmr-protocol",
"turbopack-ecmascript-plugins",
"turbopack-ecmascript-runtime",
"turbopack-env",

View file

@ -62,7 +62,8 @@ turbopack-binding = { workspace = true, features = [
"__turbo",
"__turbo_tasks",
"__turbo_tasks_memory",
"__turbopack"
"__turbopack",
"__turbopack_ecmascript_hmr_protocol",
] }
[target.'cfg(not(all(target_os = "linux", target_env = "musl", target_arch = "aarch64")))'.dependencies]

View file

@ -12,7 +12,7 @@ use next_core::tracing_presets::{
use tracing_subscriber::{
prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, EnvFilter, Registry,
};
use turbo_tasks::{TurboTasks, Vc};
use turbo_tasks::{TransientInstance, TurboTasks, Vc};
use turbopack_binding::{
turbo::tasks_memory::MemoryBackend,
turbopack::{
@ -22,7 +22,11 @@ use turbopack_binding::{
trace_writer::{TraceWriter, TraceWriterGuard},
tracing_presets::TRACING_OVERVIEW_TARGETS,
},
core::error::PrettyPrintError,
core::{
error::PrettyPrintError,
version::{PartialUpdate, TotalUpdate, Update},
},
ecmascript_hmr_protocol::{ClientUpdateInstruction, ResourceIdentifier},
},
};
@ -30,7 +34,7 @@ use super::{
endpoint::ExternalEndpoint,
utils::{
get_diagnostics, get_issues, serde_enum_to_string, subscribe, NapiDiagnostic, NapiIssue,
RootTask, VcArc,
RootTask, TurbopackResult, VcArc,
},
};
use crate::register;
@ -274,7 +278,6 @@ impl NapiMiddleware {
})
}
}
#[napi(object)]
struct NapiEntrypoints {
pub routes: Vec<NapiRoute>,
@ -282,8 +285,6 @@ struct NapiEntrypoints {
pub pages_document_endpoint: External<ExternalEndpoint>,
pub pages_app_endpoint: External<ExternalEndpoint>,
pub pages_error_endpoint: External<ExternalEndpoint>,
pub issues: Vec<NapiIssue>,
pub diagnostics: Vec<NapiDiagnostic>,
}
#[napi(ts_return_type = "{ __napiType: \"RootTask\" }")]
@ -309,7 +310,8 @@ pub fn project_entrypoints_subscribe(
move |ctx| {
let (entrypoints, issues, diags) = ctx.value;
Ok(vec![NapiEntrypoints {
Ok(vec![TurbopackResult {
result: NapiEntrypoints {
routes: entrypoints
.routes
.iter()
@ -334,6 +336,7 @@ pub fn project_entrypoints_subscribe(
turbo_tasks.clone(),
entrypoints.pages_error_endpoint,
))),
},
issues: issues
.iter()
.map(|issue| NapiIssue::from(&**issue))
@ -343,3 +346,79 @@ pub fn project_entrypoints_subscribe(
},
)
}
#[napi(ts_return_type = "{ __napiType: \"RootTask\" }")]
pub fn project_hmr_events(
#[napi(ts_arg_type = "{ __napiType: \"Project\" }")] project: External<
VcArc<Vc<ProjectContainer>>,
>,
identifier: String,
func: JsFunction,
) -> napi::Result<External<RootTask>> {
let turbo_tasks = project.turbo_tasks().clone();
let project = **project;
let session = TransientInstance::new(());
subscribe(
turbo_tasks.clone(),
func,
{
let identifier = identifier.clone();
let session = session.clone();
move || {
let identifier = identifier.clone();
let session = session.clone();
async move {
let state = project
.project()
.hmr_version_state(identifier.clone(), session);
let update = project.project().hmr_update(identifier, state);
let issues = get_issues(update).await?;
let diags = get_diagnostics(update).await?;
let update = update.strongly_consistent().await?;
match &*update {
Update::None => {}
Update::Total(TotalUpdate { to }) => {
state.set(to.clone()).await?;
}
Update::Partial(PartialUpdate { to, .. }) => {
state.set(to.clone()).await?;
}
}
Ok((update, issues, diags))
}
}
},
move |ctx| {
let (update, issues, diags) = ctx.value;
let napi_issues = issues
.iter()
.map(|issue| NapiIssue::from(&**issue))
.collect();
let update_issues = issues
.iter()
.map(|issue| (&**issue).into())
.collect::<Vec<_>>();
let identifier = ResourceIdentifier {
path: identifier.clone(),
headers: None,
};
let update = match &*update {
Update::Total(_) => ClientUpdateInstruction::restart(&identifier, &update_issues),
Update::Partial(update) => ClientUpdateInstruction::partial(
&identifier,
&update.instruction,
&update_issues,
),
Update::None => ClientUpdateInstruction::issues(&identifier, &update_issues),
};
Ok(vec![TurbopackResult {
result: ctx.env.to_js_value(&update)?,
issues: napi_issues,
diagnostics: diags.iter().map(|d| NapiDiagnostic::from(d)).collect(),
}])
},
)
}

View file

@ -4,7 +4,7 @@ use next_core::{
app_structure::{
get_entrypoints, Entrypoint as AppEntrypoint, Entrypoints as AppEntrypoints, LoaderTree,
},
emit_all_assets, get_edge_resolve_options_context,
get_edge_resolve_options_context,
mode::NextMode,
next_app::{
get_app_client_references_chunks, get_app_client_shared_chunks, get_app_page_entry,
@ -435,14 +435,6 @@ struct AppEndpoint {
#[turbo_tasks::value_impl]
impl AppEndpoint {
#[turbo_tasks::function]
fn client_relative_path(&self) -> Vc<FileSystemPath> {
self.app_project
.project()
.client_root()
.join("_next".to_string())
}
#[turbo_tasks::function]
fn app_page_entry(&self, loader_tree: Vc<LoaderTree>) -> Vc<AppEntry> {
get_app_page_entry(
@ -482,7 +474,7 @@ impl AppEndpoint {
let node_root = this.app_project.project().node_root();
let client_relative_path = self.client_relative_path();
let client_relative_path = this.app_project.project().client_relative_path();
let client_relative_path_ref = client_relative_path.await?;
let server_path = node_root.join("server".to_string());
@ -759,12 +751,9 @@ impl Endpoint for AppEndpoint {
let node_root_ref = &node_root.await?;
let node_root = this.app_project.project().node_root();
emit_all_assets(
output_assets,
node_root,
self.client_relative_path(),
this.app_project.project().node_root(),
)
this.app_project
.project()
.emit_all_output_assets(output_assets)
.await?;
let server_paths = all_server_paths(output_assets, node_root)

View file

@ -7,6 +7,7 @@ mod entrypoints;
mod pages;
pub mod project;
pub mod route;
mod versioned_content_map;
// Declare build-time information variables generated in build.rs
shadow_rs::shadow!(build);

View file

@ -1,8 +1,8 @@
use anyhow::{bail, Context, Result};
use indexmap::IndexMap;
use next_core::{
all_server_paths, create_page_loader_entry_module, emit_all_assets,
get_asset_path_from_pathname, get_edge_resolve_options_context,
all_server_paths, create_page_loader_entry_module, get_asset_path_from_pathname,
get_edge_resolve_options_context,
mode::NextMode,
next_client::{
get_client_module_options_context, get_client_resolve_options_context,
@ -813,20 +813,17 @@ impl Endpoint for PageEndpoint {
let this = self.await?;
let node_root = this.pages_project.project().node_root();
emit_all_assets(
output_assets,
node_root,
this.pages_project.project().client_relative_path(),
this.pages_project.project().node_root(),
)
this.pages_project
.project()
.emit_all_output_assets(output_assets)
.await?;
let node_root = this.pages_project.project().node_root();
let server_paths = all_server_paths(output_assets, node_root)
.await?
.clone_value();
let node_root = &this.pages_project.project().node_root().await?;
let node_root = &node_root.await?;
let written_endpoint = match *output.await? {
PageEndpointOutput::NodeJs {
entry_chunk,

View file

@ -3,8 +3,9 @@ use std::path::MAIN_SEPARATOR;
use anyhow::Result;
use indexmap::{map::Entry, IndexMap};
use next_core::{
all_assets_from_entries,
app_structure::find_app_dir,
get_edge_chunking_context, get_edge_compile_time_info,
emit_assets, get_edge_chunking_context, get_edge_compile_time_info,
mode::NextMode,
next_client::{get_client_chunking_context, get_client_compile_time_info},
next_config::{JsConfig, NextConfig},
@ -14,7 +15,8 @@ use next_core::{
};
use serde::{Deserialize, Serialize};
use turbo_tasks::{
debug::ValueDebugFormat, trace::TraceRawVcs, unit, State, TaskInput, TransientValue, Vc,
debug::ValueDebugFormat, trace::TraceRawVcs, unit, Completion, IntoTraitRef, State, TaskInput,
TransientInstance, Vc,
};
use turbopack_binding::{
turbo::{
@ -24,8 +26,13 @@ use turbopack_binding::{
turbopack::{
build::BuildChunkingContext,
core::{
chunk::ChunkingContext, compile_time_info::CompileTimeInfo, diagnostics::DiagnosticExt,
environment::ServerAddr, PROJECT_FILESYSTEM_NAME,
chunk::ChunkingContext,
compile_time_info::CompileTimeInfo,
diagnostics::DiagnosticExt,
environment::ServerAddr,
output::OutputAssets,
version::{Update, Version, VersionState, VersionedContent},
PROJECT_FILESYSTEM_NAME,
},
dev::DevChunkingContext,
ecmascript::chunk::EcmascriptChunkingContext,
@ -40,6 +47,7 @@ use crate::{
entrypoints::Entrypoints,
pages::PagesProject,
route::{Endpoint, Route},
versioned_content_map::VersionedContentMap,
};
#[derive(Debug, Serialize, Deserialize, Clone, TaskInput, PartialEq, Eq, TraceRawVcs)]
@ -73,7 +81,8 @@ pub struct Middleware {
#[turbo_tasks::value]
pub struct ProjectContainer {
state: State<ProjectOptions>,
options_state: State<ProjectOptions>,
versioned_content_map: Vc<VersionedContentMap>,
}
#[turbo_tasks::value_impl]
@ -81,21 +90,22 @@ impl ProjectContainer {
#[turbo_tasks::function]
pub fn new(options: ProjectOptions) -> Vc<Self> {
ProjectContainer {
state: State::new(options),
options_state: State::new(options),
versioned_content_map: VersionedContentMap::new(),
}
.cell()
}
#[turbo_tasks::function]
pub async fn update(self: Vc<Self>, options: ProjectOptions) -> Result<Vc<()>> {
self.await?.state.set(options);
self.await?.options_state.set(options);
Ok(unit())
}
#[turbo_tasks::function]
pub async fn project(self: Vc<Self>) -> Result<Vc<Project>> {
let this = self.await?;
let options = this.state.get();
let options = this.options_state.get();
let next_config = NextConfig::from_string(Vc::cell(options.next_config.clone()));
let js_config = JsConfig::from_string(Vc::cell(options.js_config.clone()));
let env: Vc<EnvMap> = Vc::cell(options.env.iter().cloned().collect());
@ -110,6 +120,7 @@ impl ProjectContainer {
versions, last 1 Edge versions"
.to_string(),
mode: NextMode::Development,
versioned_content_map: this.versioned_content_map,
}
.cell())
}
@ -144,6 +155,8 @@ pub struct Project {
browserslist_query: String,
mode: NextMode,
versioned_content_map: Vc<VersionedContentMap>,
}
#[turbo_tasks::value_impl]
@ -475,10 +488,73 @@ impl Project {
.cell())
}
#[turbo_tasks::function]
pub async fn emit_all_output_assets(
self: Vc<Self>,
output_assets: Vc<OutputAssets>,
) -> Result<Vc<Completion>> {
let all_output_assets = all_assets_from_entries(output_assets);
self.await?
.versioned_content_map
.insert_output_assets(all_output_assets)
.await?;
Ok(emit_assets(
all_output_assets,
self.node_root(),
self.client_relative_path(),
self.node_root(),
))
}
#[turbo_tasks::function]
async fn hmr_content(
self: Vc<Self>,
identifier: String,
) -> Result<Vc<Box<dyn VersionedContent>>> {
Ok(self
.await?
.versioned_content_map
.get(self.client_root().join(identifier)))
}
#[turbo_tasks::function]
async fn hmr_version(self: Vc<Self>, identifier: String) -> Result<Vc<Box<dyn Version>>> {
let content = self.hmr_content(identifier);
Ok(content.version())
}
/// Get the version state for a session. Initialized with the first seen
/// version in that session.
#[turbo_tasks::function]
pub async fn hmr_version_state(
self: Vc<Self>,
identifier: String,
session: TransientInstance<()>,
) -> Result<Vc<VersionState>> {
let version = self.hmr_version(identifier);
// The session argument is important to avoid caching this function between
// sessions.
let _ = session;
// INVALIDATION: This is intentionally untracked to avoid invalidating this
// function completely. We want to initialize the VersionState with the
// first seen version of the session.
VersionState::new(version.into_trait_ref_untracked().await?).await
}
/// Emits opaque HMR events whenever a change is detected in the chunk group
/// internally known as `identifier`.
#[turbo_tasks::function]
pub fn hmr_events(self: Vc<Self>, _identifier: String, _sender: TransientValue<()>) -> Vc<()> {
unit()
pub async fn hmr_update(
self: Vc<Self>,
identifier: String,
from: Vc<VersionState>,
) -> Result<Vc<Update>> {
let from = from.get();
Ok(self.hmr_content(identifier).update(from))
}
}

View file

@ -0,0 +1,81 @@
use std::collections::HashMap;
use anyhow::{bail, Result};
use turbo_tasks::{State, TryJoinIterExt, ValueDefault, ValueToString, Vc};
use turbopack_binding::{
turbo::tasks_fs::FileSystemPath,
turbopack::core::{
asset::Asset,
output::{OutputAsset, OutputAssets},
version::VersionedContent,
},
};
type VersionedContentMapInner = HashMap<Vc<FileSystemPath>, Vc<Box<dyn VersionedContent>>>;
#[turbo_tasks::value]
pub struct VersionedContentMap {
map: State<VersionedContentMapInner>,
}
impl ValueDefault for VersionedContentMap {
fn value_default() -> Vc<Self> {
VersionedContentMap {
map: State::new(HashMap::new()),
}
.cell()
}
}
impl VersionedContentMap {
// NOTE(alexkirsz) This must not be a `#[turbo_tasks::function]` because it
// should be a singleton for each project.
pub fn new() -> Vc<Self> {
Self::value_default()
}
}
#[turbo_tasks::value_impl]
impl VersionedContentMap {
#[turbo_tasks::function]
pub async fn insert_output_assets(self: Vc<Self>, assets: Vc<OutputAssets>) -> Result<()> {
let assets = assets.await?;
let entries: Vec<_> = assets
.iter()
.map(|asset| async move {
// NOTE(alexkirsz) `.versioned_content()` should not be resolved, to ensure that
// it always points to the task that computes the versioned
// content.
Ok((
asset.ident().path().resolve().await?,
asset.versioned_content(),
))
})
.try_join()
.await?;
self.await?.map.update_conditionally(move |map| {
map.extend(entries);
true
});
Ok(())
}
#[turbo_tasks::function]
pub async fn get(&self, path: Vc<FileSystemPath>) -> Result<Vc<Box<dyn VersionedContent>>> {
let content = {
// NOTE(alexkirsz) This is to avoid Rust marking this method as !Send because a
// StateRef to the map is captured across an await boundary below, even though
// it does not look like it would.
// I think this is a similar issue as https://fasterthanli.me/articles/a-rust-match-made-in-hell
let map = self.map.get();
map.get(&path).copied()
};
let Some(content) = content else {
let path = path.to_string().await?;
bail!("could not find versioned content for path {}", path);
};
// NOTE(alexkirsz) This is necessary to mark the task as active again.
content.node.connect();
Ok(content)
}
}

View file

@ -38,15 +38,35 @@ pub async fn all_server_paths(
/// Assets inside the given client root are rebased to the given client output
/// path.
#[turbo_tasks::function]
pub async fn emit_all_assets(
pub fn emit_all_assets(
assets: Vc<OutputAssets>,
node_root: Vc<FileSystemPath>,
client_relative_path: Vc<FileSystemPath>,
client_output_path: Vc<FileSystemPath>,
) -> Vc<Completion> {
emit_assets(
all_assets_from_entries(assets),
node_root,
client_relative_path,
client_output_path,
)
}
/// Emits all assets transitively reachable from the given chunks, that are
/// inside the node root or the client root.
///
/// Assets inside the given client root are rebased to the given client output
/// path.
#[turbo_tasks::function]
pub async fn emit_assets(
assets: Vc<OutputAssets>,
node_root: Vc<FileSystemPath>,
client_relative_path: Vc<FileSystemPath>,
client_output_path: Vc<FileSystemPath>,
) -> Result<Vc<Completion>> {
let all_assets = all_assets_from_entries(assets).await?;
Ok(Completions::all(
all_assets
assets
.await?
.iter()
.copied()
.map(|asset| async move {
@ -94,7 +114,7 @@ fn emit_rebase(
/// Walks the asset graph from multiple assets and collect all referenced
/// assets.
#[turbo_tasks::function]
async fn all_assets_from_entries(entries: Vc<OutputAssets>) -> Result<Vc<OutputAssets>> {
pub async fn all_assets_from_entries(entries: Vc<OutputAssets>) -> Result<Vc<OutputAssets>> {
Ok(Vc::cell(
AdjacencyMap::new()
.skip_duplicates()

View file

@ -55,7 +55,7 @@ pub use app_segment_config::{
parse_segment_config_from_loader_tree, parse_segment_config_from_source,
};
pub use app_source::create_app_source;
pub use emit::{all_server_paths, emit_all_assets};
pub use emit::{all_assets_from_entries, all_server_paths, emit_all_assets, emit_assets};
pub use next_edge::context::{
get_edge_chunking_context, get_edge_compile_time_info, get_edge_resolve_options_context,
};

View file

@ -470,9 +470,14 @@ interface Entrypoints {
pagesErrorEndpoint: Endpoint
}
interface Update {
update: unknown
}
interface Project {
update(options: ProjectOptions): Promise<void>
entrypointsSubscribe(): AsyncIterableIterator<TurbopackResult<Entrypoints>>
hmrEvents(identifier: string): AsyncIterableIterator<TurbopackResult<Update>>
}
export type Route =
@ -666,8 +671,6 @@ function bindingToApi(binding: any, _wasm: boolean) {
pagesDocumentEndpoint: NapiEndpoint
pagesAppEndpoint: NapiEndpoint
pagesErrorEndpoint: NapiEndpoint
issues: Issue[]
diagnostics: Diagnostics[]
}
type NapiMiddleware = {
@ -702,8 +705,10 @@ function bindingToApi(binding: any, _wasm: boolean) {
}
)
const subscription = subscribe<NapiEntrypoints>(false, async (callback) =>
binding.projectEntrypointsSubscribe(await this._nativeProject, callback)
const subscription = subscribe<TurbopackResult<NapiEntrypoints>>(
false,
async (callback) =>
binding.projectEntrypointsSubscribe(this._nativeProject, callback)
)
return (async function* () {
for await (const entrypoints of subscription) {
@ -775,6 +780,15 @@ function bindingToApi(binding: any, _wasm: boolean) {
}
})()
}
hmrEvents(identifier: string) {
const subscription = subscribe<TurbopackResult<Update>>(
true,
async (callback) =>
binding.projectHmrEvents(this._nativeProject, identifier, callback)
)
return subscription
}
}
class EndpointImpl implements Endpoint {