mirror of
https://github.com/cloudflare/pingora.git
synced 2024-09-20 02:31:35 +02:00
change lock status memory ordering, tag spans
This changes the memory ordering for the lock status load to `SeqCst` from `Relaxed` to eliminate a potential source of panics. Panics had the frames: ``` pingora_proxy::proxy_cache::<T>::handle_lock_status (proxy_cache.rs:748) pingora_proxy::proxy_cache::<T>::proxy_cache::{{closure}} (proxy_cache.rs:211) pingora_proxy::HttpProxy<T>::process_request::{{closure}} (lib.rs:509) pingora_proxy::HttpProxy<T>::process_new_http::{{closure}} (lib.rs:727) ``` which showed we were checking on the status of the lock, after waiting on it, and still seeing its status as waiting. The status is returned by value, so this is not a time-of-check to time-of-use problem, this is an inconsistency in how the lock status is managed. The change in memory order is mostly for the sake of this programmer's attempts to understand what is happening. This also completes a couple of TODOs to limit the wait period as well as tag the span with the lock status.
This commit is contained in:
parent
5e3e774a8d
commit
e288bfe8f0
4 changed files with 28 additions and 10 deletions
2
.bleep
2
.bleep
|
@ -1 +1 @@
|
||||||
30c624970d46b07efd110fcfb8dd0b6f9a099e2b
|
2351cdf592f9986201d754e6ee1f37f493f69abb
|
|
@ -43,6 +43,7 @@ lru = { workspace = true }
|
||||||
ahash = { workspace = true }
|
ahash = { workspace = true }
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
httparse = { workspace = true }
|
httparse = { workspace = true }
|
||||||
|
strum = { version = "0.26", features = ["derive"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-test = "0.4"
|
tokio-test = "0.4"
|
||||||
|
|
|
@ -21,6 +21,7 @@ use key::{CacheHashKey, HashBinary};
|
||||||
use lock::WritePermit;
|
use lock::WritePermit;
|
||||||
use pingora_error::Result;
|
use pingora_error::Result;
|
||||||
use pingora_http::ResponseHeader;
|
use pingora_http::ResponseHeader;
|
||||||
|
use rustracing::tag::Tag;
|
||||||
use std::time::{Duration, Instant, SystemTime};
|
use std::time::{Duration, Instant, SystemTime};
|
||||||
use trace::CacheTraceCTX;
|
use trace::CacheTraceCTX;
|
||||||
|
|
||||||
|
@ -1047,7 +1048,7 @@ impl HttpCache {
|
||||||
/// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
|
/// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock.
|
||||||
pub async fn cache_lock_wait(&mut self) -> LockStatus {
|
pub async fn cache_lock_wait(&mut self) -> LockStatus {
|
||||||
let inner = self.inner_mut();
|
let inner = self.inner_mut();
|
||||||
let _span = inner.traces.child("cache_lock");
|
let mut span = inner.traces.child("cache_lock");
|
||||||
let lock = inner.lock.take(); // remove the lock from self
|
let lock = inner.lock.take(); // remove the lock from self
|
||||||
if let Some(Locked::Read(r)) = lock {
|
if let Some(Locked::Read(r)) = lock {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
@ -1059,7 +1060,10 @@ impl HttpCache {
|
||||||
.lock_duration
|
.lock_duration
|
||||||
.map_or(lock_duration, |d| d + lock_duration),
|
.map_or(lock_duration, |d| d + lock_duration),
|
||||||
);
|
);
|
||||||
r.lock_status() // TODO: tag the span with lock status
|
let status = r.lock_status();
|
||||||
|
let tag_value: &'static str = status.into();
|
||||||
|
span.set_tag(|| Tag::new("status", tag_value));
|
||||||
|
status
|
||||||
} else {
|
} else {
|
||||||
// should always call is_cache_locked() before this function
|
// should always call is_cache_locked() before this function
|
||||||
panic!("cache_lock_wait on wrong type of lock")
|
panic!("cache_lock_wait on wrong type of lock")
|
||||||
|
|
|
@ -100,12 +100,14 @@ impl CacheLock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use log::warn;
|
||||||
use std::sync::atomic::{AtomicU8, Ordering};
|
use std::sync::atomic::{AtomicU8, Ordering};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
use strum::IntoStaticStr;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
/// Status which the read locks could possibly see.
|
/// Status which the read locks could possibly see.
|
||||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
#[derive(Debug, Copy, Clone, PartialEq, Eq, IntoStaticStr)]
|
||||||
pub enum LockStatus {
|
pub enum LockStatus {
|
||||||
/// Waiting for the writer to populate the asset
|
/// Waiting for the writer to populate the asset
|
||||||
Waiting,
|
Waiting,
|
||||||
|
@ -180,7 +182,7 @@ impl LockCore {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lock_status(&self) -> LockStatus {
|
fn lock_status(&self) -> LockStatus {
|
||||||
self.lock_status.load(Ordering::Relaxed).into()
|
self.lock_status.load(Ordering::SeqCst).into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,11 +199,22 @@ impl ReadLock {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: should subtract now - start so that the lock don't wait beyond start + timeout
|
// TODO: need to be careful not to wake everyone up at the same time
|
||||||
// Also need to be careful not to wake everyone up at the same time
|
|
||||||
// (maybe not an issue because regular cache lock release behaves that way)
|
// (maybe not an issue because regular cache lock release behaves that way)
|
||||||
let _ = timeout(self.0.timeout, self.0.lock.acquire()).await;
|
if let Some(duration) = self.0.timeout.checked_sub(self.0.lock_start.elapsed()) {
|
||||||
// permit is returned to Semaphore right away
|
match timeout(duration, self.0.lock.acquire()).await {
|
||||||
|
Ok(Ok(_)) => { // permit is returned to Semaphore right away
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
warn!("error acquiring semaphore {e:?}")
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
self.0
|
||||||
|
.lock_status
|
||||||
|
.store(LockStatus::Timeout.into(), Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test if it is still locked
|
/// Test if it is still locked
|
||||||
|
@ -211,7 +224,7 @@ impl ReadLock {
|
||||||
|
|
||||||
/// Whether the lock is expired, e.g., the writer has been holding the lock for too long
|
/// Whether the lock is expired, e.g., the writer has been holding the lock for too long
|
||||||
pub fn expired(&self) -> bool {
|
pub fn expired(&self) -> bool {
|
||||||
// NOTE: this whether the lock is currently expired
|
// NOTE: this is whether the lock is currently expired
|
||||||
// not whether it was timed out during wait()
|
// not whether it was timed out during wait()
|
||||||
self.0.lock_start.elapsed() >= self.0.timeout
|
self.0.lock_start.elapsed() >= self.0.timeout
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue