diff --git a/.bleep b/.bleep index f00c19a..311a546 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -26e2e108b43f8e1739801106e958f50892bd55cd \ No newline at end of file +78a170341a0fb030b8bcb2afe84afb268cdc5b2d \ No newline at end of file diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 449ea4c..d226a8b 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use log::debug; use pingora_error::{Context, Error, ErrorType::*, OrErr, Result}; use rand::seq::SliceRandom; @@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream; use crate::protocols::{GetSocketDigest, SocketDigest}; use crate::upstreams::peer::Peer; +/// The interface to establish a L4 connection +#[async_trait] +pub trait Connect: std::fmt::Debug { + async fn connect(&self, addr: &SocketAddr) -> Result; +} + /// Establish a connection (l4) to the given peer using its settings and an optional bind address. pub async fn connect

(peer: &P, bind_to: Option) -> Result where @@ -37,72 +44,78 @@ where .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer)); } let peer_addr = peer.address(); - let mut stream: Stream = match peer_addr { - SocketAddr::Inet(addr) => { - let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| { - if peer.tcp_fast_open() { - set_tcp_fastopen_connect(socket.as_raw_fd())?; - } - if let Some(recv_buf) = peer.tcp_recv_buf() { - debug!("Setting recv buf size"); - set_recv_buf(socket.as_raw_fd(), recv_buf)?; - } - if let Some(dscp) = peer.dscp() { - debug!("Setting dscp"); - set_dscp(socket.as_raw_fd(), dscp)?; - } - Ok(()) - }); - let conn_res = match peer.connection_timeout() { - Some(t) => pingora_timeout::timeout(t, connect_future) - .await - .explain_err(ConnectTimedout, |_| { - format!("timeout {t:?} connecting to server {peer}") - })?, - None => connect_future.await, - }; - match conn_res { - Ok(socket) => { - debug!("connected to new server: {}", peer.address()); - Ok(socket.into()) - } - Err(e) => { - let c = format!("Fail to connect to {peer}"); - match e.etype() { - SocketError | BindError => Error::e_because(InternalError, c, e), - _ => Err(e.more_context(c)), + let mut stream: Stream = + if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) { + custom_l4.connect(peer_addr).await? + } else { + match peer_addr { + SocketAddr::Inet(addr) => { + let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| { + if peer.tcp_fast_open() { + set_tcp_fastopen_connect(socket.as_raw_fd())?; + } + if let Some(recv_buf) = peer.tcp_recv_buf() { + debug!("Setting recv buf size"); + set_recv_buf(socket.as_raw_fd(), recv_buf)?; + } + if let Some(dscp) = peer.dscp() { + debug!("Setting dscp"); + set_dscp(socket.as_raw_fd(), dscp)?; + } + Ok(()) + }); + let conn_res = match peer.connection_timeout() { + Some(t) => pingora_timeout::timeout(t, connect_future) + .await + .explain_err(ConnectTimedout, |_| { + format!("timeout {t:?} connecting to server {peer}") + })?, + None => connect_future.await, + }; + match conn_res { + Ok(socket) => { + debug!("connected to new server: {}", peer.address()); + Ok(socket.into()) + } + Err(e) => { + let c = format!("Fail to connect to {peer}"); + match e.etype() { + SocketError | BindError => Error::e_because(InternalError, c, e), + _ => Err(e.more_context(c)), + } + } } } - } - } - SocketAddr::Unix(addr) => { - let connect_future = connect_uds( - addr.as_pathname() - .expect("non-pathname unix sockets not supported as peer"), - ); - let conn_res = match peer.connection_timeout() { - Some(t) => pingora_timeout::timeout(t, connect_future) - .await - .explain_err(ConnectTimedout, |_| { - format!("timeout {t:?} connecting to server {peer}") - })?, - None => connect_future.await, - }; - match conn_res { - Ok(socket) => { - debug!("connected to new server: {}", peer.address()); - Ok(socket.into()) - } - Err(e) => { - let c = format!("Fail to connect to {peer}"); - match e.etype() { - SocketError | BindError => Error::e_because(InternalError, c, e), - _ => Err(e.more_context(c)), + SocketAddr::Unix(addr) => { + let connect_future = connect_uds( + addr.as_pathname() + .expect("non-pathname unix sockets not supported as peer"), + ); + let conn_res = match peer.connection_timeout() { + Some(t) => pingora_timeout::timeout(t, connect_future) + .await + .explain_err(ConnectTimedout, |_| { + format!("timeout {t:?} connecting to server {peer}") + })?, + None => connect_future.await, + }; + match conn_res { + Ok(socket) => { + debug!("connected to new server: {}", peer.address()); + Ok(socket.into()) + } + Err(e) => { + let c = format!("Fail to connect to {peer}"); + match e.etype() { + SocketError | BindError => Error::e_because(InternalError, c, e), + _ => Err(e.more_context(c)), + } + } } } - } - } - }?; + }? + }; + let tracer = peer.get_tracer(); if let Some(t) = tracer { t.0.on_connected(); @@ -249,6 +262,29 @@ mod tests { assert_eq!(new_session.unwrap_err().etype(), &ConnectTimedout) } + #[tokio::test] + async fn test_custom_connect() { + #[derive(Debug)] + struct MyL4; + #[async_trait] + impl Connect for MyL4 { + async fn connect(&self, _addr: &SocketAddr) -> Result { + tokio::net::TcpStream::connect("1.1.1.1:80") + .await + .map(|s| s.into()) + .or_fail() + } + } + // :79 shouldn't be able to be connected to + let mut peer = BasicPeer::new("1.1.1.1:79"); + peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {})); + + let new_session = connect(&peer, None).await; + + // but MyL4 connects to :80 instead + assert!(new_session.is_ok()); + } + #[tokio::test] async fn test_connect_proxy_fail() { let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string()); diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index 9bfd91b..d13f3a9 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector; use crate::upstreams::peer::{Peer, ALPN}; use l4::connect as l4_connect; +pub use l4::Connect as L4Connect; use log::{debug, error, warn}; use offload::OffloadRuntime; use parking_lot::RwLock; diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index d0c8125..c4a23a8 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -29,6 +29,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use crate::connectors::L4Connect; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::ConnFdReusable; use crate::protocols::TcpKeepalive; @@ -322,6 +323,8 @@ pub struct PeerOptions { pub tcp_fast_open: bool, // use Arc because Clone is required but not allowed in trait object pub tracer: Option, + // A custom L4 connector to use to establish new L4 connections + pub custom_l4: Option>, } impl PeerOptions { @@ -350,6 +353,7 @@ impl PeerOptions { second_keyshare: true, // default true and noop when not using PQ curves tcp_fast_open: false, tracer: None, + custom_l4: None, } }