Add server_addr and client_addr to Session

This commit is contained in:
ewang 2024-03-05 16:56:37 -08:00 committed by Edward Wang
parent 3e09114c4d
commit 20fd391f3e
20 changed files with 568 additions and 43 deletions

2
.bleep
View file

@ -1 +1 @@
90d84b32f4528ede68b8351c896a101af788113d
c16c9e8bfd9334b77a6a7c1123954f41037c06c3

View file

@ -24,6 +24,7 @@ use std::sync::Arc;
use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
@ -91,6 +92,15 @@ where
) -> Option<Stream> {
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
@ -100,10 +110,12 @@ where
}
Ok(c) => c,
};
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await;
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
// It is common for client to just disconnect TCP without properly

View file

@ -65,6 +65,7 @@ impl Connector {
#[cfg(test)]
mod tests {
use super::*;
use crate::protocols::l4::socket::SocketAddr;
use crate::upstreams::peer::HttpPeer;
use pingora_http::RequestHeader;
@ -85,6 +86,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
// make a new connection to 1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:80".parse::<SocketAddr>().unwrap());
assert!(!reused);
// this http is not even used, so not be able to reuse
@ -104,6 +107,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
// make a new connection to https://1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:443".parse::<SocketAddr>().unwrap());
assert!(!reused);
// this http is not even used, so not be able to reuse

View file

@ -369,6 +369,7 @@ async fn handshake(
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
};
// TODO: make these configurable
let (send_req, connection) = Builder::new()

View file

@ -16,10 +16,12 @@ use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
use std::net::SocketAddr as InetSocketAddr;
use std::os::unix::io::AsRawFd;
use crate::protocols::l4::ext::{connect as tcp_connect, connect_uds, set_tcp_keepalive};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
@ -32,7 +34,8 @@ where
.await
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let mut stream: Stream = match peer.address() {
let peer_addr = peer.address();
let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref());
let conn_res = match peer.connection_timeout() {
@ -97,6 +100,14 @@ where
}
stream.set_nodelay()?;
let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
digest
.peer_addr
.set(Some(peer_addr.clone()))
.expect("newly created OnceCell must be empty");
stream.set_socket_digest(digest);
Ok(stream)
}

View file

@ -17,11 +17,14 @@
use std::sync::Arc;
use std::time::SystemTime;
use once_cell::sync::OnceCell;
use super::l4::socket::SocketAddr;
use super::raw_connect::ProxyDigest;
use super::ssl::digest::SslDigest;
/// The information can be extracted from a connection
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct Digest {
/// Information regarding the TLS of this connection if any
pub ssl_digest: Option<Arc<SslDigest>>,
@ -29,6 +32,8 @@ pub struct Digest {
pub timing_digest: Vec<Option<TimingDigest>>,
/// information regarding the CONNECT proxy this connection uses.
pub proxy_digest: Option<Arc<ProxyDigest>>,
/// Information about underlying socket/fd of this connection
pub socket_digest: Option<Arc<SocketDigest>>,
}
/// The interface to return protocol related information
@ -53,6 +58,38 @@ impl Default for TimingDigest {
}
}
#[derive(Debug)]
/// The interface to return socket-related information
pub struct SocketDigest {
raw_fd: std::os::unix::io::RawFd,
/// Remote socket address
pub peer_addr: OnceCell<Option<SocketAddr>>,
/// Local socket address
pub local_addr: OnceCell<Option<SocketAddr>>,
}
impl SocketDigest {
pub fn from_raw_fd(raw_fd: std::os::unix::io::RawFd) -> SocketDigest {
SocketDigest {
raw_fd,
peer_addr: OnceCell::new(),
local_addr: OnceCell::new(),
}
}
pub fn peer_addr(&self) -> Option<&SocketAddr> {
self.peer_addr
.get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, true))
.as_ref()
}
pub fn local_addr(&self) -> Option<&SocketAddr> {
self.local_addr
.get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, false))
.as_ref()
}
}
/// The interface to return timing information
pub trait GetTimingDigest {
/// Return the timing for each layer from the lowest layer to upper
@ -64,3 +101,9 @@ pub trait GetProxyDigest {
fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>>;
fn set_proxy_digest(&mut self, _digest: ProxyDigest) {}
}
/// The interface to set or return socket information
pub trait GetSocketDigest {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>>;
fn set_socket_digest(&mut self, _socket_digest: SocketDigest) {}
}

View file

@ -19,7 +19,7 @@ use std::time::Duration;
use super::v1::client::HttpSession as Http1Session;
use super::v2::client::Http2Session;
use crate::protocols::Digest;
use crate::protocols::{Digest, SocketAddr};
/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
pub enum HttpSession {
@ -151,11 +151,27 @@ impl HttpSession {
/// Return the [Digest] of the connection
///
/// For reused connection, the timing in the digest will reflect its initial handshakes
/// The caller should check if the connection is reused to avoid misuse the timing field
/// The caller should check if the connection is reused to avoid misuse of the timing field
pub fn digest(&self) -> Option<&Digest> {
match self {
Self::H1(s) => Some(s.digest()),
Self::H2(s) => s.digest(),
}
}
/// Return the server (peer) address of the connection.
pub fn server_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.server_addr(),
Self::H2(s) => s.server_addr(),
}
}
/// Return the client (local) address of the connection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.client_addr(),
Self::H2(s) => s.client_addr(),
}
}
}

