mirror of
https://github.com/cloudflare/pingora.git
synced 2024-09-20 02:31:35 +02:00
Windows support 2/n: Support FD types on different platforms
This commit is contained in:
parent
e288bfe8f0
commit
2853309dcd
7 changed files with 49 additions and 21 deletions
2
.bleep
2
.bleep
|
@ -1 +1 @@
|
|||
2351cdf592f9986201d754e6ee1f37f493f69abb
|
||||
bea67a70dff1b8a8a04d46b0c322e8fac1120d0b
|
|
@ -16,7 +16,7 @@ use super::HttpSession;
|
|||
use crate::connectors::{ConnectorOptions, TransportConnector};
|
||||
use crate::protocols::http::v1::client::HttpSession as Http1Session;
|
||||
use crate::protocols::http::v2::client::{drive_connection, Http2Session};
|
||||
use crate::protocols::{Digest, Stream};
|
||||
use crate::protocols::{Digest, Stream, UniqueIDType};
|
||||
use crate::upstreams::peer::{Peer, ALPN};
|
||||
|
||||
use bytes::Bytes;
|
||||
|
@ -47,7 +47,7 @@ pub(crate) struct ConnectionRefInner {
|
|||
connection_stub: Stub,
|
||||
closed: watch::Receiver<bool>,
|
||||
ping_timeout_occurred: Arc<AtomicBool>,
|
||||
id: i32,
|
||||
id: UniqueIDType,
|
||||
// max concurrent streams this connection is allowed to create
|
||||
max_streams: usize,
|
||||
// how many concurrent streams already active
|
||||
|
@ -69,7 +69,7 @@ impl ConnectionRef {
|
|||
send_req: SendRequest<Bytes>,
|
||||
closed: watch::Receiver<bool>,
|
||||
ping_timeout_occurred: Arc<AtomicBool>,
|
||||
id: i32,
|
||||
id: UniqueIDType,
|
||||
max_streams: usize,
|
||||
digest: Digest,
|
||||
) -> Self {
|
||||
|
@ -98,7 +98,7 @@ impl ConnectionRef {
|
|||
self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn id(&self) -> i32 {
|
||||
pub fn id(&self) -> UniqueIDType {
|
||||
self.0.id
|
||||
}
|
||||
|
||||
|
@ -196,7 +196,7 @@ impl InUsePool {
|
|||
|
||||
// release a h2_stream, this functional will cause an ConnectionRef to be returned (if exist)
|
||||
// the caller should update the ref and then decide where to put it (in use pool or idle)
|
||||
fn release(&self, reuse_hash: u64, id: i32) -> Option<ConnectionRef> {
|
||||
fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option<ConnectionRef> {
|
||||
let pools = self.pools.read();
|
||||
if let Some(pool) = pools.get(&reuse_hash) {
|
||||
pool.remove(id)
|
||||
|
|
|
@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|||
use super::body::{BodyReader, BodyWriter};
|
||||
use super::common::*;
|
||||
use crate::protocols::http::HttpTask;
|
||||
use crate::protocols::{Digest, SocketAddr, Stream, UniqueID};
|
||||
use crate::protocols::{Digest, SocketAddr, Stream, UniqueID, UniqueIDType};
|
||||
use crate::utils::{BufRef, KVRef};
|
||||
|
||||
/// The HTTP 1.x client session
|
||||
|
@ -717,7 +717,7 @@ pub(crate) fn http_req_header_to_wire(req: &RequestHeader) -> Option<BytesMut> {
|
|||
}
|
||||
|
||||
impl UniqueID for HttpSession {
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
self.underlying_stream.id()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
|||
use tokio::sync::watch;
|
||||
|
||||
use crate::connectors::http::v2::ConnectionRef;
|
||||
use crate::protocols::{Digest, SocketAddr};
|
||||
use crate::protocols::{Digest, SocketAddr, UniqueIDType};
|
||||
|
||||
pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout");
|
||||
|
||||
|
@ -336,7 +336,7 @@ impl Http2Session {
|
|||
}
|
||||
|
||||
/// the FD of the underlying connection
|
||||
pub fn fd(&self) -> i32 {
|
||||
pub fn fd(&self) -> UniqueIDType {
|
||||
self.conn.id()
|
||||
}
|
||||
|
||||
|
@ -427,7 +427,7 @@ use tokio::sync::oneshot;
|
|||
|
||||
pub async fn drive_connection<S>(
|
||||
mut c: client::Connection<S>,
|
||||
id: i32,
|
||||
id: UniqueIDType,
|
||||
closed: watch::Sender<bool>,
|
||||
ping_interval: Option<Duration>,
|
||||
ping_timeout_occurred: Arc<AtomicBool>,
|
||||
|
@ -481,7 +481,7 @@ async fn do_ping_pong(
|
|||
interval: Duration,
|
||||
tx: oneshot::Sender<()>,
|
||||
dropped: Arc<AtomicBool>,
|
||||
id: i32,
|
||||
id: UniqueIDType,
|
||||
) {
|
||||
// delay before sending the first ping, no need to race with the first request
|
||||
tokio::time::sleep(interval).await;
|
||||
|
|
|
@ -30,7 +30,7 @@ use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive};
|
|||
use crate::protocols::raw_connect::ProxyDigest;
|
||||
use crate::protocols::{
|
||||
GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest,
|
||||
UniqueID,
|
||||
UniqueID, UniqueIDType,
|
||||
};
|
||||
use crate::upstreams::peer::Tracer;
|
||||
|
||||
|
@ -202,7 +202,7 @@ impl AsRawFd for Stream {
|
|||
}
|
||||
|
||||
impl UniqueID for Stream {
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
self.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,11 @@ use std::fmt::Debug;
|
|||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(unix)]
|
||||
pub type UniqueIDType = i32;
|
||||
#[cfg(windows)]
|
||||
pub type UniqueIDType = usize;
|
||||
|
||||
/// Define how a protocol should shutdown its connection.
|
||||
#[async_trait]
|
||||
pub trait Shutdown {
|
||||
|
@ -42,7 +47,7 @@ pub trait Shutdown {
|
|||
pub trait UniqueID {
|
||||
/// The ID returned should be unique among all existing connections of the same type.
|
||||
/// But ID can be recycled after a connection is shutdown.
|
||||
fn id(&self) -> i32;
|
||||
fn id(&self) -> UniqueIDType;
|
||||
}
|
||||
|
||||
/// Interface to get TLS info
|
||||
|
@ -126,7 +131,7 @@ mod ext_io_impl {
|
|||
async fn shutdown(&mut self) -> () {}
|
||||
}
|
||||
impl UniqueID for Mock {
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
@ -154,7 +159,7 @@ mod ext_io_impl {
|
|||
async fn shutdown(&mut self) -> () {}
|
||||
}
|
||||
impl<T> UniqueID for Cursor<T> {
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
@ -182,7 +187,7 @@ mod ext_io_impl {
|
|||
async fn shutdown(&mut self) -> () {}
|
||||
}
|
||||
impl UniqueID for DuplexStream {
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
@ -204,15 +209,27 @@ mod ext_io_impl {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) trait ConnFdReusable {
|
||||
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
pub(crate) trait ConnSockReusable {
|
||||
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
|
||||
}
|
||||
|
||||
use l4::socket::SocketAddr;
|
||||
use log::{debug, error};
|
||||
#[cfg(unix)]
|
||||
use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
|
||||
use std::{net::SocketAddr as InetSocketAddr, os::unix::prelude::AsRawFd, path::Path};
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
#[cfg(windows)]
|
||||
use std::os::windows::io::AsRawSocket;
|
||||
use std::{net::SocketAddr as InetSocketAddr, path::Path};
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ConnFdReusable for SocketAddr {
|
||||
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
|
||||
match self {
|
||||
|
@ -225,6 +242,16 @@ impl ConnFdReusable for SocketAddr {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
impl ConnSockReusable for SocketAddr {
|
||||
fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
|
||||
match self {
|
||||
SocketAddr::Inet(addr) => addr.check_sock_match(sock),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ConnFdReusable for Path {
|
||||
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
|
||||
let fd = fd.as_raw_fd();
|
||||
|
@ -252,6 +279,7 @@ impl ConnFdReusable for Path {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ConnFdReusable for InetSocketAddr {
|
||||
fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
|
||||
let fd = fd.as_raw_fd();
|
||||
|
|
|
@ -26,7 +26,7 @@ pub use boringssl_openssl::*;
|
|||
pub mod dummy_tls;
|
||||
|
||||
use crate::protocols::digest::TimingDigest;
|
||||
use crate::protocols::{Ssl, UniqueID};
|
||||
use crate::protocols::{Ssl, UniqueID, UniqueIDType};
|
||||
use crate::tls::{self, ssl, tokio_ssl::SslStream as InnerSsl};
|
||||
use log::warn;
|
||||
use pingora_error::{ErrorType::*, OrErr, Result};
|
||||
|
@ -169,7 +169,7 @@ impl<T> UniqueID for SslStream<T>
|
|||
where
|
||||
T: UniqueID,
|
||||
{
|
||||
fn id(&self) -> i32 {
|
||||
fn id(&self) -> UniqueIDType {
|
||||
self.ssl.get_ref().id()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue