Add purge_response callback

Allow generating custom responses to purge requests (requests to
invalidate or delete from the HTTP cache).
This commit is contained in:
ewang 2024-04-03 11:16:15 -07:00 committed by Yuchen Wu
parent b493ac0c95
commit cfb89c6eb5
5 changed files with 94 additions and 43 deletions

2
.bleep
View file

@ -1 +1 @@
64c5e3fa4a538348e25a1ac5fe18245fc6d1eefc
b10bac2a769c0b0307110f10280c9518705512dd

View file

@ -79,6 +79,7 @@ mod subrequest;
use subrequest::Ctx as SubReqCtx;
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::ProxyHttp;
pub mod prelude {

View file

@ -62,11 +62,7 @@ impl<SV> HttpProxy<SV> {
// cache purge logic: PURGE short-circuits rest of request
if self.inner.is_purge(session, ctx) {
if session.cache.enabled() {
return self.proxy_purge(session, ctx).await;
} else {
return Some(proxy_purge::write_no_purge_response(session).await);
}
return self.proxy_purge(session, ctx).await;
}
// bypass cache lookup if we predict to be uncacheable

View file

@ -13,6 +13,33 @@
// limitations under the License.
use super::*;
use pingora_core::protocols::http::error_resp;
use std::borrow::Cow;
#[derive(Debug)]
pub enum PurgeStatus {
/// Cache was not enabled, purge ineffectual.
NoCache,
/// Asset was found in cache (and presumably purged or being purged).
Found,
/// Asset was not found in cache.
NotFound,
/// Cache returned a purge error.
/// Contains causing error in case it should affect the downstream response.
Error(Box<Error>),
}
// Return a canned response to a purge request, based on whether the cache had the asset or not
// (or otherwise returned an error).
fn purge_response(purge_status: &PurgeStatus) -> Cow<'static, ResponseHeader> {
let resp = match purge_status {
PurgeStatus::NoCache => &*NOT_PURGEABLE,
PurgeStatus::Found => &*OK,
PurgeStatus::NotFound => &*NOT_FOUND,
PurgeStatus::Error(ref _e) => &*INTERNAL_ERROR,
};
Cow::Borrowed(resp)
}
fn gen_purge_response(code: u16) -> ResponseHeader {
let mut resp = ResponseHeader::build(code, Some(3)).unwrap();
@ -25,27 +52,12 @@ fn gen_purge_response(code: u16) -> ResponseHeader {
resp
}
async fn write_purge_response(
session: &mut Session,
resp: &ResponseHeader,
) -> (bool, Option<Box<Error>>) {
match session.as_mut().write_response_header_ref(resp).await {
Ok(_) => (true, None),
// dirty, not reusable
Err(e) => (false, Some(e.into_down())),
}
}
/// Write a response for a rejected cache purge requests
pub async fn write_no_purge_response(session: &mut Session) -> (bool, Option<Box<Error>>) {
// TODO: log send error
write_purge_response(session, &NOT_PURGEABLE).await
}
static OK: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(200));
static NOT_FOUND: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(404));
// for when purge is sent to uncacheable assets
static NOT_PURGEABLE: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(405));
// on cache storage or proxy error
static INTERNAL_ERROR: Lazy<ResponseHeader> = Lazy::new(|| error_resp::gen_error_response(500));
impl<SV> HttpProxy<SV> {
pub(crate) async fn proxy_purge(
@ -57,31 +69,57 @@ impl<SV> HttpProxy<SV> {
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
match session.cache.purge().await {
Ok(found) => {
// canned PURGE response based on whether we found the asset or not
let resp = if found { &*OK } else { &*NOT_FOUND };
let (reuse, err) = write_purge_response(session, resp).await;
if let Some(e) = err.as_ref() {
error!(
"Failed to send purge response: {}, {}",
e,
self.inner.request_summary(session, ctx)
)
let purge_status = if session.cache.enabled() {
match session.cache.purge().await {
Ok(found) => {
if found {
PurgeStatus::Found
} else {
PurgeStatus::NotFound
}
}
Err(e) => {
session.cache.disable(NoCacheReason::StorageError);
warn!(
"Fail to purge cache: {e}, {}",
self.inner.request_summary(session, ctx)
);
PurgeStatus::Error(e)
}
Some((reuse, err))
}
} else {
// cache was not enabled
PurgeStatus::NoCache
};
let mut purge_resp = purge_response(&purge_status);
if let Err(e) =
self.inner
.purge_response_filter(session, ctx, purge_status, &mut purge_resp)
{
error!(
"Failed purge response filter: {e}, {}",
self.inner.request_summary(session, ctx)
);
purge_resp = Cow::Borrowed(&*INTERNAL_ERROR)
}
let write_result = match purge_resp {
Cow::Borrowed(r) => session.as_mut().write_response_header_ref(r).await,
Cow::Owned(r) => session.as_mut().write_response_header(Box::new(r)).await,
};
let (reuse, err) = match write_result {
Ok(_) => (true, None),
// dirty, not reusable
Err(e) => {
session.cache.disable(NoCacheReason::StorageError);
warn!(
"Fail to purge cache: {}, {}",
e,
let e = e.into_down();
error!(
"Failed to send purge response: {e}, {}",
self.inner.request_summary(session, ctx)
);
session.downstream_session.respond_error(500).await;
// still reusable
Some((true, Some(e)))
(false, Some(e))
}
}
};
Some((reuse, err))
}
}

View file

@ -400,4 +400,20 @@ pub trait ProxyHttp {
fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
false
}
/// This filter is called after the proxy cache generates the downstream response to the purge
/// request (to invalidate or delete from the HTTP cache), based on the purge status, which
/// indicates whether the request succeeded or failed.
///
/// The filter allows the user to modify or replace the generated downstream response.
/// If the filter returns `Err`, the proxy will instead send a 500 response.
fn purge_response_filter(
&self,
_session: &Session,
_ctx: &mut Self::CTX,
_purge_status: PurgeStatus,
_purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
) -> Result<()> {
Ok(())
}
}