View file

@ -18,7 +18,7 @@ use super::error_resp;
use super::v1::server::HttpSession as SessionV1;
use super::v2::server::HttpSession as SessionV2;
use super::HttpTask;
use crate::protocols::Stream;
use crate::protocols::{SocketAddr, Stream};
use bytes::Bytes;
use http::header::AsHeaderName;
use http::HeaderValue;
@ -330,4 +330,20 @@ impl Session {
Self::H2(s) => s.body_bytes_sent(),
}
}
/// Return the client (peer) address of the connnection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.client_addr(),
Self::H2(s) => s.client_addr(),
}
}
/// Return the server (local) address of the connection.
pub fn server_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.server_addr(),
Self::H2(s) => s.server_addr(),
}
}
}

View file

@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::body::{BodyReader, BodyWriter};
use super::common::*;
use crate::protocols::http::HttpTask;
use crate::protocols::{Digest, Stream, UniqueID};
use crate::protocols::{Digest, SocketAddr, Stream, UniqueID};
use crate::utils::{BufRef, KVRef};
/// The HTTP 1.x client session
@ -65,6 +65,7 @@ impl HttpSession {
ssl_digest: stream.get_ssl_digest(),
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
HttpSession {
underlying_stream: stream,
@ -601,6 +602,22 @@ impl HttpSession {
pub fn digest(&self) -> &Digest {
&self.digest
}
/// Return the server (peer) address recorded in the connection digest.
pub fn server_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest
.as_ref()
.map(|d| d.peer_addr())?
}
/// Return the client (local) address recorded in the connection digest.
pub fn client_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest
.as_ref()
.map(|d| d.local_addr())?
}
}
#[inline]

View file

