Merge branch 'main' into compression-headers

This commit is contained in:
Wladimir Palant 2024-06-21 19:05:11 +02:00 committed by GitHub
commit 973d5969e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 362 additions and 182 deletions

2
.bleep
View file

@ -1 +1 @@
a73e00f51bc6643e93abedc2a763d38122ffb21d
f70d8b77a4085cbe11b9559317f6d6e7e49914db

View file

@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
# TODO: add nightly
toolchain: [stable, 1.72]
toolchain: [1.78, 1.72]
runs-on: ubuntu-latest
# Only run on "pull_request" event for external PRs. This is to avoid
# duplicate builds for PRs created from internal branches.

View file

@ -40,6 +40,8 @@ clap = { version = "3.2.25", features = ["derive"] }
once_cell = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
strum = "0.26.2"
strum_macros = "0.26.2"
libc = "0.2.70"
chrono = { version = "~0.4.31", features = ["alloc"], default-features = false }
thread_local = "1.0"

View file

@ -53,6 +53,13 @@ pub trait ServerApp {
/// This callback will be called once after the service stops listening to its endpoints.
async fn cleanup(&self) {}
}
#[non_exhaustive]
#[derive(Default)]
/// HTTP Server options that control how the server handles some transport types.
pub struct HttpServerOptions {
/// Use HTTP/2 for plaintext.
pub h2c: bool,
}
/// This trait defines the interface of an HTTP application.
#[cfg_attr(not(doc_async_trait), async_trait)]
@ -77,6 +84,14 @@ pub trait HttpServerApp {
None
}
/// Provide HTTP server options used to override default behavior. This function will be called
/// every time a new connection is processed.
///
/// A `None` means no server options will be applied.
fn server_options(&self) -> Option<&HttpServerOptions> {
None
}
async fn http_cleanup(&self) {}
}
@ -90,54 +105,53 @@ where
stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
// TODO: allow h2c and http/1.1 to co-exist
if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
error!("H2 handshake error {e}");
return None;
}
Ok(c) => c,
};
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
error!("H2 handshake error {e}");
// It is common for the client to just disconnect TCP without properly
// closing H2. So we don't log the errors here
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(c) => c,
Ok(s) => s?, // None means the connection is ready to be closed
};
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
// It is common for the client to just disconnect TCP without properly
// closing H2. So we don't log the errors here
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(s) => s?, // None means the connection is ready to be closed
};
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
}
_ => {
// No ALPN or ALPN::H1 or something else, just try Http1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
} else {
// No ALPN or ALPN::H1 and h2c was not configured, fallback to HTTP/1.1
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
}
}

View file

