test: add test for upstream health observe

This commit is contained in:
vicanso 2024-07-13 20:18:56 +08:00
parent fb62869583
commit e6c2af0e77

View file

@ -348,10 +348,13 @@ impl Health {
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicU16, Ordering};
use std::{
collections::{BTreeSet, HashMap},
sync::atomic::{AtomicU16, Ordering},
};
use super::*;
use crate::SocketAddr;
use crate::{discovery, Backends, SocketAddr};
use async_trait::async_trait;
#[tokio::test]
@ -420,53 +423,75 @@ mod test {
assert!(http_check.check(&backend).await.is_ok());
}
struct Observe {
healthy_count: Arc<AtomicU16>,
}
#[async_trait]
impl HealthObserve for Observe {
async fn health_check_callback(&self, _target: &Backend, healthy: bool) {
if healthy {
self.healthy_count.fetch_add(1, Ordering::Relaxed);
#[tokio::test]
async fn test_health_observe() {
struct Observe {
unhealthy_count: Arc<AtomicU16>,
}
#[async_trait]
impl HealthObserve for Observe {
async fn health_check_callback(&self, _target: &Backend, healthy: bool) {
if !healthy {
self.unhealthy_count.fetch_add(1, Ordering::Relaxed);
}
}
}
}
#[tokio::test]
async fn test_tcp_health_observe() {
let healthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
healthy_count: healthy_count.clone(),
let good_backend = Backend::new("127.0.0.1:79").unwrap();
let new_good_backends = || -> (BTreeSet<Backend>, HashMap<u64, bool>) {
let mut healthy = HashMap::new();
healthy.insert(good_backend.hash_key(), true);
let mut backends = BTreeSet::new();
backends.extend(vec![good_backend.clone()]);
(backends, healthy)
};
let bob = Box::new(ob);
// tcp health check
{
let unhealthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
unhealthy_count: unhealthy_count.clone(),
};
let bob = Box::new(ob);
let mut tcp_check = TcpHealthCheck::default();
tcp_check.set_callback(bob);
let mut tcp_check = TcpHealthCheck::default();
tcp_check.set_callback(bob);
let discovery = discovery::Static::default();
let mut backends = Backends::new(Box::new(discovery));
backends.set_health_check(Box::new(tcp_check));
let result = new_good_backends();
backends.do_update(result.0, result.1);
// the backend is ready
assert!(backends.ready(&good_backend));
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:80".parse().unwrap()),
weight: 1,
};
// run health check
backends.run_health_check(false).await;
assert!(1 == unhealthy_count.load(Ordering::Relaxed));
// backend is unhealthy
assert!(!backends.ready(&good_backend));
}
assert!(tcp_check.check(&backend).await.is_ok());
assert!(1 == healthy_count.load(Ordering::Relaxed));
}
#[tokio::test]
async fn test_https_health_observe() {
let healthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
healthy_count: healthy_count.clone(),
};
let bob = Box::new(ob);
let mut https_check = HttpHealthCheck::new("one.one.one.one", true);
https_check.set_callback(bob);
// http health check
{
let unhealthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
unhealthy_count: unhealthy_count.clone(),
};
let bob = Box::new(ob);
let backend = Backend {
addr: SocketAddr::Inet("1.1.1.1:443".parse().unwrap()),
weight: 1,
};
let mut https_check = HttpHealthCheck::new("one.one.one.one", true);
https_check.set_callback(bob);
assert!(https_check.check(&backend).await.is_ok());
assert!(1 == healthy_count.load(Ordering::Relaxed));
let discovery = discovery::Static::default();
let mut backends = Backends::new(Box::new(discovery));
backends.set_health_check(Box::new(https_check));
let result = new_good_backends();
backends.do_update(result.0, result.1);
// the backend is ready
assert!(backends.ready(&good_backend));
// run health check
backends.run_health_check(false).await;
assert!(1 == unhealthy_count.load(Ordering::Relaxed));
assert!(!backends.ready(&good_backend));
}
}
}