@ -31,7 +31,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::body::{BodyReader, BodyWriter};
use super::common::*;
use crate::protocols::http::{body_buffer::FixedBuffer, date, error_resp, HttpTask};
use crate::protocols::Stream;
use crate::protocols::{Digest, SocketAddr, Stream};
use crate::utils::{BufRef, KVRef};
/// The HTTP 1.x server session
@ -68,6 +68,8 @@ pub struct HttpSession {
/// Whether this session is an upgraded session. This flag is calculated when sending the
/// response header to the client.
upgraded: bool,
/// Digest to track underlying connection metrics
digest: Box<Digest>,
}
impl HttpSession {
@ -75,6 +77,14 @@ impl HttpSession {
/// The created session needs to call [`Self::read_request()`] first before performing
/// any other operations.
pub fn new(underlying_stream: Stream) -> Self {
// TODO: maybe we should put digest in the connection itself
let digest = Box::new(Digest {
ssl_digest: underlying_stream.get_ssl_digest(),
timing_digest: underlying_stream.get_timing_digest(),
proxy_digest: underlying_stream.get_proxy_digest(),
socket_digest: underlying_stream.get_socket_digest(),
});
HttpSession {
underlying_stream,
buf: Bytes::new(), // zero size, with be replaced by parsed header later
@ -92,6 +102,7 @@ impl HttpSession {
body_bytes_sent: 0,
retry_buffer: None,
upgraded: false,
digest,
}
}
@ -751,6 +762,27 @@ impl HttpSession {
}
}
/// Return the [Digest] of the connection.
pub fn digest(&self) -> &Digest {
&self.digest
}
/// Return the client (peer) address of the underlying connnection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest
.as_ref()
.map(|d| d.peer_addr())?
}
/// Return the server (local) address of the underlying connnection.
pub fn server_addr(&self) -> Option<&SocketAddr> {
self.digest()
.socket_digest
.as_ref()
.map(|d| d.local_addr())?
}
/// Consume `self`, if the connection can be reused, the underlying stream will be returned
/// to be fed to the next [`Self::new()`]. The next session can just call [`Self::read_request()`].
/// If the connection cannot be reused, the underlying stream will be closed and `None` will be

View file

@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::watch;
use crate::connectors::http::v2::ConnectionRef;
use crate::protocols::Digest;
use crate::protocols::{Digest, SocketAddr};
pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout");
@ -310,6 +310,24 @@ impl Http2Session {
Some(self.conn.digest())
}
/// Return the server (peer) address recorded in the connection digest.
pub fn server_addr(&self) -> Option<&SocketAddr> {
self.conn
.digest()
.socket_digest
.as_ref()
.map(|d| d.peer_addr())?
}
/// Return the client (local) address recorded in the connection digest.
pub fn client_addr(&self) -> Option<&SocketAddr> {
self.conn
.digest()
.socket_digest
.as_ref()
.map(|d| d.local_addr())?
}
/// the FD of the underlying connection
pub fn fd(&self) -> i32 {
self.conn.id()

View file

@ -23,12 +23,13 @@ use http::header::HeaderName;
use http::{header, Response};
use log::{debug, warn};
use pingora_http::{RequestHeader, ResponseHeader};
use std::sync::Arc;
use crate::protocols::http::body_buffer::FixedBuffer;
use crate::protocols::http::date::get_cached_date;
use crate::protocols::http::v1::client::http_req_header_to_wire;
use crate::protocols::http::HttpTask;
use crate::protocols::Stream;
use crate::protocols::{Digest, SocketAddr, Stream};
use crate::{Error, ErrorType, OrErr, Result};
const BODY_BUF_LIMIT: usize = 1024 * 64;
@ -95,6 +96,8 @@ pub struct HttpSession {
body_sent: usize,
// buffered request body for retry logic
retry_buffer: Option<FixedBuffer>,
// digest to record underlying connection info
digest: Arc<Digest>,
}
impl HttpSession {
@ -102,11 +105,19 @@ impl HttpSession {
/// This function returns a new HTTP/2 session when the provided HTTP/2 connection, `conn`,
/// establishes a new HTTP/2 stream to this server.
///
/// A [`Digest`] from the IO stream is also stored in the resulting session, since the
/// session doesn't have access to the underlying stream (and the stream itself isn't
/// accessible from the `h2::server::Connection`).
///
/// Note: in order to handle all **existing** and new HTTP/2 sessions, the server must call
/// this function in a loop until the client decides to close the connection.
///
/// `None` will be returned when the connection is closing so that the loop can exit.
pub async fn from_h2_conn(conn: &mut H2Connection<Stream>) -> Result<Option<Self>> {
///
pub async fn from_h2_conn(
conn: &mut H2Connection<Stream>,
digest: Arc<Digest>,
) -> Result<Option<Self>> {
// NOTE: conn.accept().await is what drives the entire connection.
let res = conn.accept().await.transpose().or_err(
ErrorType::H2Error,
@ -125,6 +136,7 @@ impl HttpSession {
body_read: 0,
body_sent: 0,
retry_buffer: None,
digest,
}
}))
}
@ -405,6 +417,21 @@ impl HttpSession {
pub fn body_bytes_sent(&self) -> usize {
self.body_sent
}
/// Return the [Digest] of the connection.
pub fn digest(&self) -> Option<&Digest> {
Some(&self.digest)
}
/// Return the server (local) address recorded in the connection digest.
pub fn server_addr(&self) -> Option<&SocketAddr> {
self.digest.socket_digest.as_ref().map(|d| d.local_addr())?
}
/// Return the client (peer) address recorded in the connection digest.
pub fn client_addr(&self) -> Option<&SocketAddr> {
self.digest.socket_digest.as_ref().map(|d| d.peer_addr())?
}
}
#[cfg(test)]
@ -444,8 +471,12 @@ mod test {
});
let mut connection = handshake(Box::new(server), None).await.unwrap();
let digest = Arc::new(Digest::default());
while let Some(mut http) = HttpSession::from_h2_conn(&mut connection).await.unwrap() {
while let Some(mut http) = HttpSession::from_h2_conn(&mut connection, digest.clone())
.await
.unwrap()
{
tokio::spawn(async move {
let req = http.req_header();
assert_eq!(req.method, Method::GET);

View file

@ -18,6 +18,7 @@ use std::io;
use std::os::unix::io::AsRawFd;
use tokio::net::{TcpListener, UnixListener};
use crate::protocols::digest::{GetSocketDigest, SocketDigest};
use crate::protocols::l4::stream::Stream;
/// The type for generic listener for both TCP and Unix domain socket
@ -40,7 +41,7 @@ impl From<UnixListener> for Listener {
}
impl AsRawFd for Listener {
fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
match &self {
Self::Tcp(l) => l.as_raw_fd(),
Self::Unix(l) => l.as_raw_fd(),
@ -52,8 +53,32 @@ impl Listener {
/// Accept a connection from the listening endpoint
pub async fn accept(&self) -> io::Result<Stream> {
match &self {
Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()),
Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()),
Self::Tcp(l) => l.accept().await.map(|(stream, peer_addr)| {
let mut s: Stream = stream.into();
let digest = SocketDigest::from_raw_fd(s.as_raw_fd());
digest
.peer_addr
.set(Some(peer_addr.into()))
.expect("newly created OnceCell must be empty");
s.set_socket_digest(digest);
// TODO: if listening on a specific bind address, we could save
// an extra syscall looking up the local_addr later if we can pass
// and init it in the socket digest here
s
}),
Self::Unix(l) => l.accept().await.map(|(stream, peer_addr)| {
let mut s: Stream = stream.into();
let digest = SocketDigest::from_raw_fd(s.as_raw_fd());
// note: if unnamed/abstract UDS, it will be `None`
// (see TryFrom<tokio::net::unix::SocketAddr>)
let addr = peer_addr.try_into().ok();
digest
.peer_addr
.set(addr)
.expect("newly created OnceCell must be empty");
s.set_socket_digest(digest);
s
}),
}
}
}

View file

@ -15,10 +15,12 @@
//! Generic socket type
use crate::{Error, OrErr};
use nix::sys::socket::{getpeername, getsockname, SockaddrStorage};
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr as StdSockAddr;
use std::os::unix::net::SocketAddr as StdUnixSockAddr;
use tokio::net::unix::SocketAddr as TokioUnixSockAddr;
/// [`SocketAddr`] is a storage type that contains either a Internet (IP address)
/// socket address or a Unix domain socket address.
@ -53,6 +55,40 @@ impl SocketAddr {
addr.set_port(port)
}
}
fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option<SocketAddr> {
if let Some(v4) = sock.as_sockaddr_in() {
return Some(SocketAddr::Inet(StdSockAddr::V4(
std::net::SocketAddrV4::new(v4.ip().into(), v4.port()),
)));
} else if let Some(v6) = sock.as_sockaddr_in6() {
return Some(SocketAddr::Inet(StdSockAddr::V6(
std::net::SocketAddrV6::new(v6.ip(), v6.port(), v6.flowinfo(), v6.scope_id()),
)));
}
// TODO: don't set abstract / unnamed for now,
// for parity with how we treat these types in TryFrom<TokioUnixSockAddr>
Some(SocketAddr::Unix(
sock.as_unix_addr()
.map(|addr| addr.path().map(StdUnixSockAddr::from_pathname))??
.ok()?,
))
}
pub fn from_raw_fd(fd: std::os::unix::io::RawFd, peer_addr: bool) -> Option<SocketAddr> {
let sockaddr_storage = if peer_addr {
getpeername(fd)
} else {
getsockname(fd)
};
match sockaddr_storage {
Ok(sockaddr) => Self::from_sockaddr_storage(&sockaddr),
// could be errors such as EBADF, i.e. fd is no longer a valid socket
// fail open in this case
Err(_e) => None,
}
}
}
impl std::fmt::Display for SocketAddr {
@ -167,6 +203,34 @@ impl std::net::ToSocketAddrs for SocketAddr {
}
}
impl From<StdSockAddr> for SocketAddr {
fn from(sockaddr: StdSockAddr) -> Self {
SocketAddr::Inet(sockaddr)
}
}
impl From<StdUnixSockAddr> for SocketAddr {
fn from(sockaddr: StdUnixSockAddr) -> Self {
SocketAddr::Unix(sockaddr)
}
}
// TODO: ideally mio/tokio will start using the std version of the unix `SocketAddr`
// so we can avoid a fallible conversion
// https://github.com/tokio-rs/mio/issues/1527
impl TryFrom<TokioUnixSockAddr> for SocketAddr {
type Error = String;
fn try_from(value: TokioUnixSockAddr) -> Result<Self, Self::Error> {
if let Some(Ok(addr)) = value.as_pathname().map(StdUnixSockAddr::from_pathname) {
Ok(addr.into())
} else {
// may be unnamed/abstract UDS
Err(format!("could not convert {value:?} to SocketAddr"))
}
}
}
#[cfg(test)]
mod test {
use super::*;

View file

@ -27,7 +27,10 @@ use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf};
use tokio::net::{TcpStream, UnixStream};
use crate::protocols::raw_connect::ProxyDigest;
use crate::protocols::{GetProxyDigest, GetTimingDigest, Shutdown, Ssl, TimingDigest, UniqueID};
use crate::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest,
UniqueID,
};
use crate::upstreams::peer::Tracer;
#[derive(Debug)]
@ -105,6 +108,15 @@ impl AsyncWrite for RawStream {
}
}
impl AsRawFd for RawStream {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
match self {
RawStream::Tcp(s) => s.as_raw_fd(),
RawStream::Unix(s) => s.as_raw_fd(),
}
}
}
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
@ -123,6 +135,7 @@ pub struct Stream {
stream: BufStream<RawStream>,
buffer_write: bool,
proxy_digest: Option<Arc<ProxyDigest>>,
socket_digest: Option<Arc<SocketDigest>>,
/// When this connection is established
pub established_ts: SystemTime,
/// The distributed tracing object for this stream
@ -147,6 +160,7 @@ impl From<TcpStream> for Stream {
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
socket_digest: None,
tracer: None,
}
}
@ -159,17 +173,21 @@ impl From<UnixStream> for Stream {
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
socket_digest: None,
tracer: None,
}
}
}
impl AsRawFd for Stream {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.stream.get_ref().as_raw_fd()
}
}
impl UniqueID for Stream {
fn id(&self) -> i32 {
match &self.stream.get_ref() {
RawStream::Tcp(s) => s.as_raw_fd(),
RawStream::Unix(s) => s.as_raw_fd(),
}
self.as_raw_fd()
}
}
@ -204,6 +222,16 @@ impl GetProxyDigest for Stream {
}
}
impl GetSocketDigest for Stream {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
self.socket_digest.clone()
}
fn set_socket_digest(&mut self, socket_digest: SocketDigest) {
self.socket_digest = Some(Arc::new(socket_digest))
}
}
impl Drop for Stream {
fn drop(&mut self) {
if let Some(t) = self.tracer.as_ref() {

View file

@ -20,7 +20,10 @@ pub mod l4;
pub mod raw_connect;
pub mod ssl;
pub use digest::{Digest, GetProxyDigest, GetTimingDigest, ProtoDigest, TimingDigest};
pub use digest::{
Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
TimingDigest,
};
pub use ssl::ALPN;
use async_trait::async_trait;
@ -71,6 +74,7 @@ pub trait IO:
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ GetSocketDigest
+ Unpin
+ Debug
+ Send
@ -90,6 +94,7 @@ impl<
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ GetSocketDigest
+ Unpin
+ Debug
+ Send
@ -134,6 +139,11 @@ mod ext_io_impl {
None
}
}
impl GetSocketDigest for Mock {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
use std::io::Cursor;
@ -157,6 +167,11 @@ mod ext_io_impl {
None
}
}
impl<T> GetSocketDigest for Cursor<T> {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
use tokio::io::DuplexStream;
@ -180,6 +195,11 @@ mod ext_io_impl {
None
}
}
impl GetSocketDigest for DuplexStream {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
}
pub(crate) trait ConnFdReusable {

View file

@ -16,7 +16,9 @@
use super::SslStream;
use crate::protocols::raw_connect::ProxyDigest;
use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO};
use crate::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, TimingDigest, IO,
};
use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
@ -90,3 +92,15 @@ where
self.get_ref().get_proxy_digest()
}
}
impl<S> GetSocketDigest for SslStream<S>
where
S: GetSocketDigest,
{
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
self.get_ref().get_socket_digest()
}
fn set_socket_digest(&mut self, socket_digest: SocketDigest) {
self.get_mut().set_socket_digest(socket_digest)
}
}

