Add support for downstream h2 trailers and add an upstream h2 response trailer filter

This commit is contained in:
Andrew Hauck 2024-04-12 16:56:57 -07:00 committed by Kevin Guthrie
parent 93ad08ea3e
commit a6bd816f16
7 changed files with 81 additions and 17 deletions

2
.bleep
View file

@ -1 +1 @@
54aea7e241fff10bb31cf067348afb8c139fa39c 892caa6218fc1fd7c5e3f2a2b0d22365ee4c3166

View file

@ -20,8 +20,8 @@ use super::v2::server::HttpSession as SessionV2;
use super::HttpTask; use super::HttpTask;
use crate::protocols::{Digest, SocketAddr, Stream}; use crate::protocols::{Digest, SocketAddr, Stream};
use bytes::Bytes; use bytes::Bytes;
use http::header::AsHeaderName;
use http::HeaderValue; use http::HeaderValue;
use http::{header::AsHeaderName, HeaderMap};
use log::error; use log::error;
use pingora_error::Result; use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader}; use pingora_http::{RequestHeader, ResponseHeader};
@ -141,6 +141,14 @@ impl Session {
} }
} }
/// Write the response trailers to client
pub async fn write_response_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
match self {
Self::H1(_) => Ok(()), // TODO: support trailers for h1
Self::H2(s) => s.write_trailers(trailers),
}
}
/// Finish the life of this request. /// Finish the life of this request.
/// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None. /// For H1, if connection reuse is supported, a Some(Stream) will be returned, otherwise None.
/// For H2, always return None because H2 stream is not reusable. /// For H2, always return None because H2 stream is not reusable.

View file

