Run upstream_response_filter on 304 from upstream cache revalidation

Previously only the response_filter would run on revalidated responses
with no way to process the upstream 304.
Also, when response_cache_filter errors during revalidation, avoid
passing the upstream 304 through and instead serve from cache. This
simplifies the edge case because it saves us from later needing to
check whether the upstream header filter already ran on the 304.
This commit is contained in:
ewang 2024-02-23 17:46:04 -08:00 committed by Yuchen Wu
parent fa0ee75740
commit e811382224
6 changed files with 51 additions and 15 deletions

2
.bleep
View file

@ -1 +1 @@
553d7cabaf8ea7a28c5cd5464a53dd5edbb5083a
fec326a416d91d8d1d5b7a165ba371c478c1e278

View file

@ -87,7 +87,7 @@ This phase is to modify requests before sending to upstream.
### `upstream_response_filter()/upstream_response_body_filter()`
This phase is triggered after an upstream response header/body is received.
This phase is to modify response headers (or body) before sending to downstream. Note that this phase is called _prior_ to HTTP caching and therefore any changes made here will affect the response stored in the HTTP cache.
This phase is to modify or process response headers (or body) before sending to downstream. Note that this phase is called _prior_ to HTTP caching and therefore any changes made here will affect the response stored in the HTTP cache.
### `response_filter()/response_body_filter()/response_trailer_filter()`
This phase is triggered after a response header/body/trailer is ready to send to downstream.

View file

@ -505,6 +505,8 @@ impl<SV> HttpProxy<SV> {
HttpTask::Header(resp, _eos) => {
if resp.status == StatusCode::NOT_MODIFIED {
if session.cache.maybe_cache_meta().is_some() {
// run upstream response filters on upstream 304 first
self.inner.upstream_response_filter(session, resp, ctx);
// 304 doesn't contain all the headers, merge 304 into cached 200 header
// in order for response_cache_filter to run correctly
let merged_header = session.cache.revalidate_merge_header(resp);
@ -525,11 +527,10 @@ impl<SV> HttpProxy<SV> {
meta.set_variance(old_variance);
}
if let Err(e) = session.cache.revalidate_cache_meta(meta).await {
warn!("revalidate_cache_meta failed {:?}", e);
// Fail open: we can continue to use the revalidated response even
// if the meta failed to write to storage
warn!("revalidate_cache_meta failed {e:?}");
}
// We can continue use the revalidated one even the meta was failed
// to write to storage
true
}
Ok(Uncacheable(reason)) => {
// This response was once cacheable, and upstream tells us it has not changed
@ -544,17 +545,24 @@ impl<SV> HttpProxy<SV> {
// (downstream may have a different cacheability assessment and could cache the 304)
//TODO: log more
warn!("Uncacheable {:?} 304 received", reason);
warn!("Uncacheable {reason:?} 304 received");
session.cache.response_became_uncacheable(reason);
session.cache.revalidate_uncacheable(merged_header, reason);
true
}
Err(e) => {
warn!("Error {:?} response_cache_filter during revalidation, disable caching", e);
session.cache.disable(NoCacheReason::InternalError);
false
// Error during revalidation: similar to the reasons above
// (avoid poisoning the downstream cache with a passthrough 304),
// allow serving the stored response without updating the cache
warn!("Error {e:?} response_cache_filter during revalidation");
session.cache.revalidate_uncacheable(
merged_header,
NoCacheReason::InternalError,
);
// Assume the next 304 may succeed, so don't mark uncacheable
}
}
// always serve from cache after receiving the 304
true
} else {
//TODO: log more
warn!("304 received without cached asset, disable caching");

View file

@ -160,7 +160,9 @@ pub trait ProxyHttp {
///
/// The modification is before caching so any change here will be stored in cache if enabled.
///
/// Responses served from cache won't trigger this filter.
/// Responses served from cache won't trigger this filter. If the cache needed revalidation,
/// only the 304 from upstream will trigger the filter (though it will be merged into the
/// cached header, not served directly to downstream).
fn upstream_response_filter(
&self,
_session: &mut Session,

View file

@ -305,6 +305,7 @@ mod test_cache {
let headers = res.headers();
let cache_miss_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
assert_eq!(headers["x-cache-status"], "miss");
assert_eq!(headers["x-upstream-status"], "200");
assert_eq!(res.text().await.unwrap(), "hello world");
let res = reqwest::get(url).await.unwrap();
@ -312,6 +313,7 @@ mod test_cache {
let headers = res.headers();
let cache_hit_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
assert_eq!(headers["x-cache-status"], "hit");
assert!(headers.get("x-upstream-status").is_none());
assert_eq!(res.text().await.unwrap(), "hello world");
assert_eq!(cache_miss_epoch, cache_hit_epoch);
@ -323,6 +325,7 @@ mod test_cache {
let headers = res.headers();
let cache_expired_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
assert_eq!(headers["x-cache-status"], "revalidated");
assert_eq!(headers["x-upstream-status"], "304");
assert_eq!(res.text().await.unwrap(), "hello world");
// still the old object

View file

@ -197,12 +197,21 @@ static EVICTION_MANAGER: Lazy<Manager> = Lazy::new(|| Manager::new(8192)); // 81
static CACHE_LOCK: Lazy<CacheLock> =
Lazy::new(|| CacheLock::new(std::time::Duration::from_secs(2)));
// #[allow(clippy::upper_case_acronyms)]
// Per-request context for ExampleProxyCache.
pub struct CacheCTX {
// Status code of the most recent upstream response, recorded in
// upstream_response_filter; None when no upstream response was seen
// (e.g. the request was served entirely from cache).
upstream_status: Option<u16>,
}
pub struct ExampleProxyCache {}
#[async_trait]
impl ProxyHttp for ExampleProxyCache {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
type CTX = CacheCTX;
// Create a fresh per-request context with no upstream status recorded yet.
fn new_ctx(&self) -> Self::CTX {
CacheCTX {
upstream_status: None,
}
}
async fn upstream_peer(
&self,
@ -282,11 +291,22 @@ impl ProxyHttp for ExampleProxyCache {
Ok(resp_cacheable(cc.as_ref(), resp, false, &CACHE_DEFAULT))
}
// Record the upstream response status in the per-request context so that
// response_filter can later surface it via the x-upstream-status header.
// Because this phase also runs on the 304 from a cache revalidation, a
// revalidated response will report status 304 here (see the cache tests).
fn upstream_response_filter(
&self,
_session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) where
Self::CTX: Send + Sync,
{
// StatusCode converts losslessly into its numeric u16 form
ctx.upstream_status = Some(upstream_response.status.into());
}
async fn response_filter(
&self,
session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
@ -315,6 +335,9 @@ impl ProxyHttp for ExampleProxyCache {
if let Some(d) = session.cache.lock_duration() {
upstream_response.insert_header("x-cache-lock-time-ms", format!("{}", d.as_millis()))?
}
if let Some(up_stat) = ctx.upstream_status {
upstream_response.insert_header("x-upstream-status", up_stat.to_string())?;
}
Ok(())
}