diff --git a/.bleep b/.bleep index 4536067..4cef6a0 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -80970257fefa5cff505f63552dd2ae74372d501a \ No newline at end of file +36269b8823b23381398508138bbab33c03ba7681 \ No newline at end of file diff --git a/pingora-load-balancing/src/health_check.rs b/pingora-load-balancing/src/health_check.rs index ba5e6c0..b02776d 100644 --- a/pingora-load-balancing/src/health_check.rs +++ b/pingora-load-balancing/src/health_check.rs @@ -24,6 +24,14 @@ use pingora_http::{RequestHeader, ResponseHeader}; use std::sync::Arc; use std::time::Duration; +#[async_trait] +pub trait HealthObserve { + /// Observes the health of a [Backend], can be used for monitoring purposes. + async fn observe(&self, target: &Backend, healthy: bool); +} +/// Provided to a [HealthCheck] to observe changes to [Backend] health. +pub type HealthObserveCallback = Box; + /// [HealthCheck] is the interface to implement health check for backends #[async_trait] pub trait HealthCheck { @@ -31,6 +39,10 @@ pub trait HealthCheck { /// /// `Ok(())`` if the check passes, otherwise the check fails. async fn check(&self, target: &Backend) -> Result<()>; + + /// Called when the health changes for a [Backend]. + async fn health_status_change(&self, _target: &Backend, _healthy: bool) {} + /// This function defines how many *consecutive* checks should flip the health of a backend. /// /// For example: with `success``: `true`: this function should return the @@ -56,6 +68,8 @@ pub struct TcpHealthCheck { /// set, it will also try to establish a TLS connection on top of the TCP connection. pub peer_template: BasicPeer, connector: TransportConnector, + /// A callback that is invoked when the `healthy` status changes for a [Backend]. + pub health_changed_callback: Option, } impl Default for TcpHealthCheck { @@ -67,6 +81,7 @@ impl Default for TcpHealthCheck { consecutive_failure: 1, peer_template, connector: TransportConnector::new(None), + health_changed_callback: None, } } } @@ -110,6 +125,12 @@ impl HealthCheck for TcpHealthCheck { peer._address = target.addr.clone(); self.connector.get_stream(&peer).await.map(|_| {}) } + + async fn health_status_change(&self, target: &Backend, healthy: bool) { + if let Some(callback) = &self.health_changed_callback { + callback.observe(target, healthy).await; + } + } } type Validator = Box Result<()> + Send + Sync>; @@ -147,6 +168,8 @@ pub struct HttpHealthCheck { /// Sometimes the health check endpoint lives one a different port than the actual backend. /// Setting this option allows the health check to perform on the given port of the backend IP. pub port_override: Option, + /// A callback that is invoked when the `healthy` status changes for a [Backend]. + pub health_changed_callback: Option, } impl HttpHealthCheck { @@ -174,6 +197,7 @@ impl HttpHealthCheck { req, validator: None, port_override: None, + health_changed_callback: None, } } @@ -235,6 +259,11 @@ impl HealthCheck for HttpHealthCheck { Ok(()) } + async fn health_status_change(&self, target: &Backend, healthy: bool) { + if let Some(callback) = &self.health_changed_callback { + callback.observe(target, healthy).await; + } + } } #[derive(Clone)] @@ -313,8 +342,14 @@ impl Health { #[cfg(test)] mod test { + use std::{ + collections::{BTreeSet, HashMap}, + sync::atomic::{AtomicU16, Ordering}, + }; + use super::*; - use crate::SocketAddr; + use crate::{discovery, Backends, SocketAddr}; + use async_trait::async_trait; use http::Extensions; #[tokio::test] @@ -387,4 +422,78 @@ mod test { assert!(http_check.check(&backend).await.is_ok()); } + + #[tokio::test] + async fn test_health_observe() { + struct Observe { + unhealthy_count: Arc, + } + #[async_trait] + impl HealthObserve for Observe { + async fn observe(&self, _target: &Backend, healthy: bool) { + if !healthy { + self.unhealthy_count.fetch_add(1, Ordering::Relaxed); + } + } + } + + let good_backend = Backend::new("127.0.0.1:79").unwrap(); + let new_good_backends = || -> (BTreeSet, HashMap) { + let mut healthy = HashMap::new(); + healthy.insert(good_backend.hash_key(), true); + let mut backends = BTreeSet::new(); + backends.extend(vec![good_backend.clone()]); + (backends, healthy) + }; + // tcp health check + { + let unhealthy_count = Arc::new(AtomicU16::new(0)); + let ob = Observe { + unhealthy_count: unhealthy_count.clone(), + }; + let bob = Box::new(ob); + let tcp_check = TcpHealthCheck { + health_changed_callback: Some(bob), + ..Default::default() + }; + + let discovery = discovery::Static::default(); + let mut backends = Backends::new(Box::new(discovery)); + backends.set_health_check(Box::new(tcp_check)); + let result = new_good_backends(); + backends.do_update(result.0, result.1, |_backend: Arc>| {}); + // the backend is ready + assert!(backends.ready(&good_backend)); + + // run health check + backends.run_health_check(false).await; + assert!(1 == unhealthy_count.load(Ordering::Relaxed)); + // backend is unhealthy + assert!(!backends.ready(&good_backend)); + } + + // http health check + { + let unhealthy_count = Arc::new(AtomicU16::new(0)); + let ob = Observe { + unhealthy_count: unhealthy_count.clone(), + }; + let bob = Box::new(ob); + + let mut https_check = HttpHealthCheck::new("one.one.one.one", true); + https_check.health_changed_callback = Some(bob); + + let discovery = discovery::Static::default(); + let mut backends = Backends::new(Box::new(discovery)); + backends.set_health_check(Box::new(https_check)); + let result = new_good_backends(); + backends.do_update(result.0, result.1, |_backend: Arc>| {}); + // the backend is ready + assert!(backends.ready(&good_backend)); + // run health check + backends.run_health_check(false).await; + assert!(1 == unhealthy_count.load(Ordering::Relaxed)); + assert!(!backends.ready(&good_backend)); + } + } } diff --git a/pingora-load-balancing/src/lib.rs b/pingora-load-balancing/src/lib.rs index 4009dc1..4a7433e 100644 --- a/pingora-load-balancing/src/lib.rs +++ b/pingora-load-balancing/src/lib.rs @@ -266,6 +266,7 @@ impl Backends { let flipped = h.observe_health(errored.is_none(), check.health_threshold(errored.is_none())); if flipped { + check.health_status_change(backend, errored.is_none()).await; if let Some(e) = errored { warn!("{backend:?} becomes unhealthy, {e}"); } else {