@ -69,7 +69,7 @@ pub struct TcpSocketOptions {
/// IPV6_V6ONLY flag (if true, limit socket to IPv6 communication only).
/// This is mostly useful when binding to `[::]`, which on most Unix distributions
/// will bind to both IPv4 and IPv6 addresses by default.
pub ipv6_only: bool,
pub ipv6_only: Option<bool>,
/// Enable TCP fast open and set the backlog size of it.
/// See the [man page](https://man7.org/linux/man-pages/man7/tcp.7.html) for more information.
pub tcp_fastopen: Option<usize>,
@ -140,10 +140,10 @@ fn apply_tcp_socket_options(sock: &TcpSocket, opt: Option<&TcpSocketOptions>) ->
let Some(opt) = opt else {
return Ok(());
};
if opt.ipv6_only {
if let Some(ipv6_only) = opt.ipv6_only {
let socket_ref = socket2::SockRef::from(sock);
socket_ref
.set_only_v6(opt.ipv6_only)
.set_only_v6(ipv6_only)
.or_err(BindError, "failed to set IPV6_V6ONLY")?;
}
@ -318,7 +318,7 @@ mod test {
#[tokio::test]
async fn test_listen_tcp_ipv6_only() {
let sock_opt = Some(TcpSocketOptions {
ipv6_only: true,
ipv6_only: Some(true),
..Default::default()
});
let mut listener = ListenerEndpoint::new(ServerAddress::Tcp("[::]:7101".into(), sock_opt));

View file

@ -19,11 +19,15 @@
use super::HttpTask;
use bytes::Bytes;
use http::header::ACCEPT_RANGES;
use log::warn;
use pingora_error::{ErrorType, Result};
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;
use strum::EnumCount;
use strum_macros::EnumCount as EnumCountMacro;
mod brotli;
mod gzip;
mod zstd;
@ -49,7 +53,7 @@ pub trait Encode {
/// The caller should call the corresponding filters for the request header, response header and
/// response body. If the algorithms are supported, the output response body will be encoded.
/// The response header will be adjusted accordingly as well. If the algorithm is not supported
/// or no encoding needed, the response is untouched.
/// or no encoding is needed, the response is untouched.
///
/// If configured and if the request's `accept-encoding` header contains the algorithm supported and the
/// incoming response doesn't have that encoding, the filter will compress the response.
@ -63,23 +67,23 @@ pub struct ResponseCompressionCtx(CtxInner);
enum CtxInner {
HeaderPhase {
compression_level: u32,
decompress_enable: bool,
// Store the preferred list to compare with content-encoding
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}
impl ResponseCompressionCtx {
/// Create a new [`ResponseCompressionCtx`] with the expected compression level. `0` will disable
/// the compression.
/// the compression. The compression level is applied across all algorithms.
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
Self(CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
})
}
@ -88,10 +92,10 @@ impl ResponseCompressionCtx {
pub fn is_enabled(&self) -> bool {
match &self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding: _,
} => *compression_level != 0 || *decompress_enable,
encoding_levels: levels,
} => levels.iter().any(|l| *l != 0) || *decompress_enable,
CtxInner::BodyPhase(c) => c.is_some(),
}
}
@ -101,25 +105,41 @@ impl ResponseCompressionCtx {
pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
match &self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => None,
CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
}
}
/// Adjust the compression level.
/// Adjust the compression level for all compression algorithms.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_level(&mut self, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
} => {
*compression_level = new_level;
*levels = [new_level; Algorithm::COUNT];
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}
/// Adjust the compression level for a specific algorithm.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
} => {
levels[algorithm.index()] = new_level;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
@ -131,9 +151,9 @@ impl ResponseCompressionCtx {
pub fn adjust_decompression(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable,
accept_encoding: _,
encoding_levels: _,
} => {
*decompress_enable = enabled;
}
@ -148,9 +168,9 @@ impl ResponseCompressionCtx {
}
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding,
encoding_levels: _,
} => parse_accept_encoding(
req.headers.get(http::header::ACCEPT_ENCODING),
accept_encoding,
@ -166,9 +186,9 @@ impl ResponseCompressionCtx {
}
match &self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding,
encoding_levels: levels,
} => {
if resp.status.is_informational() {
if resp.status == http::status::StatusCode::SWITCHING_PROTOCOLS {
@ -194,7 +214,7 @@ impl ResponseCompressionCtx {
let action = decide_action(resp, accept_encoding);
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(*compression_level),
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
};
if encoder.is_some() {
@ -212,9 +232,9 @@ impl ResponseCompressionCtx {
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => panic!("Wrong phase: HeaderPhase"),
CtxInner::BodyPhase(compressor) => {
let result = compressor
@ -264,8 +284,8 @@ impl ResponseCompressionCtx {
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Algorithm {
#[derive(Debug, PartialEq, Eq, Clone, Copy, EnumCountMacro)]
pub enum Algorithm {
Any, // the "*"
Gzip,
Brotli,
@ -309,6 +329,10 @@ impl Algorithm {
}
}
}
pub fn index(&self) -> usize {
*self as usize
}
}
impl From<&str> for Algorithm {
@ -721,6 +745,10 @@ fn test_adjust_response_header() {
b"gzip"
);
assert!(header.headers.get("content-length").is_none());
assert_eq!(
header.headers.get("accept-ranges").unwrap().as_bytes(),
b"none"
);
assert_eq!(
header.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"

View file

@ -40,7 +40,7 @@ pub enum HttpTask {
Trailer(Option<Box<http::HeaderMap>>),
/// Signal that the response is already finished
Done,
/// Signal that the reading of the response encounters errors.
/// Signal that the reading of the response encountered errors.
Failed(pingora_error::BError),
}

View file

@ -365,7 +365,7 @@ impl Session {
}
}
/// Return the client (peer) address of the connnection.
/// Return the client (peer) address of the connection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.client_addr(),

View file

@ -783,7 +783,7 @@ impl HttpSession {
&self.digest
}
/// Return the client (peer) address of the underlying connnection.
/// Return the client (peer) address of the underlying connection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest
@ -791,7 +791,7 @@ impl HttpSession {
.map(|d| d.peer_addr())?
}
/// Return the server (local) address of the underlying connnection.
/// Return the server (local) address of the underlying connection.
pub fn server_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest

View file

@ -18,7 +18,7 @@
use libc::socklen_t;
#[cfg(target_os = "linux")]
use libc::{c_int, c_void};
use libc::{c_int, c_ulonglong, c_void};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use std::io::{self, ErrorKind};
use std::mem;
@ -31,60 +31,60 @@ use tokio::net::{TcpSocket, TcpStream, UnixStream};
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct TCP_INFO {
tcpi_state: u8,
tcpi_ca_state: u8,
tcpi_retransmits: u8,
tcpi_probes: u8,
tcpi_backoff: u8,
tcpi_options: u8,
tcpi_snd_wscale_4_rcv_wscale_4: u8,
tcpi_delivery_rate_app_limited: u8,
tcpi_rto: u32,
tcpi_ato: u32,
tcpi_snd_mss: u32,
tcpi_rcv_mss: u32,
tcpi_unacked: u32,
tcpi_sacked: u32,
tcpi_lost: u32,
tcpi_retrans: u32,
tcpi_fackets: u32,
tcpi_last_data_sent: u32,
tcpi_last_ack_sent: u32,
tcpi_last_data_recv: u32,
tcpi_last_ack_recv: u32,
tcpi_pmtu: u32,
tcpi_rcv_ssthresh: u32,
pub tcpi_state: u8,
pub tcpi_ca_state: u8,
pub tcpi_retransmits: u8,
pub tcpi_probes: u8,
pub tcpi_backoff: u8,
pub tcpi_options: u8,
pub tcpi_snd_wscale_4_rcv_wscale_4: u8,
pub tcpi_delivery_rate_app_limited: u8,
pub tcpi_rto: u32,
pub tcpi_ato: u32,
pub tcpi_snd_mss: u32,
pub tcpi_rcv_mss: u32,
pub tcpi_unacked: u32,
pub tcpi_sacked: u32,
pub tcpi_lost: u32,
pub tcpi_retrans: u32,
pub tcpi_fackets: u32,
pub tcpi_last_data_sent: u32,
pub tcpi_last_ack_sent: u32,
pub tcpi_last_data_recv: u32,
pub tcpi_last_ack_recv: u32,
pub tcpi_pmtu: u32,
pub tcpi_rcv_ssthresh: u32,
pub tcpi_rtt: u32,
tcpi_rttvar: u32,
tcpi_snd_ssthresh: u32,
tcpi_snd_cwnd: u32,
tcpi_advmss: u32,
tcpi_reordering: u32,
tcpi_rcv_rtt: u32,
pub tcpi_rttvar: u32,
pub tcpi_snd_ssthresh: u32,
pub tcpi_snd_cwnd: u32,
pub tcpi_advmss: u32,
pub tcpi_reordering: u32,
pub tcpi_rcv_rtt: u32,
pub tcpi_rcv_space: u32,
tcpi_total_retrans: u32,
tcpi_pacing_rate: u64,
tcpi_max_pacing_rate: u64,
tcpi_bytes_acked: u64,
tcpi_bytes_received: u64,
tcpi_segs_out: u32,
tcpi_segs_in: u32,
tcpi_notsent_bytes: u32,
tcpi_min_rtt: u32,
tcpi_data_segs_in: u32,
tcpi_data_segs_out: u32,
tcpi_delivery_rate: u64,
tcpi_busy_time: u64,
tcpi_rwnd_limited: u64,
tcpi_sndbuf_limited: u64,
tcpi_delivered: u32,
tcpi_delivered_ce: u32,
tcpi_bytes_sent: u64,
tcpi_bytes_retrans: u64,
tcpi_dsack_dups: u32,
tcpi_reord_seen: u32,
tcpi_rcv_ooopack: u32,
tcpi_snd_wnd: u32,
pub tcpi_total_retrans: u32,
pub tcpi_pacing_rate: u64,
pub tcpi_max_pacing_rate: u64,
pub tcpi_bytes_acked: u64,
pub tcpi_bytes_received: u64,
pub tcpi_segs_out: u32,
pub tcpi_segs_in: u32,
pub tcpi_notsent_bytes: u32,
pub tcpi_min_rtt: u32,
pub tcpi_data_segs_in: u32,
pub tcpi_data_segs_out: u32,
pub tcpi_delivery_rate: u64,
pub tcpi_busy_time: u64,
pub tcpi_rwnd_limited: u64,
pub tcpi_sndbuf_limited: u64,
pub tcpi_delivered: u32,
pub tcpi_delivered_ce: u32,
pub tcpi_bytes_sent: u64,
pub tcpi_bytes_retrans: u64,
pub tcpi_dsack_dups: u32,
pub tcpi_reord_seen: u32,
pub tcpi_rcv_ooopack: u32,
pub tcpi_snd_wnd: u32,
pub tcpi_rcv_wnd: u32,
// and more, see include/linux/tcp.h
}
@ -131,6 +131,24 @@ fn get_opt<T>(
}
}
#[cfg(target_os = "linux")]
fn get_opt_sized<T>(sock: c_int, opt: c_int, val: c_int) -> io::Result<T> {
let mut payload = mem::MaybeUninit::zeroed();
let expected_size = mem::size_of::<T>() as socklen_t;
let mut size = expected_size;
get_opt(sock, opt, val, &mut payload, &mut size)?;
if size != expected_size {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"get_opt size mismatch",
));
}
// Assume getsockopt() will set the value properly
let payload = unsafe { payload.assume_init() };
Ok(payload)
}
#[cfg(target_os = "linux")]
fn cvt_linux_error(t: i32) -> io::Result<i32> {
if t == -1 {
@ -198,22 +216,7 @@ fn set_keepalive(_fd: RawFd, _ka: &TcpKeepalive) -> io::Result<()> {
/// Get the kernel TCP_INFO for the given FD.
#[cfg(target_os = "linux")]
pub fn get_tcp_info(fd: RawFd) -> io::Result<TCP_INFO> {
let mut tcp_info = unsafe { TCP_INFO::new() };
let mut data_len: socklen_t = TCP_INFO::len();
get_opt(
fd,
libc::IPPROTO_TCP,
libc::TCP_INFO,
&mut tcp_info,
&mut data_len,
)?;
if data_len != TCP_INFO::len() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"TCP_INFO struct size mismatch",
));
}
Ok(tcp_info)
get_opt_sized(fd, libc::IPPROTO_TCP, libc::TCP_INFO)
}
#[cfg(not(target_os = "linux"))]
@ -235,20 +238,11 @@ pub fn set_recv_buf(_fd: RawFd, _: usize) -> Result<()> {
#[cfg(target_os = "linux")]
pub fn get_recv_buf(fd: RawFd) -> io::Result<usize> {
let mut recv_size: c_int = 0;
let mut size = std::mem::size_of::<c_int>() as u32;
get_opt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVBUF,
&mut recv_size,
&mut size,
)?;
Ok(recv_size as usize)
get_opt_sized::<c_int>(fd, libc::SOL_SOCKET, libc::SO_RCVBUF).map(|v| v as usize)
}
#[cfg(not(target_os = "linux"))]
pub fn get_recv_buf(_fd: RawFd) -> Result<usize> {
pub fn get_recv_buf(_fd: RawFd) -> io::Result<usize> {
Ok(0)
}
@ -281,6 +275,16 @@ pub fn set_tcp_fastopen_backlog(_fd: RawFd, _backlog: usize) -> Result<()> {
Ok(())
}
#[cfg(target_os = "linux")]
pub fn get_socket_cookie(fd: RawFd) -> io::Result<u64> {
get_opt_sized::<c_ulonglong>(fd, libc::SOL_SOCKET, libc::SO_COOKIE)
}
#[cfg(not(target_os = "linux"))]
pub fn get_socket_cookie(_fd: RawFd) -> io::Result<u64> {
Ok(0) // SO_COOKIE is a Linux concept
}
/// connect() to the given address while optionally binding to the specific source address.
///
/// The `set_socket` callback can be used to tune the socket before `connect()` is called.

View file

@ -512,17 +512,18 @@ impl AccumulatedDuration {
self.start();
}
}
// Pending: start the timer, Ready(Err()): does not matter
Poll::Ready(Err(_)) => {
self.stop();
}
_ => self.start(),
}
}
fn poll_time(&mut self, result: &Poll<io::Result<()>>) {
match result {
Poll::Ready(Ok(())) => {
Poll::Ready(_) => {
self.stop();
}
// Pending: start the timer, Ready(Err()): does not matter
_ => self.start(),
}
}

View file

@ -40,9 +40,10 @@ pub struct ServerConf {
pub version: usize,
/// Whether to run this process in the background.
pub daemon: bool,
/// When configured, error log will be written to the given file. Otherwise StdErr will be used.
/// When configured and `daemon` setting is `true`, error log will be written to the given
/// file. Otherwise StdErr will be used.
pub error_log: Option<String>,
/// The pid (process ID) file of this server
/// The pid (process ID) file of this server to be created when running in background
pub pid_file: String,
/// the path to the upgrade socket
///

View file

@ -15,7 +15,10 @@
//! Defines where to connect to and how to connect to a remote server
use ahash::AHasher;
use pingora_error::{ErrorType::InternalError, OrErr, Result};
use pingora_error::{
ErrorType::{InternalError, SocketError},
OrErr, Result,
};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::hash::{Hash, Hasher};
@ -431,9 +434,11 @@ impl HttpPeer {
}
/// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
pub fn new_uds(path: &str, tls: bool, sni: String) -> Self {
let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error
Self::new_from_sockaddr(addr, tls, sni)
pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
let addr = SocketAddr::Unix(
UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
);
Ok(Self::new_from_sockaddr(addr, tls, sni))
}
/// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port

View file

@ -63,6 +63,8 @@ impl ServeHttp for EchoApp {
}
pub struct MyServer {
// Maybe useful in the future
#[allow(dead_code)]
pub handle: thread::JoinHandle<()>,
}

View file

@ -539,6 +539,35 @@ impl<T, E> OrErr<T, E> for Result<T, E> {
}
}
/// Helper trait to convert an [Option] to an [Error] with context.
pub trait OkOrErr<T> {
fn or_err(self, et: ErrorType, context: &'static str) -> Result<T, BError>;
fn or_err_with<C: Into<ImmutStr>, F: FnOnce() -> C>(
self,
et: ErrorType,
context: F,
) -> Result<T, BError>;
}
impl<T> OkOrErr<T> for Option<T> {
/// Convert the [Option] to a new [Error] with [ErrorType] and context if None, Ok otherwise.
///
/// This is a shortcut for .ok_or(Error::explain())
fn or_err(self, et: ErrorType, context: &'static str) -> Result<T, BError> {
self.ok_or(Error::explain(et, context))
}
/// Similar to or_err(), but takes a closure, which is useful for constructing String.
fn or_err_with<C: Into<ImmutStr>, F: FnOnce() -> C>(
self,
et: ErrorType,
context: F,
) -> Result<T, BError> {
self.ok_or_else(|| Error::explain(et, context()))
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -586,4 +615,30 @@ mod tests {
" HTTPStatus context: another cause: InternalError"
);
}
#[test]
fn test_option_some_ok() {
let m = Some(2);
let o = m.or_err(ErrorType::InternalError, "some is not an error!");
assert_eq!(2, o.unwrap());
let o = m.or_err_with(ErrorType::InternalError, || "some is not an error!");
assert_eq!(2, o.unwrap());
}
#[test]
fn test_option_none_err() {
let m: Option<i32> = None;
let e1 = m.or_err(ErrorType::InternalError, "none is an error!");
assert_eq!(
format!("{}", e1.unwrap_err()),
" InternalError context: none is an error!"
);
let e1 = m.or_err_with(ErrorType::InternalError, || "none is an error!");
assert_eq!(
format!("{}", e1.unwrap_err()),
" InternalError context: none is an error!"
);
}
}

View file

@ -60,7 +60,6 @@
use std::cmp::Ordering;
use std::io::Write;
use std::net::SocketAddr;
use std::usize;
use crc32fast::Hasher;

View file

@ -100,7 +100,7 @@ impl std::net::ToSocketAddrs for Backend {
/// [Backends] is a collection of [Backend]s.
///
/// It includes a service discovery method (static or dynamic) to discover all
/// the available backends as well as an optionally health check method to probe the liveness
/// the available backends as well as an optional health check method to probe the liveness
/// of each backend.
pub struct Backends {
discovery: Box<dyn ServiceDiscovery + Send + Sync + 'static>,
@ -207,9 +207,9 @@ impl Backends {
Ok(self.do_update(new_backends, enablement))
}
/// Run health check on all the backend if it is set.
/// Run health check on all backends if it is set.
///
/// When `parallel: true`, all the backends are checked in parallel instead of sequentially
/// When `parallel: true`, all backends are checked in parallel instead of sequentially
pub async fn run_health_check(&self, parallel: bool) {
use crate::health_check::HealthCheck;
use log::{info, warn};

View file

@ -58,8 +58,12 @@ pub trait SelectionAlgorithm {
fn next(&self, key: &[u8]) -> u64;
}
/// [FVN](https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function) hashing
/// [FNV](https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function) hashing
/// on weighted backends
pub type FNVHash = Weighted<fnv::FnvHasher>;
/// Alias of [`FNVHash`] for backwards compatibility until the next breaking change
#[doc(hidden)]
pub type FVNHash = Weighted<fnv::FnvHasher>;
/// Random selection on weighted backends
pub type Random = Weighted<algorithms::Random>;

View file

@ -52,7 +52,7 @@ use tokio::sync::{mpsc, Notify};
use tokio::time;
use pingora_cache::NoCacheReason;
use pingora_core::apps::HttpServerApp;
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
@ -95,6 +95,7 @@ pub struct HttpProxy<SV> {
inner: SV, // TODO: name it better than inner
client_upstream: Connector,
shutdown: Notify,
pub server_options: Option<HttpServerOptions>,
pub downstream_modules: HttpModules,
}
@ -104,6 +105,7 @@ impl<SV> HttpProxy<SV> {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
server_options: None,
downstream_modules: HttpModules::new(),
}
}
@ -725,6 +727,10 @@ where
// TODO: impl shutting down flag so that we don't need to read stack.is_shutting_down()
}
fn server_options(&self) -> Option<&HttpServerOptions> {
self.server_options.as_ref()
}
// TODO implement h2_options
}

View file

@ -18,7 +18,11 @@ use crate::proxy_common::*;
use pingora_core::protocols::http::v2::client::{write_body, Http2Session};
// add scheme and authority as required by h2 lib
fn update_h2_scheme_authority(header: &mut http::request::Parts, raw_host: &[u8]) -> Result<()> {
fn update_h2_scheme_authority(
header: &mut http::request::Parts,
raw_host: &[u8],
tls: bool,
) -> Result<()> {
let authority = if let Ok(s) = std::str::from_utf8(raw_host) {
if s.starts_with('[') {
// don't mess with ipv6 host
@ -43,8 +47,9 @@ fn update_h2_scheme_authority(header: &mut http::request::Parts, raw_host: &[u8]
);
};
let scheme = if tls { "https" } else { "http" };
let uri = http::uri::Builder::new()
.scheme("https")
.scheme(scheme)
.authority(authority)
.path_and_query(header.uri.path_and_query().as_ref().unwrap().as_str())
.build();
@ -123,7 +128,7 @@ impl<SV> HttpProxy<SV> {
// H2 requires authority to be set, so copy that from H1 host if that is set
if let Some(host) = host {
if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes()) {
if let Err(e) = update_h2_scheme_authority(&mut req, host.as_bytes(), peer.is_tls()) {
return (false, Some(e));
}
}
@ -619,14 +624,20 @@ fn test_update_authority() {
.unwrap()
.into_parts()
.0;
update_h2_scheme_authority(&mut parts, b"example.com").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("example.com", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:456").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:456", true).unwrap();
assert_eq!("example.com:456", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:", true).unwrap();
assert_eq!("example.com:", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"example.com:123:345").unwrap();
update_h2_scheme_authority(&mut parts, b"example.com:123:345", true).unwrap();
assert_eq!("example.com:123", parts.uri.authority().unwrap());
update_h2_scheme_authority(&mut parts, b"[::1]").unwrap();
update_h2_scheme_authority(&mut parts, b"[::1]", true).unwrap();
assert_eq!("[::1]", parts.uri.authority().unwrap());
// verify scheme
update_h2_scheme_authority(&mut parts, b"example.com", true).unwrap();
assert_eq!("https://example.com", parts.uri);
update_h2_scheme_authority(&mut parts, b"example.com", false).unwrap();
assert_eq!("http://example.com", parts.uri);
}

View file

@ -14,7 +14,7 @@
mod utils;
use hyper::Client;
use hyper::{body::HttpBody, header::HeaderValue, Body, Client};
use hyperlocal::{UnixClientExt, Uri};
use reqwest::{header, StatusCode};
@ -143,6 +143,28 @@ async fn test_h2_to_h2() {
assert_eq!(body, "Hello World!\n");
}
#[tokio::test]
async fn test_h2c_to_h2c() {
init();
let client = hyper::client::Client::builder()
.http2_only(true)
.build_http();
let mut req = hyper::Request::builder()
.uri("http://127.0.0.1:6146")
.body(Body::empty())
.unwrap();
req.headers_mut()
.insert("x-h2", HeaderValue::from_bytes(b"true").unwrap());
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
let body = res.into_body().data().await.unwrap().unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
}
#[tokio::test]
async fn test_h2_to_h2_host_override() {
init();

View file

@ -80,7 +80,7 @@ http {
}
server {
listen 8000;
listen 8000 http2;
# 8001 is used for bad_lb test only to avoid unexpected connection reuse
listen 8001;
listen [::]:8000;

View file

@ -26,6 +26,7 @@ use pingora_cache::{
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::modules::http::compression::ResponseCompression;
use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
use pingora_core::server::configuration::Opt;
@ -264,17 +265,24 @@ impl ProxyHttp for ExampleProxyHttp {
"/tmp/nginx-test.sock",
false,
"".to_string(),
)));
)?));
}
let port = req
.headers
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
let peer = Box::new(HttpPeer::new(
let mut peer = Box::new(HttpPeer::new(
format!("127.0.0.1:{port}"),
false,
"".to_string(),
));
if session.get_header_bytes("x-h2") == b"true" {
// default is 1, 1
peer.options.set_http_version(2, 2);
}
Ok(peer)
}
@ -502,6 +510,15 @@ fn test_main() {
proxy_service_http.add_tcp("0.0.0.0:6147");
proxy_service_http.add_uds("/tmp/pingora_proxy.sock", None);
let mut proxy_service_h2c =
pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttp {});
let http_logic = proxy_service_h2c.app_logic_mut().unwrap();
let mut http_server_options = HttpServerOptions::default();
http_server_options.h2c = true;
http_logic.server_options = Some(http_server_options);
proxy_service_h2c.add_tcp("0.0.0.0:6146");
let mut proxy_service_https =
pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttps {});
proxy_service_https.add_tcp("0.0.0.0:6149");
@ -517,6 +534,7 @@ fn test_main() {
proxy_service_cache.add_tcp("0.0.0.0:6148");
let services: Vec<Box<dyn Service>> = vec![
Box::new(proxy_service_h2c),
Box::new(proxy_service_http),
Box::new(proxy_service_https),
Box::new(proxy_service_cache),

View file

@ -686,14 +686,22 @@ mod tests {
assert_eq!(cache.peek_queue(2), Some(SMALL));
assert_eq!(cache.peek_queue(3), Some(SMALL));
let evicted = cache.put(4, 4, 1);
let evicted = cache.put(4, 4, 2);
assert_eq!(evicted.len(), 1);
assert_eq!(evicted[0].data, 2);
assert_eq!(evicted[0].weight, 2);
assert_eq!(cache.peek_queue(1), Some(MAIN));
// 2 is evicted because 1 is in main
assert_eq!(cache.peek_queue(2), None);
assert_eq!(cache.peek_queue(3), Some(SMALL));
assert_eq!(cache.peek_queue(4), Some(SMALL));
// either 2, 3, or 4 was evicted. Check evicted for which.
let mut remaining = vec![2, 3, 4];
remaining.remove(
remaining
.iter()
.position(|x| *x == evicted[0].data)
.unwrap(),
);
assert_eq!(cache.peek_queue(evicted[0].key), None);
for k in remaining {
assert_eq!(cache.peek_queue(k), Some(SMALL));
}
}
}