Windows support 2/n: Support FD types on different platforms

This commit is contained in:
Yuchen Wu 2024-08-30 09:41:09 -07:00 committed by Gustav Davidsson
parent e288bfe8f0
commit 059768551c
7 changed files with 49 additions and 21 deletions

2
.bleep
View file

@ -1 +1 @@
2351cdf592f9986201d754e6ee1f37f493f69abb bea67a70dff1b8a8a04d46b0c322e8fac1120d0b

View file

@ -16,7 +16,7 @@ use super::HttpSession;
use crate::connectors::{ConnectorOptions, TransportConnector}; use crate::connectors::{ConnectorOptions, TransportConnector};
use crate::protocols::http::v1::client::HttpSession as Http1Session; use crate::protocols::http::v1::client::HttpSession as Http1Session;
use crate::protocols::http::v2::client::{drive_connection, Http2Session}; use crate::protocols::http::v2::client::{drive_connection, Http2Session};
use crate::protocols::{Digest, Stream}; use crate::protocols::{Digest, Stream, UniqueIDType};
use crate::upstreams::peer::{Peer, ALPN}; use crate::upstreams::peer::{Peer, ALPN};
use bytes::Bytes; use bytes::Bytes;
@ -47,7 +47,7 @@ pub(crate) struct ConnectionRefInner {
connection_stub: Stub, connection_stub: Stub,
closed: watch::Receiver<bool>, closed: watch::Receiver<bool>,
ping_timeout_occurred: Arc<AtomicBool>, ping_timeout_occurred: Arc<AtomicBool>,
id: i32, id: UniqueIDType,
// max concurrent streams this connection is allowed to create // max concurrent streams this connection is allowed to create
max_streams: usize, max_streams: usize,
// how many concurrent streams already active // how many concurrent streams already active
@ -69,7 +69,7 @@ impl ConnectionRef {
send_req: SendRequest<Bytes>, send_req: SendRequest<Bytes>,
closed: watch::Receiver<bool>, closed: watch::Receiver<bool>,
ping_timeout_occurred: Arc<AtomicBool>, ping_timeout_occurred: Arc<AtomicBool>,
id: i32, id: UniqueIDType,
max_streams: usize, max_streams: usize,
digest: Digest, digest: Digest,
) -> Self { ) -> Self {
@ -98,7 +98,7 @@ impl ConnectionRef {
self.0.current_streams.fetch_sub(1, Ordering::SeqCst); self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
} }
pub fn id(&self) -> i32 { pub fn id(&self) -> UniqueIDType {
self.0.id self.0.id
} }
@ -196,7 +196,7 @@ impl InUsePool {
// release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist) // release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist)
// the caller should update the ref and then decide where to put it (in use pool or idle) // the caller should update the ref and then decide where to put it (in use pool or idle)
fn release(&self, reuse_hash: u64, id: i32) -> Option<ConnectionRef> { fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option<ConnectionRef> {
let pools = self.pools.read(); let pools = self.pools.read();
if let Some(pool) = pools.get(&reuse_hash) { if let Some(pool) = pools.get(&reuse_hash) {
pool.remove(id) pool.remove(id)

View file

@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::body::{BodyReader, BodyWriter}; use super::body::{BodyReader, BodyWriter};
use super::common::*; use super::common::*;
use crate::protocols::http::HttpTask; use crate::protocols::http::HttpTask;
use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; use crate::protocols::{Digest, SocketAddr, Stream, UniqueID, UniqueIDType};
use crate::utils::{BufRef, KVRef}; use crate::utils::{BufRef, KVRef};
/// The HTTP 1.x client session /// The HTTP 1.x client session
@ -717,7 +717,7 @@ pub(crate) fn http_req_header_to_wire(req: &RequestHeader) -> Option<BytesMut> {
} }
impl UniqueID for HttpSession { impl UniqueID for HttpSession {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
self.underlying_stream.id() self.underlying_stream.id()
} }
} }

View file

@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::watch; use tokio::sync::watch;
use crate::connectors::http::v2::ConnectionRef; use crate::connectors::http::v2::ConnectionRef;
use crate::protocols::{Digest, SocketAddr}; use crate::protocols::{Digest, SocketAddr, UniqueIDType};
pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout");
@ -336,7 +336,7 @@ impl Http2Session {
} }
/// the FD of the underlying connection /// the FD of the underlying connection
pub fn fd(&self) -> i32 { pub fn fd(&self) -> UniqueIDType {
self.conn.id() self.conn.id()
} }
@ -427,7 +427,7 @@ use tokio::sync::oneshot;
pub async fn drive_connection<S>( pub async fn drive_connection<S>(
mut c: client::Connection<S>, mut c: client::Connection<S>,
id: i32, id: UniqueIDType,
closed: watch::Sender<bool>, closed: watch::Sender<bool>,
ping_interval: Option<Duration>, ping_interval: Option<Duration>,
ping_timeout_occurred: Arc<AtomicBool>, ping_timeout_occurred: Arc<AtomicBool>,
@ -481,7 +481,7 @@ async fn do_ping_pong(
interval: Duration, interval: Duration,
tx: oneshot::Sender<()>, tx: oneshot::Sender<()>,
dropped: Arc<AtomicBool>, dropped: Arc<AtomicBool>,
id: i32, id: UniqueIDType,
) { ) {
// delay before sending the first ping, no need to race with the first request // delay before sending the first ping, no need to race with the first request
tokio::time::sleep(interval).await; tokio::time::sleep(interval).await;

View file

@ -30,7 +30,7 @@ use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive};
use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::raw_connect::ProxyDigest;
use crate::protocols::{ use crate::protocols::{
GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest,
UniqueID, UniqueID, UniqueIDType,
}; };
use crate::upstreams::peer::Tracer; use crate::upstreams::peer::Tracer;
@ -202,7 +202,7 @@ impl AsRawFd for Stream {
} }
impl UniqueID for Stream { impl UniqueID for Stream {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
self.as_raw_fd() self.as_raw_fd()
} }
} }

View file

@ -32,6 +32,11 @@ use std::fmt::Debug;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc; use std::sync::Arc;
#[cfg(unix)]
pub type UniqueIDType = i32;
#[cfg(windows)]
pub type UniqueIDType = usize;
/// Define how a protocol should shutdown its connection. /// Define how a protocol should shutdown its connection.
#[async_trait] #[async_trait]
pub trait Shutdown { pub trait Shutdown {
@ -42,7 +47,7 @@ pub trait Shutdown {
pub trait UniqueID { pub trait UniqueID {
/// The ID returned should be unique among all existing connections of the same type. /// The ID returned should be unique among all existing connections of the same type.
/// But ID can be recycled after a connection is shutdown. /// But ID can be recycled after a connection is shutdown.
fn id(&self) -> i32; fn id(&self) -> UniqueIDType;
} }
/// Interface to get TLS info /// Interface to get TLS info
@ -126,7 +131,7 @@ mod ext_io_impl {
async fn shutdown(&mut self) -> () {} async fn shutdown(&mut self) -> () {}
} }
impl UniqueID for Mock { impl UniqueID for Mock {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
0 0
} }
} }
@ -154,7 +159,7 @@ mod ext_io_impl {
async fn shutdown(&mut self) -> () {} async fn shutdown(&mut self) -> () {}
} }
impl<T> UniqueID for Cursor<T> { impl<T> UniqueID for Cursor<T> {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
0 0
} }
} }
@ -182,7 +187,7 @@ mod ext_io_impl {
async fn shutdown(&mut self) -> () {} async fn shutdown(&mut self) -> () {}
} }
impl UniqueID for DuplexStream { impl UniqueID for DuplexStream {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
0 0
} }
} }
@ -204,15 +209,27 @@ mod ext_io_impl {
} }
} }
#[cfg(unix)]
pub(crate) trait ConnFdReusable { pub(crate) trait ConnFdReusable {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool; fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
} }
#[cfg(windows)]
pub(crate) trait ConnSockReusable {
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
}
use l4::socket::SocketAddr; use l4::socket::SocketAddr;
use log::{debug, error}; use log::{debug, error};
#[cfg(unix)]
use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr}; use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
use std::{net::SocketAddr as InetSocketAddr, os::unix::prelude::AsRawFd, path::Path}; #[cfg(unix)]
use std::os::unix::prelude::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::{net::SocketAddr as InetSocketAddr, path::Path};
#[cfg(unix)]
impl ConnFdReusable for SocketAddr { impl ConnFdReusable for SocketAddr {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool { fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
match self { match self {
@ -225,6 +242,16 @@ impl ConnFdReusable for SocketAddr {
} }
} }
#[cfg(windows)]
impl ConnSockReusable for SocketAddr {
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
match self {
SocketAddr::Inet(addr) => addr.check_sock_match(sock),
}
}
}
#[cfg(unix)]
impl ConnFdReusable for Path { impl ConnFdReusable for Path {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool { fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
let fd = fd.as_raw_fd(); let fd = fd.as_raw_fd();
@ -252,6 +279,7 @@ impl ConnFdReusable for Path {
} }
} }
#[cfg(unix)]
impl ConnFdReusable for InetSocketAddr { impl ConnFdReusable for InetSocketAddr {
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool { fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
let fd = fd.as_raw_fd(); let fd = fd.as_raw_fd();

View file

@ -26,7 +26,7 @@ pub use boringssl_openssl::*;
pub mod dummy_tls; pub mod dummy_tls;
use crate::protocols::digest::TimingDigest; use crate::protocols::digest::TimingDigest;
use crate::protocols::{Ssl, UniqueID}; use crate::protocols::{Ssl, UniqueID, UniqueIDType};
use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl}; use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl};
use log::warn; use log::warn;
use pingora_error::{ErrorType::*, OrErr, Result}; use pingora_error::{ErrorType::*, OrErr, Result};
@ -169,7 +169,7 @@ impl<T> UniqueID for SslStream<T>
where where
T: UniqueID, T: UniqueID,
{ {
fn id(&self) -> i32 { fn id(&self) -> UniqueIDType {
self.ssl.get_ref().id() self.ssl.get_ref().id()
} }
} }