blob: da48ddde5063993cda82511810aa1d8bfa3416ae [file] [log] [blame]
use super::memory_tracker::CacheMemoryTracker;
use crate::streaming::sizeable::Sizeable;
use atone::Vc;
use std::fmt::Debug;
use std::ops::Index;
use std::sync::Arc;
#[derive(Debug)]
pub struct SmartCache<T: Sizeable + Debug> {
current_size: u64,
buffer: Vc<T>,
memory_tracker: Arc<CacheMemoryTracker>,
}
impl<T> SmartCache<T>
where
T: Sizeable + Clone + Debug,
{
pub fn new() -> Self {
let current_size = 0;
let buffer = Vc::new();
let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
Self {
current_size,
buffer,
memory_tracker,
}
}
// Used only for cache validation tests
#[cfg(test)]
pub fn to_vec(&self) -> Vec<T> {
let mut vec = Vec::with_capacity(self.buffer.len());
vec.extend(self.buffer.iter().cloned());
vec
}
/// Pushes an element to the buffer, and if adding the element would exceed the memory limit,
/// removes the oldest elements until there's enough space for the new element.
/// It's preferred to use `extend` instead of this method.
pub fn push_safe(&mut self, element: T) {
let element_size = element.get_size_bytes() as u64;
while !self.memory_tracker.will_fit_into_cache(element_size) {
if let Some(oldest_element) = self.buffer.pop_front() {
let oldest_size = oldest_element.get_size_bytes() as u64;
self.memory_tracker.decrement_used_memory(oldest_size);
self.current_size -= oldest_size;
}
}
self.memory_tracker.increment_used_memory(element_size);
self.current_size += element_size;
self.buffer.push_back(element);
}
/// Removes the oldest elements until there's enough space for the new element.
pub fn evict_by_size(&mut self, size_to_remove: u64) {
let mut removed_size = 0;
while let Some(element) = self.buffer.pop_front() {
if removed_size >= size_to_remove {
break;
}
let elem_size = element.get_size_bytes() as u64;
self.memory_tracker.decrement_used_memory(elem_size);
self.current_size -= elem_size;
removed_size += elem_size;
}
}
pub fn purge(&mut self) {
self.buffer.clear();
self.memory_tracker.decrement_used_memory(self.current_size);
self.current_size = 0;
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn current_size(&self) -> u64 {
self.current_size
}
/// Extends the buffer with the given elements, and always adding the elements,
/// even if it exceeds the memory limit.
pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
let elements = elements.into_iter().inspect(|element| {
let element_size = element.get_size_bytes() as u64;
self.memory_tracker.increment_used_memory(element_size);
self.current_size += element_size;
});
self.buffer.extend(elements);
}
/// Always appends the element into the buffer, even if it exceeds the memory limit.
pub fn append(&mut self, element: T) {
let element_size = element.get_size_bytes() as u64;
self.memory_tracker.increment_used_memory(element_size);
self.current_size += element_size;
self.buffer.push(element);
}
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.buffer.iter()
}
pub fn len(&self) -> usize {
self.buffer.len()
}
}
impl<T> Index<usize> for SmartCache<T>
where
T: Sizeable + Clone + Debug,
{
type Output = T;
fn index(&self, index: usize) -> &Self::Output {
&self.buffer[index]
}
}
impl<T: Sizeable + Clone + Debug> Default for SmartCache<T> {
fn default() -> Self {
Self::new()
}
}