Add request body filter

This commit is contained in:
Yuchen Wu 2024-05-13 15:39:21 -07:00 committed by Edward Wang
parent 34b2a35d7b
commit f38f3b9a38
4 changed files with 123 additions and 61 deletions

2
.bleep
View file

@ -1 +1 @@
2c9d4c55853235e908a1acd20454ebe7b979d246
1952c0cbc08e1cef0a5ed280ce55b17e7066e49d

View file

@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> {
.reserve()
.await
.or_err(InternalError, "reserving body pipe")?;
send_body_to_pipe(buffer, downstream_state.is_done(), send_permit).await;
self.send_body_to_pipe(
session,
buffer,
downstream_state.is_done(),
send_permit,
ctx,
)
.await?;
}
let mut response_state = ResponseStateMachine::new();
@ -288,12 +295,15 @@ impl<SV> HttpProxy<SV> {
response_state.maybe_set_upstream_done(true);
}
// TODO: consider just drain this if serve_from_cache is set
let request_done = send_body_to_pipe(
let is_body_done = session.is_body_done();
let request_done = self.send_body_to_pipe(
session,
body,
session.is_body_done(),
is_body_done,
send_permit.unwrap(), // safe because we checked is_ok()
ctx,
)
.await;
.await?;
downstream_state.maybe_finished(request_done);
},
@ -520,30 +530,48 @@ impl<SV> HttpProxy<SV> {
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
}
// TODO:: use this function to replace send_body_to2
pub(crate) async fn send_body_to_pipe(
data: Option<Bytes>,
end_of_body: bool,
tx: mpsc::Permit<'_, HttpTask>,
) -> bool {
match data {
Some(data) => {
debug!("Read {} bytes body from downstream", data.len());
if data.is_empty() && !end_of_body {
/* it is normal to get 0 bytes because of multi-chunk
* don't write 0 bytes to downstream since it will be
* misread as the terminating chunk */
return false;
}
tx.send(HttpTask::Body(Some(data), end_of_body));
end_of_body
}
None => {
tx.send(HttpTask::Body(None, true));
true
// TODO: use this function to replace send_body_to2
/// Run the request body through `request_body_filter`, then forward it to the
/// upstream body pipe (h1 path).
///
/// Returns `Ok(true)` once the *downstream* has finished sending its body —
/// this is the value the caller uses to mark the downstream request as done,
/// independently of what the filter chose to emit upstream.
async fn send_body_to_pipe(
&self,
session: &mut Session,
mut data: Option<Bytes>,
end_of_body: bool,
tx: mpsc::Permit<'_, HttpTask>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// None: end of body
// This var signals whether downstream finished sending the body; it must
// not be affected by the request_body_filter (the filter may buffer or
// replace `data`, but cannot un-finish the downstream stream).
let end_of_body = end_of_body || data.is_none();
self.inner
.request_body_filter(session, &mut data, end_of_body, ctx)
.await?;
// the flag to signal to upstream; recomputed because the filter may have
// taken the data (set it to None) or produced data of its own
let upstream_end_of_body = end_of_body || data.is_none();
/* It is normal to get 0 bytes because of multi-chunk or request_body_filter decides not to
* output anything yet.
* Don't write 0 bytes to the network since it will be
* treated as the terminating chunk */
if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
return Ok(false);
}
// -1 in the log means "no data frame" (end-of-body marker only)
debug!(
"Read {} bytes body from downstream",
data.as_ref().map_or(-1, |d| d.len() as isize)
);
tx.send(HttpTask::Body(data, upstream_end_of_body));
// Report the downstream view, not the upstream one: the caller tracks
// whether downstream is done reading, regardless of filter buffering.
Ok(end_of_body)
}
}

View file

@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> {
// retry, send buffer if it exists
if let Some(buffer) = session.as_mut().get_retry_buffer() {
send_body_to2(Ok(Some(buffer)), downstream_state.is_done(), client_body)?;
self.send_body_to2(
session,
Some(buffer),
downstream_state.is_done(),
client_body,
ctx,
)
.await?;
}
let mut response_state = ResponseStateMachine::new();
@ -260,7 +267,10 @@ impl<SV> HttpProxy<SV> {
}
}
};
let request_done = send_body_to2(Ok(body), session.is_body_done(), client_body)?;
let is_body_done = session.is_body_done();
let request_done =
self.send_body_to2(session, body, is_body_done, client_body, ctx)
.await?;
downstream_state.maybe_finished(request_done);
},
@ -497,38 +507,40 @@ impl<SV> HttpProxy<SV> {
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
}
pub(crate) fn send_body_to2(
data: Result<Option<Bytes>>,
end_of_body: bool,
client_body: &mut h2::SendStream<bytes::Bytes>,
) -> Result<bool> {
match data {
Ok(res) => match res {
Some(data) => {
let data_len = data.len();
debug!(
"Read {} bytes body from downstream, body end: {}",
data_len, end_of_body
);
if data_len == 0 && !end_of_body {
/* it is normal to get 0 bytes because of multi-chunk parsing */
return Ok(false);
}
write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
debug!("Write {} bytes body to h2 upstream", data_len);
Ok(end_of_body)
}
None => {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
debug!("Write END_STREAM to h2 upstream");
Ok(true)
}
},
Err(e) => e.into_down().into_err(),
/// Run the request body through `request_body_filter`, then write it to the
/// h2 upstream stream (h2 path).
///
/// Returns `Ok(end_of_body)` so the caller can mark the downstream request
/// body as finished. Write errors are tagged as upstream errors via
/// `into_up()`.
async fn send_body_to2(
&self,
session: &mut Session,
mut data: Option<Bytes>,
end_of_body: bool,
client_body: &mut h2::SendStream<bytes::Bytes>,
ctx: &mut SV::CTX,
) -> Result<bool>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
// The filter may mutate, take, or replace `data` in place.
self.inner
.request_body_filter(session, &mut data, end_of_body, ctx)
.await?;
/* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter.
* Although there is no harm writing empty byte to h2, unlike h1, we ignore it
* for consistency */
if !end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
return Ok(false);
}
if let Some(data) = data {
debug!("Write {} bytes body to h2 upstream", data.len());
write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
} else {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
}
Ok(end_of_body)
}
}

View file

@ -14,6 +14,7 @@
use super::*;
use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
use std::time::Duration;
/// The interface to control the HTTP proxy
///
@ -55,6 +56,27 @@ pub trait ProxyHttp {
Ok(false)
}
/// Handle the incoming request body.
///
/// This function will be called every time a piece of request body is received. The `body` is
/// **not the entire request body**.
///
/// The async nature of this function allows throttling the upload speed and/or executing
/// heavy computation logic such as WAF rules on offloaded threads without blocking the
/// threads that process the requests themselves.
///
/// The default implementation is a no-op: the body is passed through unchanged.
async fn request_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
/// This filter decides if the request is cacheable and what cache backend to use
///
/// The caller can interact with `Session.cache` to enable caching.
@ -238,7 +260,7 @@ pub trait ProxyHttp {
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<Option<std::time::Duration>>
) -> Result<Option<Duration>>
where
Self::CTX: Send + Sync,
{