Add the support for custom L4 connector

This allows user defined L4 connect() to be used so that they can
customize the connect behavior such as changing socket options and
simulating errors in tests.
This commit is contained in:
Yuchen Wu 2024-07-18 16:50:30 -07:00 committed by Yuchen Wu
parent 60787db4a1
commit 7c122e7f36
4 changed files with 104 additions and 63 deletions

2
.bleep
View file

@ -1 +1 @@
26e2e108b43f8e1739801106e958f50892bd55cd
78a170341a0fb030b8bcb2afe84afb268cdc5b2d

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;
/// The interface to establish a L4 connection
#[async_trait]
pub trait Connect: std::fmt::Debug {
async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
}
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
@ -37,72 +44,78 @@ where
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let peer_addr = peer.address();
let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(socket.as_raw_fd(), dscp)?;
}
Ok(())
});
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
let mut stream: Stream =
if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) {
custom_l4.connect(peer_addr).await?
} else {
match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(socket.as_raw_fd(), dscp)?;
}
Ok(())
});
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
}
}
}
}
}
}
SocketAddr::Unix(addr) => {
let connect_future = connect_uds(
addr.as_pathname()
.expect("non-pathname unix sockets not supported as peer"),
);
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
SocketAddr::Unix(addr) => {
let connect_future = connect_uds(
addr.as_pathname()
.expect("non-pathname unix sockets not supported as peer"),
);
let conn_res = match peer.connection_timeout() {
Some(t) => pingora_timeout::timeout(t, connect_future)
.await
.explain_err(ConnectTimedout, |_| {
format!("timeout {t:?} connecting to server {peer}")
})?,
None => connect_future.await,
};
match conn_res {
Ok(socket) => {
debug!("connected to new server: {}", peer.address());
Ok(socket.into())
}
Err(e) => {
let c = format!("Fail to connect to {peer}");
match e.etype() {
SocketError | BindError => Error::e_because(InternalError, c, e),
_ => Err(e.more_context(c)),
}
}
}
}
}
}
}?;
}?
};
let tracer = peer.get_tracer();
if let Some(t) = tracer {
t.0.on_connected();
@ -249,6 +262,29 @@ mod tests {
assert_eq!(new_session.unwrap_err().etype(), &ConnectTimedout)
}
#[tokio::test]
async fn test_custom_connect() {
#[derive(Debug)]
struct MyL4;
#[async_trait]
impl Connect for MyL4 {
async fn connect(&self, _addr: &SocketAddr) -> Result<Stream> {
tokio::net::TcpStream::connect("1.1.1.1:80")
.await
.map(|s| s.into())
.or_fail()
}
}
// :79 shouldn't be able to be connected to
let mut peer = BasicPeer::new("1.1.1.1:79");
peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {}));
let new_session = connect(&peer, None).await;
// but MyL4 connects to :80 instead
assert!(new_session.is_ok());
}
#[tokio::test]
async fn test_connect_proxy_fail() {
let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string());

View file

@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};
use l4::connect as l4_connect;
pub use l4::Connect as L4Connect;
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;

View file

@ -29,6 +29,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use crate::connectors::L4Connect;
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::ConnFdReusable;
use crate::protocols::TcpKeepalive;
@ -322,6 +323,8 @@ pub struct PeerOptions {
pub tcp_fast_open: bool,
// use Arc because Clone is required but not allowed in trait object
pub tracer: Option<Tracer>,
// A custom L4 connector to use to establish new L4 connections
pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
}
impl PeerOptions {
@ -350,6 +353,7 @@ impl PeerOptions {
second_keyshare: true, // default true and noop when not using PQ curves
tcp_fast_open: false,
tracer: None,
custom_l4: None,
}
}