Redesign the API of HTTP module

Make it async and more natural.
This commit is contained in:
Yuchen Wu 2024-05-17 14:30:15 -07:00 committed by Yuchen Wu
parent 479d9badbf
commit 2902dc5365
5 changed files with 100 additions and 37 deletions

2
.bleep
View file

@ -1 +1 @@
719b4dcafd471ecfa9e4ade3abfb217929afddc6
cccbc3b4d7229f92a0a7679accf2523bbe944e78

View file

@ -161,13 +161,16 @@ where
}
let mut module_ctx = self.modules.build_ctx();
let req = http.req_header_mut();
module_ctx.request_header_filter(req).ok()?;
module_ctx.request_header_filter(req).await.ok()?;
let new_response = self.app.response(&mut http).await;
let (parts, body) = new_response.into_parts();
let resp_header: ResponseHeader = parts.into();
let mut task = HttpTask::Header(Box::new(resp_header), body.is_empty());
module_ctx.response_filter(&mut task).ok()?;
let mut resp_header: ResponseHeader = parts.into();
module_ctx
.response_header_filter(&mut resp_header, body.is_empty())
.await
.ok()?;
let task = HttpTask::Header(Box::new(resp_header), body.is_empty());
trace!("{task:?}");
match http.response_duplex_vec(vec![task]).await {
@ -181,16 +184,14 @@ where
);
}
}
let mut task = if !body.is_empty() {
HttpTask::Body(Some(body.into()), true)
} else {
HttpTask::Body(None, true)
};
let mut body = Some(body.into());
module_ctx.response_body_filter(&mut body, true).ok()?;
let task = HttpTask::Body(body, true);
trace!("{task:?}");
module_ctx.response_filter(&mut task).ok()?;
// TODO: check if chunked encoding is needed
match http.response_duplex_vec(vec![task]).await {
Ok(_) => debug!("HTTP response written."),

View file

@ -20,6 +20,7 @@ use crate::protocols::http::compression::ResponseCompressionCtx;
/// HTTP response compression module
pub struct ResponseCompression(ResponseCompressionCtx);
#[async_trait]
impl HttpModule for ResponseCompression {
fn as_any(&self) -> &dyn std::any::Any {
self
@ -28,13 +29,30 @@ impl HttpModule for ResponseCompression {
self
}
fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
self.0.request_filter(req);
Ok(())
}
fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
self.0.response_filter(t);
async fn response_header_filter(
&mut self,
resp: &mut ResponseHeader,
end_of_stream: bool,
) -> Result<()> {
self.0.response_header_filter(resp, end_of_stream);
Ok(())
}
fn response_body_filter(
&mut self,
body: &mut Option<Bytes>,
end_of_stream: bool,
) -> Result<()> {
if !self.0.is_enabled() {
return Ok(());
}
let compressed = self.0.response_body_filter(body.as_ref(), end_of_stream);
*body = compressed;
Ok(())
}
}

View file

@ -20,28 +20,44 @@
pub mod compression;
use crate::protocols::http::HttpTask;
use async_trait::async_trait;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use pingora_error::Result;
use pingora_http::RequestHeader;
use pingora_http::{RequestHeader, ResponseHeader};
use std::any::Any;
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
/// The trait an HTTP traffic module needs to implement
// TODO: * async filters for, e.g., 3rd party auth server; * access the connection for, e.g., GeoIP
#[async_trait]
pub trait HttpModule {
fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
async fn request_header_filter(&mut self, _req: &mut RequestHeader) -> Result<()> {
Ok(())
}
fn request_body_filter(&mut self, body: Option<Bytes>) -> Result<Option<Bytes>> {
Ok(body)
async fn request_body_filter(
&mut self,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
) -> Result<()> {
Ok(())
}
fn response_filter(&mut self, _t: &mut HttpTask) -> Result<()> {
async fn response_header_filter(
&mut self,
_resp: &mut ResponseHeader,
_end_of_stream: bool,
) -> Result<()> {
Ok(())
}
fn response_body_filter(
&mut self,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
) -> Result<()> {
Ok(())
}
@ -168,25 +184,45 @@ impl HttpModuleCtx {
}
/// Run the `request_header_filter` for all the modules according to their orders.
pub fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
pub async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
for filter in self.module_ctx.iter_mut() {
filter.request_header_filter(req)?;
filter.request_header_filter(req).await?;
}
Ok(())
}
/// Run the `request_body_filter` for all the modules according to their orders.
pub fn request_body_filter(&mut self, mut body: Option<Bytes>) -> Result<Option<Bytes>> {
pub async fn request_body_filter(
&mut self,
body: &mut Option<Bytes>,
end_of_stream: bool,
) -> Result<()> {
for filter in self.module_ctx.iter_mut() {
body = filter.request_body_filter(body)?;
filter.request_body_filter(body, end_of_stream).await?;
}
Ok(body)
Ok(())
}
/// Run the `response_filter` for all the modules according to their orders.
pub fn response_filter(&mut self, t: &mut HttpTask) -> Result<()> {
/// Run the `response_header_filter` for all the modules according to their orders.
pub async fn response_header_filter(
&mut self,
req: &mut ResponseHeader,
end_of_stream: bool,
) -> Result<()> {
for filter in self.module_ctx.iter_mut() {
filter.response_filter(t)?;
filter.response_header_filter(req, end_of_stream).await?;
}
Ok(())
}
/// Run the `response_body_filter` for all the modules according to their orders.
pub fn response_body_filter(
&mut self,
body: &mut Option<Bytes>,
end_of_stream: bool,
) -> Result<()> {
for filter in self.module_ctx.iter_mut() {
filter.response_body_filter(body, end_of_stream)?;
}
Ok(())
}
@ -197,6 +233,7 @@ mod tests {
use super::*;
struct MyModule;
#[async_trait]
impl HttpModule for MyModule {
fn as_any(&self) -> &dyn Any {
self
@ -204,7 +241,7 @@ mod tests {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
req.insert_header("my-filter", "1")
}
}
@ -220,6 +257,7 @@ mod tests {
}
struct MyOtherModule;
#[async_trait]
impl HttpModule for MyOtherModule {
fn as_any(&self) -> &dyn Any {
self
@ -227,7 +265,7 @@ mod tests {
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
if req.headers.get("my-filter").is_some() {
// if this MyOtherModule runs after MyModule
req.insert_header("my-filter", "2")
@ -262,14 +300,14 @@ mod tests {
assert!(ctx.get_mut::<usize>().is_none());
}
#[test]
fn test_module_filter() {
#[tokio::test]
async fn test_module_filter() {
let mut http_module = HttpModules::new();
http_module.add_module(Box::new(MyOtherModuleBuilder));
http_module.add_module(Box::new(MyModuleBuilder));
let mut ctx = http_module.build_ctx();
let mut req = RequestHeader::build("Get", b"/", None).unwrap();
ctx.request_header_filter(&mut req).unwrap();
ctx.request_header_filter(&mut req).await.unwrap();
// MyModule runs before MyOtherModule
assert_eq!(req.headers.get("my-filter").unwrap(), "2");
assert!(req.headers.get("my-other-filter").is_none());

View file

@ -159,7 +159,11 @@ impl ResponseCompressionCtx {
}
}
fn response_header_filter(&mut self, resp: &mut ResponseHeader, end: bool) {
/// Feed the response header into this ctx
pub fn response_header_filter(&mut self, resp: &mut ResponseHeader, end: bool) {
if !self.is_enabled() {
return;
}
match &self.0 {
CtxInner::HeaderPhase {
compression_level,
@ -195,7 +199,8 @@ impl ResponseCompressionCtx {
}
}
fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
/// Stream the response body chunks into this ctx. The return value will be the compressed data
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
@ -222,6 +227,7 @@ impl ResponseCompressionCtx {
}
}
// TODO: retire this function, replace it with the two functions above
/// Feed the response into this ctx.
/// This filter will mutate the response accordingly if encoding is needed.
pub fn response_filter(&mut self, t: &mut HttpTask) {