From 059768551c8f1e1c31a000c3971a77b1e6dc9c78 Mon Sep 17 00:00:00 2001 From: Yuchen Wu Date: Fri, 30 Aug 2024 09:41:09 -0700 Subject: [PATCH] Windows support 2/n: Support FD types on different platforms --- .bleep | 2 +- pingora-core/src/connectors/http/v2.rs | 10 +++--- pingora-core/src/protocols/http/v1/client.rs | 4 +-- pingora-core/src/protocols/http/v2/client.rs | 8 ++--- pingora-core/src/protocols/l4/stream.rs | 4 +-- pingora-core/src/protocols/mod.rs | 38 +++++++++++++++++--- pingora-core/src/protocols/tls/mod.rs | 4 +-- 7 files changed, 49 insertions(+), 21 deletions(-) diff --git a/.bleep b/.bleep index 5081a5b..f71cb64 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -2351cdf592f9986201d754e6ee1f37f493f69abb \ No newline at end of file +bea67a70dff1b8a8a04d46b0c322e8fac1120d0b \ No newline at end of file diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index 433bc4b..60e26fb 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -16,7 +16,7 @@ use super::HttpSession; use crate::connectors::{ConnectorOptions, TransportConnector}; use crate::protocols::http::v1::client::HttpSession as Http1Session; use crate::protocols::http::v2::client::{drive_connection, Http2Session}; -use crate::protocols::{Digest, Stream}; +use crate::protocols::{Digest, Stream, UniqueIDType}; use crate::upstreams::peer::{Peer, ALPN}; use bytes::Bytes; @@ -47,7 +47,7 @@ pub(crate) struct ConnectionRefInner { connection_stub: Stub, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, // max concurrent streams this connection is allowed to create max_streams: usize, // how many concurrent streams already active @@ -69,7 +69,7 @@ impl ConnectionRef { send_req: SendRequest, closed: watch::Receiver, ping_timeout_occurred: Arc, - id: i32, + id: UniqueIDType, max_streams: usize, digest: Digest, ) -> Self { @@ -98,7 +98,7 @@ impl ConnectionRef { self.0.current_streams.fetch_sub(1, Ordering::SeqCst); } - pub fn id(&self) -> i32 { + pub fn id(&self) -> UniqueIDType { self.0.id } @@ -196,7 +196,7 @@ impl InUsePool { // release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist) // the caller should update the ref and then decide where to put it (in use pool or idle) - fn release(&self, reuse_hash: u64, id: i32) -> Option { + fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option { let pools = self.pools.read(); if let Some(pool) = pools.get(&reuse_hash) { pool.remove(id) diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 8c2ab14..2b2640b 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::HttpTask; -use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; +use crate::protocols::{Digest, SocketAddr, Stream, UniqueID, UniqueIDType}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x client session @@ -717,7 +717,7 @@ pub(crate) fn http_req_header_to_wire(req: &RequestHeader) -> Option { } impl UniqueID for HttpSession { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.underlying_stream.id() } } diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 9bdbff4..1d89004 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch; use crate::connectors::http::v2::ConnectionRef; -use crate::protocols::{Digest, SocketAddr}; +use crate::protocols::{Digest, SocketAddr, UniqueIDType}; pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); @@ -336,7 +336,7 @@ impl Http2Session { } /// the FD of the underlying connection - pub fn fd(&self) -> i32 { + pub fn fd(&self) -> UniqueIDType { self.conn.id() } @@ -427,7 +427,7 @@ use tokio::sync::oneshot; pub async fn drive_connection( mut c: client::Connection, - id: i32, + id: UniqueIDType, closed: watch::Sender, ping_interval: Option, ping_timeout_occurred: Arc, @@ -481,7 +481,7 @@ async fn do_ping_pong( interval: Duration, tx: oneshot::Sender<()>, dropped: Arc, - id: i32, + id: UniqueIDType, ) { // delay before sending the first ping, no need to race with the first request tokio::time::sleep(interval).await; diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index ee91a8a..edcb188 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -30,7 +30,7 @@ use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive}; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{ GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, - UniqueID, + UniqueID, UniqueIDType, }; use crate::upstreams::peer::Tracer; @@ -202,7 +202,7 @@ impl AsRawFd for Stream { } impl UniqueID for Stream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.as_raw_fd() } } diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index fb30992..4c1aa88 100644 --- a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -32,6 +32,11 @@ use std::fmt::Debug; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; +#[cfg(unix)] +pub type UniqueIDType = i32; +#[cfg(windows)] +pub type UniqueIDType = usize; + /// Define how a protocol should shutdown its connection. #[async_trait] pub trait Shutdown { @@ -42,7 +47,7 @@ pub trait Shutdown { pub trait UniqueID { /// The ID returned should be unique among all existing connections of the same type. /// But ID can be recycled after a connection is shutdown. - fn id(&self) -> i32; + fn id(&self) -> UniqueIDType; } /// Interface to get TLS info @@ -126,7 +131,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Mock { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -154,7 +159,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for Cursor { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -182,7 +187,7 @@ mod ext_io_impl { async fn shutdown(&mut self) -> () {} } impl UniqueID for DuplexStream { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { 0 } } @@ -204,15 +209,27 @@ mod ext_io_impl { } } +#[cfg(unix)] pub(crate) trait ConnFdReusable { fn check_fd_match(&self, fd: V) -> bool; } +#[cfg(windows)] +pub(crate) trait ConnSockReusable { + fn check_sock_match(&self, sock: V) -> bool; +} + use l4::socket::SocketAddr; use log::{debug, error}; +#[cfg(unix)] use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr}; -use std::{net::SocketAddr as InetSocketAddr, os::unix::prelude::AsRawFd, path::Path}; +#[cfg(unix)] +use std::os::unix::prelude::AsRawFd; +#[cfg(windows)] +use std::os::windows::io::AsRawSocket; +use std::{net::SocketAddr as InetSocketAddr, path::Path}; +#[cfg(unix)] impl ConnFdReusable for SocketAddr { fn check_fd_match(&self, fd: V) -> bool { match self { @@ -225,6 +242,16 @@ impl ConnFdReusable for SocketAddr { } } +#[cfg(windows)] +impl ConnSockReusable for SocketAddr { + fn check_sock_match(&self, sock: V) -> bool { + match self { + SocketAddr::Inet(addr) => addr.check_sock_match(sock), + } + } +} + +#[cfg(unix)] impl ConnFdReusable for Path { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); @@ -252,6 +279,7 @@ impl ConnFdReusable for Path { } } +#[cfg(unix)] impl ConnFdReusable for InetSocketAddr { fn check_fd_match(&self, fd: V) -> bool { let fd = fd.as_raw_fd(); diff --git a/pingora-core/src/protocols/tls/mod.rs b/pingora-core/src/protocols/tls/mod.rs index ca353c5..89fe0d3 100644 --- a/pingora-core/src/protocols/tls/mod.rs +++ b/pingora-core/src/protocols/tls/mod.rs @@ -26,7 +26,7 @@ pub use boringssl_openssl::*; pub mod dummy_tls; use crate::protocols::digest::TimingDigest; -use crate::protocols::{Ssl, UniqueID}; +use crate::protocols::{Ssl, UniqueID, UniqueIDType}; use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl}; use log::warn; use pingora_error::{ErrorType::*, OrErr, Result}; @@ -169,7 +169,7 @@ impl UniqueID for SslStream where T: UniqueID, { - fn id(&self) -> i32 { + fn id(&self) -> UniqueIDType { self.ssl.get_ref().id() } }