h2c support

This commit is contained in:
Andrew Hauck 2024-06-10 10:23:48 -07:00 committed by Matthew (mbg)
parent 2d30077683
commit a1f1ad8a43
7 changed files with 126 additions and 55 deletions

2
.bleep
View file

@ -1 +1 @@
278b102db964d20ff55a372f876e09f9f19ca999
bc0d789447bc0704c6cf01210eaae7ba77c8dad9

View file

@ -53,6 +53,13 @@ pub trait ServerApp {
/// This callback will be called once after the service stops listening to its endpoints.
async fn cleanup(&self) {}
}
#[non_exhaustive]
#[derive(Default)]
/// HTTP Server options that control how the server handles some transport types.
pub struct HttpServerOptions {
/// Use HTTP/2 for plaintext.
pub h2c: bool,
}
/// This trait defines the interface of an HTTP application.
#[cfg_attr(not(doc_async_trait), async_trait)]
@ -77,6 +84,14 @@ pub trait HttpServerApp {
None
}
/// Provide HTTP server options used to override default behavior. This function will be called
/// every time a new connection is processed.
///
/// A `None` means no server options will be applied.
fn server_options(&self) -> Option<&HttpServerOptions> {
None
}
async fn http_cleanup(&self) {}
}
@ -90,54 +105,53 @@ where
stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
// TODO: allow h2c and http/1.1 to co-exist
if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
error!("H2 handshake error {e}");
return None;
}
Ok(c) => c,
};
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
error!("H2 handshake error {e}");
// It is common for the client to just disconnect TCP without properly
// closing H2. So we don't log the errors here
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(c) => c,
Ok(s) => s?, // None means the connection is ready to be closed
};
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
// It is common for the client to just disconnect TCP without properly
// closing H2. So we don't log the errors here
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(s) => s?, // None means the connection is ready to be closed
};
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
}
_ => {
// No ALPN or ALPN::H1 or something else, just try Http1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
} else {
// No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
}
}

View file

@ -52,7 +52,7 @@ use tokio::sync::{mpsc, Notify};
use tokio::time;
use pingora_cache::NoCacheReason;
use pingora_core::apps::HttpServerApp;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
@ -95,6 +95,7 @@ pub struct HttpProxy<SV> {
inner: SV, // TODO: name it better than inner
client_upstream: Connector,
shutdown: Notify,
pub server_options: Option<HttpServerOptions>,
pub downstream_modules: HttpModules,
}
@ -104,6 +105,7 @@ impl<SV> HttpProxy<SV> {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
server_options: None,
downstream_modules: HttpModules::new(),
}
}
@ -725,6 +727,10 @@ where
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
}
fn server_options(&self) -> Option<&HttpServerOptions> {
self.server_options.as_ref()
}
// TODO implement h2_options
}

View file

@ -18,7 +18,11 @@ use crate::proxy_common::*;
use pingora_core::protocols::http::v2::client::{write_body, Http2Session};
// add scheme and authority as required by h2 lib
fn update_h2_scheme_authority(header: &mut http::request::Parts, raw_host: &[u8]) -> Result<()> {
fn update_h2_scheme_authority(
header: &mut http::request::Parts,
raw_host: &[u8],
tls: bool,
) -> Result<()> {
let authority = if let Ok(s) = std::str::from_utf8(raw_host) {
if s.starts_with('[') {
// don't mess with ipv6 host
@ -43,8 +47,9 @@ fn update_h2_scheme_authority(header: &mut http::request::Parts, raw_host: &[u8]
);
};
let scheme = if tls { "https" } else { "http" };
let uri = http::uri::Builder::new()
.scheme("https")
.scheme(scheme)
.authority(authority)
.path_and_query(header.uri.path_and_query().as_ref().unwrap().as_str())
.build();
@ -123,7 +128,7 @@ impl<SV> HttpProxy<SV> {
// H2 requires authority to be set, so copy that from H1 host if that is set
if let Some(host) = host {
if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes()) {
if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes(), peer.is_tls()) {
return (false, Some(e));
}
}
@ -619,14 +624,20 @@ fn test_update_authority() {
.unwrap()
.into_parts()
.0;
update_h2_scheme_authority(&mut parts, b"example.com").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("example.com", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:456").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:456", true).unwrap();
assert_eq!("example.com:456", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:", true).unwrap();
assert_eq!("example.com:", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:123:345").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:123:345", true).unwrap();
assert_eq!("example.com:123", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"[::1]").unwrap();
update_h2_scheme_authority(&mut parts, b"[::1]", true).unwrap();
assert_eq!("[::1]", parts.uri.authority().unwrap());
// verify scheme
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("https://example.com", parts.uri);
update_h2_scheme_authority(&mut parts, b"example.com", false).unwrap();
assert_eq!("http://example.com", parts.uri);
}

View file

@ -14,7 +14,7 @@
mod utils;
use hyper::Client;
use hyper::{body::HttpBody, header::HeaderValue, Body, Client};
use hyperlocal::{UnixClientExt, Uri};
use reqwest::{header, StatusCode};
@ -143,6 +143,28 @@ async fn test_h2_to_h2() {
assert_eq!(body, "Hello World!\n");
}
#[tokio::test]
async fn test_h2c_to_h2c() {
init();
let client = hyper::client::Client::builder()
.http2_only(true)
.build_http();
let mut req = hyper::Request::builder()
.uri("http://127.0.0.1:6146")
.body(Body::empty())
.unwrap();
req.headers_mut()
.insert("x-h2", HeaderValue::from_bytes(b"true").unwrap());
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
let body = res.into_body().data().await.unwrap().unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
}
#[tokio::test]
async fn test_h2_to_h2_host_override() {
init();

View file

@ -80,7 +80,7 @@ http {
}
server {
listen 8000;
listen 8000 http2;
# 8001 is used for bad_lb test only to avoid unexpected connection reuse
listen 8001;
listen [::]:8000;

View file

@ -26,6 +26,7 @@ use pingora_cache::{
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::modules::http::compression::ResponseCompression;
use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
use pingora_core::server::configuration::Opt;
@ -270,11 +271,18 @@ impl ProxyHttp for ExampleProxyHttp {
.headers
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
let peer = Box::new(HttpPeer::new(
let mut peer = Box::new(HttpPeer::new(
format!("127.0.0.1:{port}"),
false,
"".to_string(),
));
if session.get_header_bytes("x-h2") == b"true" {
// default is 1, 1
peer.options.set_http_version(2, 2);
}
Ok(peer)
}
@ -502,6 +510,15 @@ fn test_main() {
proxy_service_http.add_tcp("0.0.0.0:6147");
proxy_service_http.add_uds("/tmp/pingora_proxy.sock", None);
let mut proxy_service_h2c =
pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttp {});
let http_logic = proxy_service_h2c.app_logic_mut().unwrap();
let mut http_server_options = HttpServerOptions::default();
http_server_options.h2c = true;
http_logic.server_options = Some(http_server_options);
proxy_service_h2c.add_tcp("0.0.0.0:6146");
let mut proxy_service_https =
pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttps {});
proxy_service_https.add_tcp("0.0.0.0:6149");
@ -517,6 +534,7 @@ fn test_main() {
proxy_service_cache.add_tcp("0.0.0.0:6148");
let services: Vec<Box<dyn Service>> = vec![
Box::new(proxy_service_h2c),
Box::new(proxy_service_http),
Box::new(proxy_service_https),
Box::new(proxy_service_cache),