mirror of
https://github.com/cloudflare/pingora.git
synced 2024-09-20 02:31:35 +02:00
TinyUFO: add the option to use sharded skip list for storage
This option makes it more memory efficient but a bit slower.
This commit is contained in:
parent
b9d4428809
commit
ab86012c66
10 changed files with 475 additions and 110 deletions
2
.bleep
2
.bleep
|
@ -1 +1 @@
|
|||
deb3c5409e938ec9c7d0da9b7a2d331eabbb2cd5
|
||||
b1c09703606d32b02f24d2e77d82936ba95e8064
|
|
@ -81,7 +81,7 @@ pub struct MemoryCache<K: Hash, T: Clone> {
|
|||
pub(crate) hasher: RandomState,
|
||||
}
|
||||
|
||||
impl<K: Hash, T: Clone + Send + Sync> MemoryCache<K, T> {
|
||||
impl<K: Hash, T: Clone + Send + Sync + 'static> MemoryCache<K, T> {
|
||||
/// Create a new [MemoryCache] with the given size.
|
||||
pub fn new(size: usize) -> Self {
|
||||
MemoryCache {
|
||||
|
|
|
@ -123,7 +123,7 @@ where
|
|||
impl<K, T, CB, S> RTCache<K, T, CB, S>
|
||||
where
|
||||
K: Hash + Send,
|
||||
T: Clone + Send + Sync,
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a new [RTCache] of given size. `lock_age` defines how long a lock is valid for.
|
||||
/// `lock_timeout` is used to stop a lookup from holding on to the key for too long.
|
||||
|
@ -142,7 +142,7 @@ where
|
|||
impl<K, T, CB, S> RTCache<K, T, CB, S>
|
||||
where
|
||||
K: Hash + Send,
|
||||
T: Clone + Send + Sync,
|
||||
T: Clone + Send + Sync + 'static,
|
||||
CB: Lookup<K, T, S>,
|
||||
{
|
||||
/// Query the cache for a given value. If it exists and no TTL is configured initially, it will
|
||||
|
@ -288,7 +288,7 @@ where
|
|||
impl<K, T, CB, S> RTCache<K, T, CB, S>
|
||||
where
|
||||
K: Hash + Send,
|
||||
T: Clone + Send + Sync,
|
||||
T: Clone + Send + Sync + 'static,
|
||||
CB: MultiLookup<K, T, S>,
|
||||
{
|
||||
/// Same behavior as [RTCache::get] but for an arbitrary amount of keys.
|
||||
|
|
|
@ -20,6 +20,7 @@ ahash = { workspace = true }
|
|||
flurry = "<0.5.0" # Try not to require Rust 1.71
|
||||
parking_lot = "0"
|
||||
crossbeam-queue = "0"
|
||||
crossbeam-skiplist = "0"
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0"
|
||||
|
|
|
@ -38,12 +38,12 @@ Because of TinyUFO's lock-free design, it greatly outperforms the others.
|
|||
|
||||
### Memory overhead
|
||||
|
||||
TinyUFO provides a compact mode to trade raw read speed for more memory efficiency. Whether the saving is worth the trade-off depends on the actual size and the workload. For small in-memory assets, the saved memory means more things can be cached.
|
||||
|
||||
The table below shows the memory allocation (in bytes) of the compared cache libraries under certain workloads to store zero-sized assets.
|
||||
|
||||
| cache size | TinyUFO | LRU | moka |
|
||||
| -------- | ------- | ------- | ------ |
|
||||
| 100 | 39,409 | 9,408 | 354,376
|
||||
| 1000 | 236,053 | 128,512 | 535,888
|
||||
| 10000 | 2,290,635 | 1,075,648 | 2,489,088
|
||||
|
||||
Whether these overheads matter depends on the actual sizes and volume of the assets. The more advanced algorithms are likely to be less memory efficient than the simple LRU.
|
||||
| cache size | TinyUFO | TinyUFO compact | LRU | moka |
|
||||
| -------- | ------- | ------- | ------- | ------ |
|
||||
| 100 | 39,409 | 19,000 | 9,408 | 354,376
|
||||
| 1000 | 236,053 | 86,352 | 128,512 | 535,888
|
||||
| 10000 | 2,290,635 | 766,024| 1,075,648 | 2,489,088
|
|
@ -68,6 +68,22 @@ fn bench_tinyufo(zip_exp: f64, items: usize, cache_size_percent: f32) {
|
|||
}
|
||||
}
|
||||
|
||||
fn bench_tinyufo_compact(zip_exp: f64, items: usize, cache_size_percent: f32) {
|
||||
let cache_size = (cache_size_percent * items as f32).round() as usize;
|
||||
let tinyufo = tinyufo::TinyUfo::new_compact(cache_size, (cache_size as f32 * 1.0) as usize);
|
||||
|
||||
let mut rng = thread_rng();
|
||||
let zipf = zipf::ZipfDistribution::new(items, zip_exp).unwrap();
|
||||
|
||||
for _ in 0..ITERATIONS {
|
||||
let key = zipf.sample(&mut rng) as u64;
|
||||
|
||||
if tinyufo.get(&key).is_none() {
|
||||
tinyufo.put(key, (), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
cargo bench --bench bench_memory
|
||||
|
||||
|
@ -78,6 +94,8 @@ moka
|
|||
dhat: At t-gmax: 354,232 bytes in 1,581 blocks
|
||||
TinyUFO
|
||||
dhat: At t-gmax: 37,337 bytes in 351 blocks
|
||||
TinyUFO Compact
|
||||
dhat: At t-gmax: 19,000 bytes in 60 blocks
|
||||
|
||||
total items 10000, cache size 10%
|
||||
lru
|
||||
|
@ -86,6 +104,8 @@ moka
|
|||
dhat: At t-gmax: 535,320 bytes in 7,278 blocks
|
||||
TinyUFO
|
||||
dhat: At t-gmax: 236,053 bytes in 2,182 blocks
|
||||
TinyUFO Compact
|
||||
dhat: At t-gmax: 86,352 bytes in 1,128 blocks
|
||||
|
||||
total items 100000, cache size 10%
|
||||
lru
|
||||
|
@ -94,6 +114,8 @@ moka
|
|||
dhat: At t-gmax: 2,489,088 bytes in 62,374 blocks
|
||||
TinyUFO
|
||||
dhat: At t-gmax: 2,290,635 bytes in 20,467 blocks
|
||||
TinyUFO Compact
|
||||
dhat: At t-gmax: 766,024 bytes in 10,421 blocks
|
||||
*/
|
||||
|
||||
fn main() {
|
||||
|
@ -116,5 +138,11 @@ fn main() {
|
|||
bench_tinyufo(1.05, items, 0.1);
|
||||
println!("\nTinyUFO");
|
||||
}
|
||||
|
||||
{
|
||||
let _profiler = dhat::Profiler::new_heap();
|
||||
bench_tinyufo_compact(1.05, items, 0.1);
|
||||
println!("\nTinyUFO Compact");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ Below is from Linux + Ryzen 5 7600 CPU
|
|||
lru read total 150.423567ms, 30ns avg per operation, 33239472 ops per second
|
||||
moka read total 462.133322ms, 92ns avg per operation, 10819389 ops per second
|
||||
tinyufo read total 199.007359ms, 39ns avg per operation, 25124698 ops per second
|
||||
tinyufo compact read total 331.145859ms, 66ns avg per operation, 15099087 ops per second
|
||||
|
||||
lru read total 5.402631847s, 1.08µs avg per operation, 925474 ops per second
|
||||
...
|
||||
|
@ -45,6 +46,10 @@ tinyufo read total 208.346855ms, 41ns avg per operation, 23998444 ops per second
|
|||
...
|
||||
total 148691408 ops per second
|
||||
|
||||
tinyufo compact read total 539.403037ms, 107ns avg per operation, 9269507 ops per second
|
||||
...
|
||||
total 74130632 ops per second
|
||||
|
||||
lru mixed read/write 5.500309876s, 1.1µs avg per operation, 909039 ops per second, 407431 misses
|
||||
...
|
||||
total 6846743 ops per second
|
||||
|
@ -56,6 +61,10 @@ total 16557962 ops per second
|
|||
tinyufo mixed read/write 456.134531ms, 91ns avg per operation, 10961678 ops per second, 294977 misses
|
||||
...
|
||||
total 80865792 ops per second
|
||||
|
||||
tinyufo compact mixed read/write 638.770053ms, 127ns avg per operation, 7827543 ops per second, 294641 misses
|
||||
...
|
||||
total 62600844 ops per second
|
||||
*/
|
||||
|
||||
fn main() {
|
||||
|
@ -63,12 +72,14 @@ fn main() {
|
|||
let lru = Mutex::new(lru::LruCache::<u64, ()>::unbounded());
|
||||
let moka = moka::sync::Cache::new(ITEMS as u64 + 10);
|
||||
let tinyufo = tinyufo::TinyUfo::new(ITEMS + 10, 10);
|
||||
let tinyufo_compact = tinyufo::TinyUfo::new_compact(ITEMS + 10, 10);
|
||||
|
||||
// populate first, then we bench access/promotion
|
||||
for i in 0..ITEMS {
|
||||
lru.lock().unwrap().put(i as u64, ());
|
||||
moka.insert(i as u64, ());
|
||||
tinyufo.put(i as u64, (), 1);
|
||||
tinyufo_compact.put(i as u64, (), 1);
|
||||
}
|
||||
|
||||
// single thread
|
||||
|
@ -108,6 +119,17 @@ fn main() {
|
|||
(ITERATIONS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
|
||||
let before = Instant::now();
|
||||
for _ in 0..ITERATIONS {
|
||||
tinyufo_compact.get(&(zipf.sample(&mut rng) as u64));
|
||||
}
|
||||
let elapsed = before.elapsed();
|
||||
println!(
|
||||
"tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second",
|
||||
elapsed / ITERATIONS as u32,
|
||||
(ITERATIONS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
|
||||
// concurrent
|
||||
|
||||
let before = Instant::now();
|
||||
|
@ -185,6 +207,31 @@ fn main() {
|
|||
(ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
|
||||
let before = Instant::now();
|
||||
thread::scope(|s| {
|
||||
for _ in 0..THREADS {
|
||||
s.spawn(|| {
|
||||
let mut rng = thread_rng();
|
||||
let zipf = zipf::ZipfDistribution::new(ITEMS, 1.03).unwrap();
|
||||
let before = Instant::now();
|
||||
for _ in 0..ITERATIONS {
|
||||
tinyufo_compact.get(&(zipf.sample(&mut rng) as u64));
|
||||
}
|
||||
let elapsed = before.elapsed();
|
||||
println!(
|
||||
"tinyufo compact read total {elapsed:?}, {:?} avg per operation, {} ops per second",
|
||||
elapsed / ITERATIONS as u32,
|
||||
(ITERATIONS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
let elapsed = before.elapsed();
|
||||
println!(
|
||||
"total {} ops per second",
|
||||
(ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
|
||||
///// bench mixed read and write /////
|
||||
const CACHE_SIZE: usize = 1000;
|
||||
let items: usize = 10000;
|
||||
|
@ -287,4 +334,36 @@ fn main() {
|
|||
"total {} ops per second",
|
||||
(ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
|
||||
// Use the compact constructor: this section reports "tinyufo compact" numbers,
// so it has to measure the compact cache rather than the default one.
let tinyufo_compact = tinyufo::TinyUfo::new_compact(CACHE_SIZE, CACHE_SIZE);
|
||||
let before = Instant::now();
|
||||
thread::scope(|s| {
|
||||
for _ in 0..THREADS {
|
||||
s.spawn(|| {
|
||||
let mut miss_count = 0;
|
||||
let mut rng = thread_rng();
|
||||
let zipf = zipf::ZipfDistribution::new(items, ZIPF_EXP).unwrap();
|
||||
let before = Instant::now();
|
||||
for _ in 0..ITERATIONS {
|
||||
let key = zipf.sample(&mut rng) as u64;
|
||||
if tinyufo_compact.get(&key).is_none() {
|
||||
tinyufo_compact.put(key, (), 1);
|
||||
miss_count +=1;
|
||||
}
|
||||
}
|
||||
let elapsed = before.elapsed();
|
||||
println!(
|
||||
"tinyufo compact mixed read/write {elapsed:?}, {:?} avg per operation, {} ops per second, {miss_count} misses",
|
||||
elapsed / ITERATIONS as u32,
|
||||
(ITERATIONS as f32 / elapsed.as_secs_f32()) as u32,
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
let elapsed = before.elapsed();
|
||||
println!(
|
||||
"total {} ops per second",
|
||||
(ITERATIONS as f32 * THREADS as f32 / elapsed.as_secs_f32()) as u32
|
||||
);
|
||||
}
|
||||
|
|
174
tinyufo/src/buckets.rs
Normal file
174
tinyufo/src/buckets.rs
Normal file
|
@ -0,0 +1,174 @@
|
|||
// Copyright 2024 Cloudflare, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Concurrent storage backend
|
||||
|
||||
use super::{Bucket, Key};
|
||||
use ahash::RandomState;
|
||||
use crossbeam_skiplist::{map::Entry, SkipMap};
|
||||
use flurry::HashMap;
|
||||
|
||||
/// N-shard skip list. Memory efficient, constant time lookup on average, but a bit slower
|
||||
/// than hash map
|
||||
pub struct Compact<T>(Box<[SkipMap<Key, Bucket<T>>]>);
|
||||
|
||||
impl<T: Send + 'static> Compact<T> {
|
||||
/// Create a new [Compact]
|
||||
pub fn new(total_items: usize, items_per_shard: usize) -> Self {
|
||||
assert!(items_per_shard > 0);
|
||||
|
||||
let shards = std::cmp::max(total_items / items_per_shard, 1);
|
||||
let mut shard_array = vec![];
|
||||
for _ in 0..shards {
|
||||
shard_array.push(SkipMap::new());
|
||||
}
|
||||
Self(shard_array.into_boxed_slice())
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &Key) -> Option<Entry<Key, Bucket<T>>> {
|
||||
let shard = *key as usize % self.0.len();
|
||||
self.0[shard].get(key)
|
||||
}
|
||||
|
||||
pub fn get_map<V, F: FnOnce(Entry<Key, Bucket<T>>) -> V>(&self, key: &Key, f: F) -> Option<V> {
|
||||
let v = self.get(key);
|
||||
v.map(f)
|
||||
}
|
||||
|
||||
fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
|
||||
let shard = key as usize % self.0.len();
|
||||
let removed = self.0[shard].remove(&key);
|
||||
self.0[shard].insert(key, value);
|
||||
removed.map(|_| ())
|
||||
}
|
||||
|
||||
fn remove(&self, key: &Key) {
|
||||
let shard = *key as usize % self.0.len();
|
||||
(&self.0)[shard].remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
// Concurrent hash map, fast but use more memory
|
||||
pub struct Fast<T>(HashMap<Key, Bucket<T>, RandomState>);
|
||||
|
||||
impl<T: Send + Sync> Fast<T> {
|
||||
pub fn new(total_items: usize) -> Self {
|
||||
Self(HashMap::with_capacity_and_hasher(
|
||||
total_items,
|
||||
RandomState::new(),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
|
||||
let pinned = self.0.pin();
|
||||
let v = pinned.get(key);
|
||||
v.map(f)
|
||||
}
|
||||
|
||||
fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
|
||||
let pinned = self.0.pin();
|
||||
pinned.insert(key, value).map(|_| ())
|
||||
}
|
||||
|
||||
fn remove(&self, key: &Key) {
|
||||
let pinned = self.0.pin();
|
||||
pinned.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Buckets<T> {
|
||||
Fast(Box<Fast<T>>),
|
||||
Compact(Compact<T>),
|
||||
}
|
||||
|
||||
impl<T: Send + Sync + 'static> Buckets<T> {
|
||||
pub fn new_fast(items: usize) -> Self {
|
||||
Self::Fast(Box::new(Fast::new(items)))
|
||||
}
|
||||
|
||||
pub fn new_compact(items: usize, items_per_shard: usize) -> Self {
|
||||
Self::Compact(Compact::new(items, items_per_shard))
|
||||
}
|
||||
|
||||
pub fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
|
||||
match self {
|
||||
Self::Compact(c) => c.insert(key, value),
|
||||
Self::Fast(f) => f.insert(key, value),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&self, key: &Key) {
|
||||
match self {
|
||||
Self::Compact(c) => c.remove(key),
|
||||
Self::Fast(f) => f.remove(key),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
|
||||
match self {
|
||||
Self::Compact(c) => c.get_map(key, |v| f(v.value())),
|
||||
Self::Fast(c) => c.get_map(key, f),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn get_queue(&self, key: &Key) -> Option<bool> {
|
||||
self.get_map(key, |v| v.queue.is_main())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a bucket located in the small queue, with weight 1 and `1` as its data.
    fn sample_bucket() -> Bucket<i32> {
        Bucket {
            queue: crate::Location::new_small(),
            weight: 1,
            uses: Default::default(),
            data: 1,
        }
    }

    #[test]
    fn test_fast() {
        let buckets = Buckets::new_fast(10);

        // nothing stored yet
        assert!(buckets.get_map(&1, |_| ()).is_none());

        buckets.insert(1, sample_bucket());
        assert_eq!(buckets.get_map(&1, |b| b.data), Some(1));

        buckets.remove(&1);
        assert!(buckets.get_map(&1, |_| ()).is_none());
    }

    #[test]
    fn test_compact() {
        let buckets = Buckets::new_compact(10, 2);

        // nothing stored yet
        assert!(buckets.get_map(&1, |_| ()).is_none());

        buckets.insert(1, sample_bucket());
        assert_eq!(buckets.get_map(&1, |b| b.data), Some(1));

        buckets.remove(&1);
        assert!(buckets.get_map(&1, |_| ()).is_none());
    }
}
|
|
@ -39,6 +39,11 @@ impl Estimator {
|
|||
Self::new(hashes, slots)
|
||||
}
|
||||
|
||||
fn compact(items: usize) -> Self {
|
||||
let (slots, hashes) = Self::optimal_paras(items / 100);
|
||||
Self::new(hashes, slots)
|
||||
}
|
||||
|
||||
/// Create a new `Estimator` with the given amount of hashes and columns (slots).
|
||||
pub fn new(hashes: usize, slots: usize) -> Self {
|
||||
let mut estimator = Vec::with_capacity(hashes);
|
||||
|
@ -147,6 +152,15 @@ impl TinyLfu {
|
|||
window_limit: cache_size * 8,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_compact(cache_size: usize) -> Self {
|
||||
Self {
|
||||
estimator: Estimator::compact(cache_size),
|
||||
window_counter: Default::default(),
|
||||
// 8x: just a heuristic to balance the memory usage and accuracy
|
||||
window_limit: cache_size * 8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -20,14 +20,16 @@
|
|||
|
||||
use ahash::RandomState;
|
||||
use crossbeam_queue::SegQueue;
|
||||
use flurry::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::{
|
||||
AtomicBool, AtomicU8,
|
||||
Ordering::{Acquire, Relaxed, SeqCst},
|
||||
};
|
||||
mod buckets;
|
||||
mod estimation;
|
||||
|
||||
use buckets::Buckets;
|
||||
use estimation::TinyLfu;
|
||||
use std::hash::Hash;
|
||||
|
||||
|
@ -64,20 +66,20 @@ const USES_CAP: u8 = 3;
|
|||
struct Uses(AtomicU8);
|
||||
|
||||
impl Uses {
|
||||
pub fn inc_uses(&self) {
|
||||
pub fn inc_uses(&self) -> u8 {
|
||||
loop {
|
||||
let uses = self.uses();
|
||||
if uses >= USES_CAP {
|
||||
return;
|
||||
return uses;
|
||||
}
|
||||
if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) {
|
||||
// someone else beat us to it
|
||||
if new >= USES_CAP {
|
||||
// already above cap
|
||||
return;
|
||||
return new;
|
||||
} // else, try again
|
||||
} else {
|
||||
return;
|
||||
return uses + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,17 +128,6 @@ struct Bucket<T> {
|
|||
data: T,
|
||||
}
|
||||
|
||||
impl<T: Clone> Bucket<T> {
|
||||
fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self {
|
||||
Self {
|
||||
uses: Uses(self.uses.uses().into()),
|
||||
queue: Location(main_queue.into()),
|
||||
weight,
|
||||
data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const SMALL_QUEUE_PERCENTAGE: f32 = 0.1;
|
||||
|
||||
struct FiFoQueues<T> {
|
||||
|
@ -154,9 +145,7 @@ struct FiFoQueues<T> {
|
|||
_t: PhantomData<T>,
|
||||
}
|
||||
|
||||
type Buckets<T> = HashMap<Key, Bucket<T>, RandomState>;
|
||||
|
||||
impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
||||
impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
|
||||
fn admit(
|
||||
&self,
|
||||
key: Key,
|
||||
|
@ -174,9 +163,29 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
|
||||
assert!(weight > 0);
|
||||
let new_bucket = {
|
||||
let pinned_buckets = buckets.pin();
|
||||
let bucket = pinned_buckets.get(&key);
|
||||
let Some(bucket) = bucket else {
|
||||
let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| {
|
||||
// the item exists, in case weight changes
|
||||
let old_weight = bucket.weight;
|
||||
let uses = bucket.uses.inc_uses();
|
||||
|
||||
fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
|
||||
if old == new {
|
||||
return;
|
||||
}
|
||||
if old > new {
|
||||
weight.fetch_sub((old - new) as usize, SeqCst);
|
||||
} else {
|
||||
weight.fetch_add((new - old) as usize, SeqCst);
|
||||
}
|
||||
}
|
||||
let queue = bucket.queue.is_main();
|
||||
if queue == MAIN {
|
||||
update_atomic(&self.main_weight, old_weight, weight);
|
||||
} else {
|
||||
update_atomic(&self.small_weight, old_weight, weight);
|
||||
}
|
||||
(uses, queue, weight)
|
||||
}) else {
|
||||
let mut evicted = self.evict_to_limit(weight, buckets);
|
||||
// TODO: figure out the right way to compare frequencies of different weights across
|
||||
// many evicted assets. For now TinyLFU is only used when only evicting 1 item.
|
||||
|
@ -204,7 +213,7 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
uses: Default::default(), // 0
|
||||
data,
|
||||
};
|
||||
let old = pinned_buckets.insert(key, bucket);
|
||||
let old = buckets.insert(key, bucket);
|
||||
if old.is_none() {
|
||||
// Always push key first before updating weight
|
||||
// If doing the other order, another concurrent thread might not
|
||||
|
@ -215,32 +224,16 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
// TODO: compare old.weight and update accordingly
|
||||
return evicted;
|
||||
};
|
||||
|
||||
// the item exists, in case weight changes
|
||||
let old_weight = bucket.weight;
|
||||
bucket.uses.inc_uses();
|
||||
|
||||
fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
|
||||
if old == new {
|
||||
return;
|
||||
}
|
||||
if old > new {
|
||||
weight.fetch_sub((old - new) as usize, SeqCst);
|
||||
} else {
|
||||
weight.fetch_add((new - old) as usize, SeqCst);
|
||||
}
|
||||
}
|
||||
if bucket.queue.is_main() {
|
||||
update_atomic(&self.main_weight, old_weight, weight);
|
||||
bucket.update_bucket(MAIN, data, weight)
|
||||
} else {
|
||||
update_atomic(&self.small_weight, old_weight, weight);
|
||||
bucket.update_bucket(SMALL, data, weight)
|
||||
Bucket {
|
||||
queue: Location(queue.into()),
|
||||
weight,
|
||||
uses: Uses(uses.into()),
|
||||
data,
|
||||
}
|
||||
};
|
||||
|
||||
// replace the existing one
|
||||
buckets.pin().insert(key, new_bucket);
|
||||
buckets.insert(key, new_bucket);
|
||||
|
||||
// NOTE: there is a chance that the item itself is evicted if it happens to be the one selected
|
||||
// by the algorithm. We could avoid this by checking if the item is in the returned evicted items,
|
||||
|
@ -295,14 +288,9 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
// empty queue, this is caught between another pop() and fetch_sub()
|
||||
return None;
|
||||
};
|
||||
let pinned_buckets = buckets.pin();
|
||||
let maybe_bucket = pinned_buckets.get(&to_evict);
|
||||
|
||||
let Some(bucket) = maybe_bucket.as_ref() else {
|
||||
//key in queue but not bucket, shouldn't happen, but ignore
|
||||
continue;
|
||||
};
|
||||
|
||||
let v = buckets
|
||||
.get_map(&to_evict, |bucket| {
|
||||
let weight = bucket.weight;
|
||||
self.small_weight.fetch_sub(weight as usize, SeqCst);
|
||||
|
||||
|
@ -312,44 +300,55 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
self.main.push(to_evict);
|
||||
self.main_weight.fetch_add(weight as usize, SeqCst);
|
||||
// continue until find one to evict
|
||||
continue;
|
||||
}
|
||||
// move to ghost
|
||||
|
||||
None
|
||||
} else {
|
||||
let data = bucket.data.clone();
|
||||
let weight = bucket.weight;
|
||||
pinned_buckets.remove(&to_evict);
|
||||
return Some(KV {
|
||||
buckets.remove(&to_evict);
|
||||
Some(KV {
|
||||
key: to_evict,
|
||||
data,
|
||||
weight,
|
||||
});
|
||||
})
|
||||
}
|
||||
})
|
||||
.flatten();
|
||||
if v.is_some() {
|
||||
// found the one to evict, break
|
||||
return v;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
|
||||
loop {
|
||||
let to_evict = self.main.pop()?;
|
||||
let buckets = buckets.pin();
|
||||
let maybe_bucket = buckets.get(&to_evict);
|
||||
if let Some(bucket) = maybe_bucket.as_ref() {
|
||||
|
||||
if let Some(v) = buckets
|
||||
.get_map(&to_evict, |bucket| {
|
||||
if bucket.uses.decr_uses() > 0 {
|
||||
// put it back
|
||||
self.main.push(to_evict);
|
||||
// continue the loop
|
||||
None
|
||||
} else {
|
||||
// evict
|
||||
let weight = bucket.weight;
|
||||
self.main_weight.fetch_sub(weight as usize, SeqCst);
|
||||
let data = bucket.data.clone();
|
||||
buckets.remove(&to_evict);
|
||||
return Some(KV {
|
||||
Some(KV {
|
||||
key: to_evict,
|
||||
data,
|
||||
weight,
|
||||
});
|
||||
})
|
||||
}
|
||||
})
|
||||
.flatten()
|
||||
{
|
||||
// found the one to evict, break
|
||||
return Some(v);
|
||||
}
|
||||
} // else: key in queue but not bucket, shouldn't happen
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -357,12 +356,11 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
|
|||
/// [TinyUfo] cache
|
||||
pub struct TinyUfo<K, T> {
|
||||
queues: FiFoQueues<T>,
|
||||
buckets: HashMap<Key, Bucket<T>, RandomState>,
|
||||
buckets: Buckets<T>,
|
||||
random_status: RandomState,
|
||||
_k: PhantomData<K>,
|
||||
}
|
||||
|
||||
impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
|
||||
impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> {
|
||||
/// Create a new TinyUfo cache with the given weight limit and the given
|
||||
/// size limit of the ghost queue.
|
||||
pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self {
|
||||
|
@ -377,7 +375,29 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
|
|||
};
|
||||
TinyUfo {
|
||||
queues,
|
||||
buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()),
|
||||
buckets: Buckets::new_fast(estimated_size),
|
||||
random_status: RandomState::new(),
|
||||
_k: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new TinyUfo cache but with more memory efficient data structures.
|
||||
/// The trade-off is that the get() is slower by a constant factor.
|
||||
/// The cache hit ratio could be higher as this type of TinyUFO allows storing
|
||||
/// more assets with the same memory.
|
||||
pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self {
|
||||
let queues = FiFoQueues {
|
||||
small: SegQueue::new(),
|
||||
small_weight: 0.into(),
|
||||
main: SegQueue::new(),
|
||||
main_weight: 0.into(),
|
||||
total_weight_limit,
|
||||
estimator: TinyLfu::new_compact(estimated_size),
|
||||
_t: PhantomData,
|
||||
};
|
||||
TinyUfo {
|
||||
queues,
|
||||
buckets: Buckets::new_compact(estimated_size, 32),
|
||||
random_status: RandomState::new(),
|
||||
_k: PhantomData,
|
||||
}
|
||||
|
@ -390,8 +410,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
|
|||
/// Return Some(T) if the key exists
|
||||
pub fn get(&self, key: &K) -> Option<T> {
|
||||
let key = self.random_status.hash_one(key);
|
||||
let buckets = self.buckets.pin();
|
||||
buckets.get(&key).map(|p| {
|
||||
self.buckets.get_map(&key, |p| {
|
||||
p.uses.inc_uses();
|
||||
p.data.clone()
|
||||
})
|
||||
|
@ -427,7 +446,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
|
|||
#[cfg(test)]
|
||||
fn peek_queue(&self, key: K) -> Option<bool> {
|
||||
let key = self.random_status.hash_one(&key);
|
||||
self.buckets.pin().get(&key).map(|p| p.queue.value())
|
||||
self.buckets.get_queue(&key)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -627,4 +646,54 @@ mod tests {
|
|||
assert_eq!(cache.peek_queue(3), Some(MAIN));
|
||||
assert_eq!(cache.peek_queue(4), None);
|
||||
}
|
||||
|
||||
/// Items that were never read are evicted from the small queue, oldest first,
/// when the cache uses the compact storage.
#[test]
fn test_evict_from_small_compact() {
    // build the cache with the compact constructor so that this test really
    // covers the compact storage its name refers to
    let cache = TinyUfo::new_compact(5, 5);

    cache.put(1, 1, 1);
    cache.put(2, 2, 2);
    cache.put(3, 3, 2);
    // cache full now

    assert_eq!(cache.peek_queue(1), Some(SMALL));
    assert_eq!(cache.peek_queue(2), Some(SMALL));
    assert_eq!(cache.peek_queue(3), Some(SMALL));

    // a weight of 3 does not fit: 1 and 2 have to go to make room
    let evicted = cache.put(4, 4, 3);
    assert_eq!(evicted.len(), 2);
    assert_eq!(evicted[0].data, 1);
    assert_eq!(evicted[1].data, 2);

    assert_eq!(cache.peek_queue(1), None);
    assert_eq!(cache.peek_queue(2), None);
    assert_eq!(cache.peek_queue(3), Some(SMALL));
}
|
||||
|
||||
/// An item that was read more than once is promoted from the small queue to the
/// main queue instead of being evicted, when the cache uses the compact storage.
#[test]
fn test_evict_from_small_to_main_compact() {
    // build the cache with the compact constructor so that this test really
    // covers the compact storage its name refers to
    let cache = TinyUfo::new_compact(5, 5);

    cache.put(1, 1, 1);
    cache.put(2, 2, 2);
    cache.put(3, 3, 2);
    // cache full now

    cache.get(&1);
    cache.get(&1); // 1 will be moved to main during next eviction

    assert_eq!(cache.peek_queue(1), Some(SMALL));
    assert_eq!(cache.peek_queue(2), Some(SMALL));
    assert_eq!(cache.peek_queue(3), Some(SMALL));

    let evicted = cache.put(4, 4, 1);
    assert_eq!(evicted.len(), 1);
    assert_eq!(evicted[0].data, 2);

    assert_eq!(cache.peek_queue(1), Some(MAIN));
    // 2 is evicted because 1 is in main
    assert_eq!(cache.peek_queue(2), None);
    assert_eq!(cache.peek_queue(3), Some(SMALL));
    assert_eq!(cache.peek_queue(4), Some(SMALL));
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue