diff --git a/.bleep b/.bleep
index 0f44214..5081a5b 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-30c624970d46b07efd110fcfb8dd0b6f9a099e2b
\ No newline at end of file
+2351cdf592f9986201d754e6ee1f37f493f69abb
\ No newline at end of file
diff --git a/pingora-cache/Cargo.toml b/pingora-cache/Cargo.toml
index 2dc3f2f..6ae6c12 100644
--- a/pingora-cache/Cargo.toml
+++ b/pingora-cache/Cargo.toml
@@ -43,6 +43,7 @@ lru = { workspace = true }
 ahash = { workspace = true }
 hex = "0.4"
 httparse = { workspace = true }
+strum = { version = "0.26", features = ["derive"] }
 
 [dev-dependencies]
 tokio-test = "0.4"
diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs
index cdcfcc9..f1f8bd2 100644
--- a/pingora-cache/src/lib.rs
+++ b/pingora-cache/src/lib.rs
@@ -21,6 +21,7 @@ use key::{CacheHashKey, HashBinary};
 use lock::WritePermit;
 use pingora_error::Result;
 use pingora_http::ResponseHeader;
+use rustracing::tag::Tag;
 use std::time::{Duration, Instant, SystemTime};
 use trace::CacheTraceCTX;
 
@@ -1047,7 +1048,7 @@ impl HttpCache {
     /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
     pub async fn cache_lock_wait(&mut self) -> LockStatus {
         let inner = self.inner_mut();
-        let _span = inner.traces.child("cache_lock");
+        let mut span = inner.traces.child("cache_lock");
         let lock = inner.lock.take(); // remove the lock from self
         if let Some(Locked::Read(r)) = lock {
             let now = Instant::now();
@@ -1059,7 +1060,10 @@
                     .lock_duration
                     .map_or(lock_duration, |d| d + lock_duration),
             );
-            r.lock_status() // TODO: tag the span with lock status
+            let status = r.lock_status();
+            let tag_value: &'static str = status.into();
+            span.set_tag(|| Tag::new("status", tag_value));
+            status
         } else {
             // should always call is_cache_locked() before this function
             panic!("cache_lock_wait on wrong type of lock")
diff --git a/pingora-cache/src/lock.rs b/pingora-cache/src/lock.rs
index 7f50691..8853cb1 100644
--- a/pingora-cache/src/lock.rs
+++ b/pingora-cache/src/lock.rs
@@ -100,12 +100,14 @@ impl CacheLock {
     }
 }
 
+use log::warn;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::time::{Duration, Instant};
+use strum::IntoStaticStr;
 use tokio::sync::Semaphore;
 
 /// Status which the read locks could possibly see.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
 pub enum LockStatus {
     /// Waiting for the writer to populate the asset
     Waiting,
@@ -180,7 +182,7 @@ impl LockCore {
     }
 
     fn lock_status(&self) -> LockStatus {
-        self.lock_status.load(Ordering::Relaxed).into()
+        self.lock_status.load(Ordering::SeqCst).into()
     }
 }
 
@@ -197,11 +199,22 @@ impl ReadLock {
             return;
         }
 
-        // TODO: should subtract now - start so that the lock don't wait beyond start + timeout
-        // Also need to be careful not to wake everyone up at the same time
+        // TODO: need to be careful not to wake everyone up at the same time
         // (maybe not an issue because regular cache lock release behaves that way)
-        let _ = timeout(self.0.timeout, self.0.lock.acquire()).await;
-        // permit is returned to Semaphore right away
+        if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) {
+            match timeout(duration, self.0.lock.acquire()).await {
+                Ok(Ok(_)) => { // permit is returned to Semaphore right away
+                }
+                Ok(Err(e)) => {
+                    warn!("error acquiring semaphore {e:?}")
+                }
+                Err(_) => {
+                    self.0
+                        .lock_status
+                        .store(LockStatus::Timeout.into(), Ordering::SeqCst);
+                }
+            }
+        }
     }
 
     /// Test if it is still locked
@@ -211,7 +224,7 @@
 
     /// Whether the lock is expired, e.g., the writer has been holding the lock for too long
     pub fn expired(&self) -> bool {
-        // NOTE: this whether the lock is currently expired
+        // NOTE: this is whether the lock is currently expired
         // not whether it was timed out during wait()
        self.0.lock_start.elapsed() >= self.0.timeout
    }
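Not part of the patch: a minimal standalone sketch of the strum conversion the new span tag relies on, assuming strum 0.26 with the "derive" feature enabled (matching the Cargo.toml change above). The IntoStaticStr derive generates impl From<Enum> for &'static str that returns the variant's name, which is what lets status.into() in cache_lock_wait() produce a &'static str tag value without allocating. The Status enum here is a hypothetical stand-in for LockStatus.

    use strum::IntoStaticStr;

    // Hypothetical stand-in for pingora-cache's LockStatus; only the derive matters here.
    #[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
    enum Status {
        Waiting,
        Timeout,
    }

    fn main() {
        // The derive provides `impl From<Status> for &'static str`,
        // yielding the variant name with no heap allocation.
        let tag_value: &'static str = Status::Timeout.into();
        assert_eq!(tag_value, "Timeout");
        println!("status tag: {tag_value}");
    }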
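Also not part of the patch: a sketch of the remaining-budget arithmetic the reworked wait() uses, assuming only std. Duration::checked_sub returns None once the time already elapsed exceeds the total timeout, so a late waiter skips acquiring the semaphore entirely instead of waiting past start + timeout, which is what the removed TODO asked for. The names budget and start are illustrative.

    use std::time::{Duration, Instant};

    fn main() {
        let budget = Duration::from_millis(500); // total lock timeout
        let start = Instant::now(); // when the lock was created

        // Simulate time already spent before this waiter gets to run.
        std::thread::sleep(Duration::from_millis(50));

        // Remaining budget: None means the deadline has already passed,
        // so the waiter should not block at all.
        match budget.checked_sub(start.elapsed()) {
            Some(remaining) => println!("wait up to {remaining:?} more"),
            None => println!("already past start + timeout, skip waiting"),
        }
    }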