View file

@ -17,7 +17,9 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use pingora_cache::lock::WritePermit;
use pingora_core::protocols::raw_connect::ProxyDigest;
use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID};
use pingora_core::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID,
};
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf};
@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO {
}
}
impl GetSocketDigest for DummyIO {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
None
}
}
#[async_trait]
impl pingora_core::protocols::Shutdown for DummyIO {
async fn shutdown(&mut self) -> () {}

View file

@ -20,6 +20,10 @@ use reqwest::{header, StatusCode};
use utils::server_utils::init;
fn is_specified_port(port: u16) -> bool {
(1..65535).contains(&port)
}
#[tokio::test]
async fn test_origin_alive() {
init();
@ -36,8 +40,27 @@ async fn test_simple_proxy() {
init();
let res = reqwest::get("http://127.0.0.1:6147").await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
let sockaddr = headers["x-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
assert!(is_specified_port(sockaddr.port()));
assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
let sockaddr = headers["x-upstream-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
assert!(is_specified_port(sockaddr.port()));
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@ -53,8 +76,28 @@ async fn test_h2_to_h1() {
let res = client.get("https://127.0.0.1:6150").send().await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
let sockaddr = headers["x-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
assert!(is_specified_port(sockaddr.port()));
assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
let sockaddr = headers["x-upstream-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
assert!(is_specified_port(sockaddr.port()));
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@ -75,8 +118,27 @@ async fn test_h2_to_h2() {
.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
let sockaddr = headers["x-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
assert!(is_specified_port(sockaddr.port()));
assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
let sockaddr = headers["x-upstream-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
assert!(is_specified_port(sockaddr.port()));
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() {
assert_eq!(res.status(), reqwest::StatusCode::OK);
let (resp, body) = res.into_parts();
assert_eq!(resp.headers[header::CONTENT_LENGTH], "13");
let headers = &resp.headers;
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock");
assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS
assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
let sockaddr = headers["x-upstream-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
assert!(is_specified_port(sockaddr.port()));
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
}
@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() {
async fn test_simple_proxy_uds_peer() {
init();
let client = reqwest::Client::new();
let res = client
.get("http://127.0.0.1:6147")
.header("x-uds-peer", "1") // force upstream peer to be UDS
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let headers = res.headers();
let headers = &res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
let sockaddr = headers["x-client-addr"]
.to_str()
.unwrap()
.parse::<std::net::SocketAddr>()
.unwrap();
assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
assert!(is_specified_port(sockaddr.port()));
assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS
assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock");
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}

View file

@ -23,7 +23,7 @@ use pingora_cache::{
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
use pingora_core::protocols::Digest;
use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
use pingora_core::server::configuration::Opt;
use pingora_core::services::Service;
use pingora_core::upstreams::peer::HttpPeer;
@ -38,15 +38,72 @@ use structopt::StructOpt;
pub struct ExampleProxyHttps {}
#[allow(clippy::upper_case_acronyms)]
#[derive(Default)]
pub struct CTX {
conn_reused: bool,
upstream_client_addr: Option<SocketAddr>,
upstream_server_addr: Option<SocketAddr>,
}
// Common logic for both ProxyHttp(s) types
fn connected_to_upstream_common(
reused: bool,
digest: Option<&Digest>,
ctx: &mut CTX,
) -> Result<()> {
ctx.conn_reused = reused;
let socket_digest = digest
.expect("upstream connector digest should be set for HTTP sessions")
.socket_digest
.as_ref()
.expect("socket digest should be set for HTTP sessions");
ctx.upstream_client_addr = socket_digest.local_addr().cloned();
ctx.upstream_server_addr = socket_digest.peer_addr().cloned();
Ok(())
}
fn response_filter_common(
session: &mut Session,
response: &mut ResponseHeader,
ctx: &mut CTX,
) -> Result<()> {
if ctx.conn_reused {
response.insert_header("x-conn-reuse", "1")?;
}
let client_addr = session.client_addr();
let server_addr = session.server_addr();
response.insert_header(
"x-client-addr",
client_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
)?;
response.insert_header(
"x-server-addr",
server_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
)?;
response.insert_header(
"x-upstream-client-addr",
ctx.upstream_client_addr
.as_ref()
.map_or_else(|| "unset".into(), |a| a.to_string()),
)?;
response.insert_header(
"x-upstream-server-addr",
ctx.upstream_server_addr
.as_ref()
.map_or_else(|| "unset".into(), |a| a.to_string()),
)?;
Ok(())
}
#[async_trait]
impl ProxyHttp for ExampleProxyHttps {
type CTX = CTX;
fn new_ctx(&self) -> Self::CTX {
CTX { conn_reused: false }
CTX::default()
}
async fn upstream_peer(
@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps {
async fn response_filter(
&self,
_session: &mut Session,
session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
if ctx.conn_reused {
upstream_response.insert_header("x-conn-reuse", "1")?;
}
Ok(())
response_filter_common(session, upstream_response, ctx)
}
async fn upstream_request_filter(
@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps {
session: &mut Session,
req: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
) -> Result<()> {
let host = session.get_header_bytes("host-override");
if host != b"" {
req.insert_header("host", host)?;
@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps {
reused: bool,
_peer: &HttpPeer,
_fd: std::os::unix::io::RawFd,
_digest: Option<&Digest>,
digest: Option<&Digest>,
ctx: &mut CTX,
) -> Result<()> {
ctx.conn_reused = reused;
Ok(())
connected_to_upstream_common(reused, digest, ctx)
}
}
@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {}
#[async_trait]
impl ProxyHttp for ExampleProxyHttp {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
type CTX = CTX;
fn new_ctx(&self) -> Self::CTX {
CTX::default()
}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req = session.req_header();
@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp {
Ok(false)
}
async fn response_filter(
&self,
session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) -> Result<()> {
response_filter_common(session, upstream_response, ctx)
}
async fn upstream_peer(
&self,
session: &mut Session,
@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp {
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
let peer = Box::new(HttpPeer::new(
format!("127.0.0.1:{}", port),
format!("127.0.0.1:{port}"),
false,
"".to_string(),
));
Ok(peer)
}
async fn connected_to_upstream(
&self,
_http_session: &mut Session,
reused: bool,
_peer: &HttpPeer,
_fd: std::os::unix::io::RawFd,
digest: Option<&Digest>,
ctx: &mut CTX,
) -> Result<()> {
connected_to_upstream_common(reused, digest, ctx)
}
}
static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);