@ -20,7 +20,7 @@ use h2::server;
use h2::server::SendResponse; use h2::server::SendResponse;
use h2::{RecvStream, SendStream}; use h2::{RecvStream, SendStream};
use http::header::HeaderName; use http::header::HeaderName;
use http::{header, Response}; use http::{header, HeaderMap, Response};
use log::{debug, warn}; use log::{debug, warn};
use pingora_http::{RequestHeader, ResponseHeader}; use pingora_http::{RequestHeader, ResponseHeader};
use std::sync::Arc; use std::sync::Arc;
@ -256,6 +256,27 @@ impl HttpSession {
Ok(()) Ok(())
} }
/// Write response trailers to the client, this also closes the stream.
pub fn write_trailers(&mut self, trailers: HeaderMap) -> Result<()> {
if self.ended {
warn!("Tried to write trailers after end of stream, dropping them");
return Ok(());
}
let Some(writer) = self.send_response_body.as_mut() else {
return Err(Error::explain(
ErrorType::H2Error,
"try to send trailers before header is sent",
));
};
writer.send_trailers(trailers).or_err(
ErrorType::WriteError,
"while writing h2 response trailers to downstream",
)?;
// sending trailers closes the stream
self.ended = true;
Ok(())
}
/// Similar to [Self::write_response_header], this function takes a reference instead /// Similar to [Self::write_response_header], this function takes a reference instead
pub fn write_response_header_ref(&mut self, header: &ResponseHeader, end: bool) -> Result<()> { pub fn write_response_header_ref(&mut self, header: &ResponseHeader, end: bool) -> Result<()> {
self.write_response_header(Box::new(header.clone()), end) self.write_response_header(Box::new(header.clone()), end)
@ -305,7 +326,11 @@ impl HttpSession {
} }
None => end, None => end,
}, },
HttpTask::Trailer(_) => true, // trailer is not supported yet HttpTask::Trailer(Some(trailers)) => {
self.write_trailers(*trailers)?;
true
}
HttpTask::Trailer(None) => true,
HttpTask::Done => { HttpTask::Done => {
self.finish().map_err(|e| e.into_down())?; self.finish().map_err(|e| e.into_down())?;
return Ok(true); return Ok(true);
@ -442,7 +467,7 @@ impl HttpSession {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use http::{Method, Request}; use http::{HeaderValue, Method, Request};
use tokio::io::duplex; use tokio::io::duplex;
#[tokio::test] #[tokio::test]
@ -451,6 +476,10 @@ mod test {
let client_body = "test client body"; let client_body = "test client body";
let server_body = "test server body"; let server_body = "test server body";
let mut expected_trailers = HeaderMap::new();
expected_trailers.insert("test", HeaderValue::from_static("trailers"));
let trailers = expected_trailers.clone();
tokio::spawn(async move { tokio::spawn(async move {
let (h2, connection) = h2::client::handshake(client).await.unwrap(); let (h2, connection) = h2::client::handshake(client).await.unwrap();
tokio::spawn(async move { tokio::spawn(async move {
@ -473,6 +502,8 @@ mod test {
assert_eq!(head.status, 200); assert_eq!(head.status, 200);
let data = body.data().await.unwrap().unwrap(); let data = body.data().await.unwrap().unwrap();
assert_eq!(data, server_body); assert_eq!(data, server_body);
let resp_trailers = body.trailers().await.unwrap().unwrap();
assert_eq!(resp_trailers, expected_trailers);
}); });
let mut connection = handshake(Box::new(server), None).await.unwrap(); let mut connection = handshake(Box::new(server), None).await.unwrap();
@ -482,6 +513,7 @@ mod test {
.await .await
.unwrap() .unwrap()
{ {
let trailers = trailers.clone();
tokio::spawn(async move { tokio::spawn(async move {
let req = http.req_header(); let req = http.req_header();
assert_eq!(req.method, Method::GET); assert_eq!(req.method, Method::GET);
@ -519,6 +551,7 @@ mod test {
http.write_body(server_body.into(), false).unwrap(); http.write_body(server_body.into(), false).unwrap();
assert_eq!(http.body_bytes_sent(), 16); assert_eq!(http.body_bytes_sent(), 16);
http.write_trailers(trailers).unwrap();
http.finish().unwrap(); http.finish().unwrap();
}); });
} }

View file

@ -222,7 +222,12 @@ impl<SV> HttpProxy<SV> {
} }
} }
fn upstream_filter(&self, session: &mut Session, task: &mut HttpTask, ctx: &mut SV::CTX) fn upstream_filter(
&self,
session: &mut Session,
task: &mut HttpTask,
ctx: &mut SV::CTX,
) -> Result<()>
where where
SV: ProxyHttp, SV: ProxyHttp,
{ {
@ -233,10 +238,14 @@ impl<SV> HttpProxy<SV> {
HttpTask::Body(data, eos) => self HttpTask::Body(data, eos) => self
.inner .inner
.upstream_response_body_filter(session, data, *eos, ctx), .upstream_response_body_filter(session, data, *eos, ctx),
HttpTask::Trailer(Some(trailers)) => self
.inner
.upstream_response_trailer_filter(session, trailers, ctx)?,
_ => { _ => {
// TODO: add other upstream filter traits // task does not support a filter
} }
} }
Ok(())
} }
async fn finish( async fn finish(

View file

@ -431,7 +431,7 @@ impl<SV> HttpProxy<SV> {
{ {
// skip caching if already served from cache // skip caching if already served from cache
if !from_cache { if !from_cache {
self.upstream_filter(session, &mut task, ctx); self.upstream_filter(session, &mut task, ctx)?;
// cache the original response before any downstream transformation // cache the original response before any downstream transformation
// requests that bypassed cache still need to run filters to see if the response has become cacheable // requests that bypassed cache still need to run filters to see if the response has become cacheable
@ -515,7 +515,7 @@ impl<SV> HttpProxy<SV> {
} }
Ok(HttpTask::Body(data, end)) Ok(HttpTask::Body(data, end))
} }
HttpTask::Trailer(h) => Ok(HttpTask::Trailer(h)), // no h1 trailer filter yet HttpTask::Trailer(h) => Ok(HttpTask::Trailer(h)), // TODO: support trailers for h1
HttpTask::Done => Ok(task), HttpTask::Done => Ok(task),
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
} }

View file

@ -381,7 +381,7 @@ impl<SV> HttpProxy<SV> {
SV::CTX: Send + Sync, SV::CTX: Send + Sync,
{ {
if !from_cache { if !from_cache {
self.upstream_filter(session, &mut task, ctx); self.upstream_filter(session, &mut task, ctx)?;
// cache the original response before any downstream transformation // cache the original response before any downstream transformation
// requests that bypassed cache still need to run filters to see if the response has become cacheable // requests that bypassed cache still need to run filters to see if the response has become cacheable
@ -457,18 +457,18 @@ impl<SV> HttpProxy<SV> {
.inner .inner
.response_body_filter(session, &mut data, eos, ctx)? .response_body_filter(session, &mut data, eos, ctx)?
{ {
trace!("delaying response for {:?}", duration); trace!("delaying response for {duration:?}");
time::sleep(duration).await; time::sleep(duration).await;
} }
Ok(HttpTask::Body(data, eos)) Ok(HttpTask::Body(data, eos))
} }
HttpTask::Trailer(header_map) => { HttpTask::Trailer(mut trailers) => {
let trailer_buffer = match header_map { let trailer_buffer = match trailers.as_mut() {
Some(mut trailer_map) => { Some(trailers) => {
debug!("Parsing response trailers.."); debug!("Parsing response trailers..");
match self match self
.inner .inner
.response_trailer_filter(session, &mut trailer_map, ctx) .response_trailer_filter(session, trailers, ctx)
.await .await
{ {
Ok(buf) => buf, Ok(buf) => buf,
@ -490,7 +490,7 @@ impl<SV> HttpProxy<SV> {
// https://http2.github.io/http2-spec/#malformed // https://http2.github.io/http2-spec/#malformed
Ok(HttpTask::Body(Some(buffer), true)) Ok(HttpTask::Body(Some(buffer), true))
} else { } else {
Ok(HttpTask::Done) Ok(HttpTask::Trailer(trailers))
} }
} }
HttpTask::Done => Ok(task), HttpTask::Done => Ok(task),

View file

@ -221,6 +221,16 @@ pub trait ProxyHttp {
) { ) {
} }
/// Similar to [Self::upstream_response_filter()] but for response trailers
fn upstream_response_trailer_filter(
&self,
_session: &mut Session,
_upstream_trailers: &mut header::HeaderMap,
_ctx: &mut Self::CTX,
) -> Result<()> {
Ok(())
}
/// Similar to [Self::response_filter()] but for response body chunks /// Similar to [Self::response_filter()] but for response body chunks
fn response_body_filter( fn response_body_filter(
&self, &self,
@ -235,7 +245,11 @@ pub trait ProxyHttp {
Ok(None) Ok(None)
} }
/// When a trailer is received. /// Similar to [Self::response_filter()] but for response trailers.
/// Note, returning an Ok(Some(Bytes)) will result in the downstream response
/// trailers being written to the response body.
///
/// TODO: make this interface more intuitive
async fn response_trailer_filter( async fn response_trailer_filter(
&self, &self,
_session: &mut Session, _session: &mut Session,