Support opaque extension field in Backend

Sometimes service dicovery can carry arbitrary information, such as
SNI and request path to instrument how to connect to backend servers.

Backend now support to carry this type of information.
This commit is contained in:
Yuchen Wu 2024-07-19 11:22:20 -07:00 committed by Yuchen Wu
parent a51874039f
commit 999e379064
5 changed files with 57 additions and 2 deletions

2
.bleep
View file

@ -1 +1 @@
9fdf48d67b78675c989f51ec18829a81fe6976ef
9b88d76089e0f81c67cb502422148d4d26d4977e

View file

@ -29,6 +29,8 @@ rand = "0"
tokio = { workspace = true }
futures = "0"
log = { workspace = true }
http = { workspace = true }
derivative = "2.2.0"
[dev-dependencies]

View file

@ -16,6 +16,7 @@
use arc_swap::ArcSwap;
use async_trait::async_trait;
use http::Extensions;
use pingora_core::protocols::l4::socket::SocketAddr;
use pingora_error::Result;
use std::io::Result as IoResult;
@ -62,6 +63,7 @@ impl Static {
let addrs = addrs.to_socket_addrs()?.map(|addr| Backend {
addr: SocketAddr::Inet(addr),
weight: 1,
ext: Extensions::new(),
});
upstreams.extend(addrs);
}

View file

@ -315,6 +315,7 @@ impl Health {
mod test {
use super::*;
use crate::SocketAddr;
use http::Extensions;
#[tokio::test]
async fn test_tcp_check() {
@ -323,6 +324,7 @@ mod test {
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:80".parse().unwrap()),
weight: 1,
ext: Extensions::new(),
};
assert!(tcp_check.check(&backend).await.is_ok());
@ -330,6 +332,7 @@ mod test {
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:79".parse().unwrap()),
weight: 1,
ext: Extensions::new(),
};
assert!(tcp_check.check(&backend).await.is_err());
@ -341,6 +344,7 @@ mod test {
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:443".parse().unwrap()),
weight: 1,
ext: Extensions::new(),
};
assert!(tls_check.check(&backend).await.is_ok());
@ -353,6 +357,7 @@ mod test {
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:443".parse().unwrap()),
weight: 1,
ext: Extensions::new(),
};
assert!(https_check.check(&backend).await.is_ok());
@ -375,6 +380,7 @@ mod test {
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:80".parse().unwrap()),
weight: 1,
ext: Extensions::new(),
};
http_check.check(&backend).await.unwrap();

View file

@ -16,8 +16,14 @@
//! This crate provides common service discovery, health check and load balancing
//! algorithms for proxies to use.
// https://github.com/mcarton/rust-derivative/issues/112
// False positive for macro generated code
#![allow(clippy::non_canonical_partial_ord_impl)]
use arc_swap::ArcSwap;
use derivative::Derivative;
use futures::FutureExt;
pub use http::Extensions;
use pingora_core::protocols::l4::socket::SocketAddr;
use pingora_error::{ErrorType, OrErr, Result};
use std::collections::hash_map::DefaultHasher;
@ -45,13 +51,26 @@ pub mod prelude {
}
/// [Backend] represents a server to proxy or connect to.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Debug)]
#[derive(Derivative)]
#[derivative(Clone, Hash, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub struct Backend {
/// The address to the backend server.
pub addr: SocketAddr,
/// The relative weight of the server. Load balancing algorithms will
/// proportionally distributed traffic according to this value.
pub weight: usize,
/// The extension field to put arbitrary data to annotate the Backend.
/// The data added here is opaque to this crate hence the data is ignored by
/// functionalities of this crate. For example, two backends with the same
/// [SocketAddr] and the same weight but different `ext` data are considered
/// identical.
/// See [Extensions] for how to add and read the data.
#[derivative(PartialEq = "ignore")]
#[derivative(PartialOrd = "ignore")]
#[derivative(Hash = "ignore")]
#[derivative(Ord = "ignore")]
pub ext: Extensions,
}
impl Backend {
@ -64,6 +83,7 @@ impl Backend {
Ok(Backend {
addr: SocketAddr::Inet(addr),
weight: 1,
ext: Extensions::new(),
})
// TODO: UDS
}
@ -449,6 +469,31 @@ mod test {
assert!(backends.ready(&good2));
assert!(!backends.ready(&bad));
}
#[tokio::test]
async fn test_backends_with_ext() {
let discovery = discovery::Static::default();
let mut b1 = Backend::new("1.1.1.1:80").unwrap();
b1.ext.insert(true);
let mut b2 = Backend::new("1.0.0.1:80").unwrap();
b2.ext.insert(1u8);
discovery.add(b1.clone());
discovery.add(b2.clone());
let backends = Backends::new(Box::new(discovery));
// fill in the backends
backends.update().await.unwrap();
let backend = backends.get_backend();
assert!(backend.contains(&b1));
assert!(backend.contains(&b2));
let b2 = backend.first().unwrap();
assert_eq!(b2.ext.get::<u8>(), Some(&1));
let b1 = backend.last().unwrap();
assert_eq!(b1.ext.get::<bool>(), Some(&true));
}
#[tokio::test]
async fn test_discovery_readiness() {