From 7c122e7f36de5c946ac960a1691c5dd41f26e6e6 Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Thu, 18 Jul 2024 16:50:30 -0700 Subject: [PATCH] Add the support for custom L4 connector This allows user defined L4 connect() to be used so that they can customize the connect behavior such as changing socket options and simulating errors in tests. --- .bleep | 2 +- pingora-core/src/connectors/l4.rs | 160 ++++++++++++++++++----------- pingora-core/src/connectors/mod.rs | 1 + pingora-core/src/upstreams/peer.rs | 4 + 4 files changed, 104 insertions(+), 63 deletions(-) diff --git a/.bleep b/.bleep index f00c19a..311a546 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -26e2e108b43f8e1739801106e958f50892bd55cd \ No newline at end of file +78a170341a0fb030b8bcb2afe84afb268cdc5b2d \ No newline at end of file diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 449ea4c..d226a8b 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use log::debug; use pingora_error::{Context, Error, ErrorType::*, OrErr, Result}; use rand::seq::SliceRandom; @@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream; use crate::protocols::{GetSocketDigest, SocketDigest}; use crate::upstreams::peer::Peer; +/// The interface to establish a L4 connection +#[async_trait] +pub trait Connect: std::fmt::Debug { + async fn connect(&self, addr: &SocketAddr) -> Result; +} + /// Establish a connection (l4) to the given peer using its settings and an optional bind address. pub async fn connect

(peer: &P, bind_to: Option) -> Result where @@ -37,72 +44,78 @@ where .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer)); } let peer_addr = peer.address(); - let mut stream: Stream = match peer_addr { - SocketAddr::Inet(addr) => { - let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| { - if peer.tcp_fast_open() { - set_tcp_fastopen_connect(socket.as_raw_fd())?; - } - if let Some(recv_buf) = peer.tcp_recv_buf() { - debug!("Setting recv buf size"); - set_recv_buf(socket.as_raw_fd(), recv_buf)?; - } - if let Some(dscp) = peer.dscp() { - debug!("Setting dscp"); - set_dscp(socket.as_raw_fd(), dscp)?; - } - Ok(()) - }); - let conn_res = match peer.connection_timeout() { - Some(t) => pingora_timeout::timeout(t, connect_future) - .await - .explain_err(ConnectTimedout, |_| { - format!("timeout {t:?} connecting to server {peer}") - })?, - None => connect_future.await, - }; - match conn_res { - Ok(socket) => { - debug!("connected to new server: {}", peer.address()); - Ok(socket.into()) - } - Err(e) => { - let c = format!("Fail to connect to {peer}"); - match e.etype() { - SocketError | BindError => Error::e_because(InternalError, c, e), - _ => Err(e.more_context(c)), + let mut stream: Stream = + if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) { + custom_l4.connect(peer_addr).await? + } else { + match peer_addr { + SocketAddr::Inet(addr) => { + let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| { + if peer.tcp_fast_open() { + set_tcp_fastopen_connect(socket.as_raw_fd())?; + } + if let Some(recv_buf) = peer.tcp_recv_buf() { + debug!("Setting recv buf size"); + set_recv_buf(socket.as_raw_fd(), recv_buf)?; + } + if let Some(dscp) = peer.dscp() { + debug!("Setting dscp"); + set_dscp(socket.as_raw_fd(), dscp)?; + } + Ok(()) + }); + let conn_res = match peer.connection_timeout() { + Some(t) => pingora_timeout::timeout(t, connect_future) + .await + .explain_err(ConnectTimedout, |_| { + format!("timeout {t:?} connecting to server {peer}") + })?, + None => connect_future.await, + }; + match conn_res { + Ok(socket) => { + debug!("connected to new server: {}", peer.address()); + Ok(socket.into()) + } + Err(e) => { + let c = format!("Fail to connect to {peer}"); + match e.etype() { + SocketError | BindError => Error::e_because(InternalError, c, e), + _ => Err(e.more_context(c)), + } + } } } - } - } - SocketAddr::Unix(addr) => { - let connect_future = connect_uds( - addr.as_pathname() - .expect("non-pathname unix sockets not supported as peer"), - ); - let conn_res = match peer.connection_timeout() { - Some(t) => pingora_timeout::timeout(t, connect_future) - .await - .explain_err(ConnectTimedout, |_| { - format!("timeout {t:?} connecting to server {peer}") - })?, - None => connect_future.await, - }; - match conn_res { - Ok(socket) => { - debug!("connected to new server: {}", peer.address()); - Ok(socket.into()) - } - Err(e) => { - let c = format!("Fail to connect to {peer}"); - match e.etype() { - SocketError | BindError => Error::e_because(InternalError, c, e), - _ => Err(e.more_context(c)), + SocketAddr::Unix(addr) => { + let connect_future = connect_uds( + addr.as_pathname() + .expect("non-pathname unix sockets not supported as peer"), + ); + let conn_res = match peer.connection_timeout() { + Some(t) => pingora_timeout::timeout(t, connect_future) + .await + .explain_err(ConnectTimedout, |_| { + format!("timeout {t:?} connecting to server {peer}") + })?, + None => connect_future.await, + }; + match conn_res { + Ok(socket) => { + debug!("connected to new server: {}", peer.address()); + Ok(socket.into()) + } + Err(e) => { + let c = format!("Fail to connect to {peer}"); + match e.etype() { + SocketError | BindError => Error::e_because(InternalError, c, e), + _ => Err(e.more_context(c)), + } + } } } - } - } - }?; + }? + }; + let tracer = peer.get_tracer(); if let Some(t) = tracer { t.0.on_connected(); @@ -249,6 +262,29 @@ mod tests { assert_eq!(new_session.unwrap_err().etype(), &ConnectTimedout) } + #[tokio::test] + async fn test_custom_connect() { + #[derive(Debug)] + struct MyL4; + #[async_trait] + impl Connect for MyL4 { + async fn connect(&self, _addr: &SocketAddr) -> Result { + tokio::net::TcpStream::connect("1.1.1.1:80") + .await + .map(|s| s.into()) + .or_fail() + } + } + // :79 shouldn't be able to be connected to + let mut peer = BasicPeer::new("1.1.1.1:79"); + peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {})); + + let new_session = connect(&peer, None).await; + + // but MyL4 connects to :80 instead + assert!(new_session.is_ok()); + } + #[tokio::test] async fn test_connect_proxy_fail() { let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string()); diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index 9bfd91b..d13f3a9 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector; use crate::upstreams::peer::{Peer, ALPN}; use l4::connect as l4_connect; +pub use l4::Connect as L4Connect; use log::{debug, error, warn}; use offload::OffloadRuntime; use parking_lot::RwLock; diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs index d0c8125..c4a23a8 100644 --- a/pingora-core/src/upstreams/peer.rs +++ b/pingora-core/src/upstreams/peer.rs @@ -29,6 +29,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use crate::connectors::L4Connect; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::ConnFdReusable; use crate::protocols::TcpKeepalive; @@ -322,6 +323,8 @@ pub struct PeerOptions { pub tcp_fast_open: bool, // use Arc because Clone is required but not allowed in trait object pub tracer: Option, + // A custom L4 connector to use to establish new L4 connections + pub custom_l4: Option>, } impl PeerOptions { @@ -350,6 +353,7 @@ impl PeerOptions { second_keyshare: true, // default true and noop when not using PQ curves tcp_fast_open: false, tracer: None, + custom_l4: None, } }