| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| mod deleter; |
| mod error; |
| mod full; |
| mod writer; |
| |
| use std::{ |
| future::Future, |
| ops::{Bound, Deref, Range, RangeBounds}, |
| sync::Arc, |
| }; |
| |
| use foyer::{Code, CodeError, HybridCache}; |
| |
| use opendal_core::raw::*; |
| use opendal_core::*; |
| |
| pub use deleter::Deleter; |
| pub use writer::Writer; |
| |
/// [`FoyerKey`] is a key for the foyer cache. It's encoded via bincode, which is
/// backed by foyer's "serde" feature.
///
/// It's possible to specify a version in the [`OpRead`] args:
///
/// - If a version is given, the object is cached under that versioned key.
/// - If no version is supplied, the object is cached under the unversioned key
///   (`version: None`), exactly as returned by the backend. We do NOT interpret
///   `None` as "latest", and we never promote it to any other version.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct FoyerKey {
    /// Path of the cached object.
    pub path: String,
    /// Optional object version. `None` and `Some("")` serialize differently and
    /// are therefore distinct cache keys.
    pub version: Option<String>,
}
| |
/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait,
/// so OpenDAL buffers can be stored in the foyer hybrid cache.
#[derive(Debug)]
pub struct FoyerValue(pub Buffer);
| |
impl Deref for FoyerValue {
    type Target = Buffer;

    /// Expose the wrapped [`Buffer`] so callers can use the value transparently.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
| |
| impl Code for FoyerValue { |
| fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { |
| let len = self.0.len() as u64; |
| writer.write_all(&len.to_le_bytes())?; |
| std::io::copy(&mut self.0.clone(), writer)?; |
| Ok(()) |
| } |
| |
| fn decode(reader: &mut impl std::io::Read) -> std::result::Result<Self, CodeError> |
| where |
| Self: Sized, |
| { |
| let mut len_bytes = [0u8; 8]; |
| reader.read_exact(&mut len_bytes)?; |
| let len = u64::from_le_bytes(len_bytes) as usize; |
| let mut buffer = vec![0u8; len]; |
| reader.read_exact(&mut buffer[..len])?; |
| Ok(FoyerValue(buffer.into())) |
| } |
| |
| fn estimated_size(&self) -> usize { |
| 8 + self.0.len() |
| } |
| } |
| |
/// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching.
///
/// # Operation Behavior
/// - `write`: [`FoyerLayer`] will write to the foyer hybrid cache after the service's write operation is completed.
/// - `read`: [`FoyerLayer`] will first check the foyer hybrid cache for the data. If the data is not found, it will perform the read operation on the service and cache the result.
/// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful.
/// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching.
///
/// # Examples
///
/// ```no_run
/// use opendal_core::{Operator, services::Memory};
/// use opendal_layer_foyer::FoyerLayer;
/// use foyer::{HybridCacheBuilder, Engine};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let cache = HybridCacheBuilder::new()
///     .memory(64 * 1024 * 1024) // 64MB memory cache
///     .with_shards(4)
///     .storage(Engine::Large(Default::default()))
///     .build()
///     .await?;
///
/// let op = Operator::new(Memory::default())?
///     .layer(FoyerLayer::new(cache))
///     .finish();
/// # Ok(())
/// # }
/// ```
///
/// # Note
///
/// If the object version is enabled, the foyer cache layer will treat the objects with same path but different versions as different objects.
#[derive(Debug)]
pub struct FoyerLayer {
    /// Shared hybrid (memory + disk) cache keyed by [`FoyerKey`].
    cache: HybridCache<FoyerKey, FoyerValue>,
    /// Half-open byte-size range; only objects whose size falls inside it are cached.
    size_limit: Range<usize>,
}
| |
| impl FoyerLayer { |
| /// Creates a new `FoyerLayer` with the given foyer hybrid cache. |
| pub fn new(cache: HybridCache<FoyerKey, FoyerValue>) -> Self { |
| FoyerLayer { |
| cache, |
| size_limit: 0..usize::MAX, |
| } |
| } |
| |
| /// Sets the size limit for caching. |
| /// |
| /// It is recommended to set a size limit to avoid caching large files that may not be suitable for caching. |
| pub fn with_size_limit<R: RangeBounds<usize>>(mut self, size_limit: R) -> Self { |
| let start = match size_limit.start_bound() { |
| Bound::Included(v) => *v, |
| Bound::Excluded(v) => *v + 1, |
| Bound::Unbounded => 0, |
| }; |
| let end = match size_limit.end_bound() { |
| Bound::Included(v) => *v + 1, |
| Bound::Excluded(v) => *v, |
| Bound::Unbounded => usize::MAX, |
| }; |
| self.size_limit = start..end; |
| self |
| } |
| } |
| |
| impl<A: Access> Layer<A> for FoyerLayer { |
| type LayeredAccess = FoyerAccessor<A>; |
| |
| fn layer(&self, accessor: A) -> Self::LayeredAccess { |
| let cache = self.cache.clone(); |
| FoyerAccessor { |
| inner: Arc::new(Inner { |
| accessor, |
| cache, |
| size_limit: self.size_limit.clone(), |
| }), |
| } |
| } |
| } |
| |
/// State shared between the accessor and the reader/writer/deleter it creates.
#[derive(Debug)]
pub(crate) struct Inner<A: Access> {
    /// The wrapped underlying accessor.
    pub(crate) accessor: A,
    /// The foyer hybrid cache shared across operations.
    pub(crate) cache: HybridCache<FoyerKey, FoyerValue>,
    /// Half-open byte-size range of objects eligible for caching.
    pub(crate) size_limit: Range<usize>,
}
| |
/// Accessor produced by [`FoyerLayer`]: delegates to the wrapped accessor
/// while routing reads/writes/deletes through the foyer cache.
#[derive(Debug)]
pub struct FoyerAccessor<A: Access> {
    // Shared so reader/writer/deleter components can hold it across awaits.
    inner: Arc<Inner<A>>,
}
| |
| impl<A: Access> LayeredAccess for FoyerAccessor<A> { |
| type Inner = A; |
| type Reader = Buffer; |
| type Writer = Writer<A>; |
| type Lister = A::Lister; |
| type Deleter = Deleter<A>; |
| |
| fn inner(&self) -> &Self::Inner { |
| &self.inner.accessor |
| } |
| |
| fn info(&self) -> Arc<AccessorInfo> { |
| self.inner.accessor.info() |
| } |
| |
| async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { |
| full::FullReader::new(self.inner.clone(), self.inner.size_limit.clone()) |
| .read(path, args) |
| .await |
| } |
| |
| fn write( |
| &self, |
| path: &str, |
| args: OpWrite, |
| ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend { |
| let inner = self.inner.clone(); |
| let size_limit = self.inner.size_limit.clone(); |
| let path = path.to_string(); |
| async move { |
| let (rp, w) = inner.accessor.write(&path, args).await?; |
| Ok((rp, Writer::new(w, path, inner, size_limit))) |
| } |
| } |
| |
| fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend { |
| let inner = self.inner.clone(); |
| async move { |
| let (rp, d) = inner.accessor.delete().await?; |
| Ok((rp, Deleter::new(d, inner))) |
| } |
| } |
| |
| async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { |
| self.inner.accessor.list(path, args).await |
| } |
| |
| // TODO(MrCroxx): Implement copy, rename with foyer cache. |
| } |
| |
#[cfg(test)]
mod tests {
    use foyer::{
        DirectFsDeviceOptions, Engine, Error as FoyerError, HybridCacheBuilder, LargeEngineOptions,
        RecoverMode,
    };
    use opendal_core::{Operator, services::Memory};
    use size::consts::MiB;
    use std::io::Cursor;

    use super::*;
    use crate::error::extract_err;

    /// Path of the i-th test object.
    fn key(i: u8) -> String {
        format!("obj-{i}")
    }

    /// Payload of the i-th test object.
    fn value(i: u8) -> Vec<u8> {
        // ~ 64KiB with metadata
        vec![i; 63 * 1024]
    }

    /// End-to-end smoke test: write/list/read/delete through the layer, with a
    /// cache clear in between to exercise the read-through path.
    #[tokio::test]
    async fn test() {
        let dir = tempfile::tempdir().unwrap();

        let cache = HybridCacheBuilder::new()
            .memory(10)
            .with_shards(1)
            .storage(Engine::Large(LargeEngineOptions::new()))
            .with_device_options(
                DirectFsDeviceOptions::new(dir.path())
                    .with_capacity(16 * MiB as usize)
                    .with_file_size(MiB as usize),
            )
            .with_recover_mode(RecoverMode::None)
            .build()
            .await
            .unwrap();

        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(FoyerLayer::new(cache.clone()))
            .finish();

        assert!(op.list("/").await.unwrap().is_empty());

        for i in 0..64 {
            op.write(&key(i), value(i)).await.unwrap();
        }

        assert_eq!(op.list("/").await.unwrap().len(), 64);

        for i in 0..64 {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        // Drop everything cached; the next reads must hit the backend again.
        cache.clear().await.unwrap();

        for i in 0..64 {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        for i in 0..64 {
            op.delete(&key(i)).await.unwrap();
        }

        assert!(op.list("/").await.unwrap().is_empty());

        for i in 0..64 {
            let res = op.read(&key(i)).await;
            assert!(res.is_err(), "should fail to read deleted file");
        }
    }

    /// Verifies that only objects within the configured size range are cached,
    /// while all objects remain readable through the underlying storage.
    #[tokio::test]
    async fn test_size_limit() {
        let dir = tempfile::tempdir().unwrap();

        let cache = HybridCacheBuilder::new()
            .memory(1024 * 1024)
            .with_shards(1)
            .storage(Engine::Large(LargeEngineOptions::new()))
            .with_device_options(
                DirectFsDeviceOptions::new(dir.path())
                    .with_capacity(16 * MiB as usize)
                    .with_file_size(MiB as usize),
            )
            .with_recover_mode(RecoverMode::None)
            .build()
            .await
            .unwrap();

        // Set size limit: only cache files between 1KB and 10KB
        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024))
            .finish();

        let small_data = vec![1u8; 5 * 1024]; // 5KB - should be cached
        let large_data = vec![2u8; 20 * 1024]; // 20KB - should NOT be cached
        let tiny_data = vec![3u8; 512]; // 512B - below size limit, should NOT be cached

        // Write all files
        op.write("small.txt", small_data.clone()).await.unwrap();
        op.write("large.txt", large_data.clone()).await.unwrap();
        op.write("tiny.txt", tiny_data.clone()).await.unwrap();

        // All should be readable
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // Clear the cache to test read-through behavior
        cache.clear().await.unwrap();

        // All files should still be readable from underlying storage
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // After reading, small file should be cached, but large and tiny should not
        // We can verify this by reading with range - cached files should support range reads
        let read_small_range = op.read_with("small.txt").range(0..1024).await.unwrap();
        assert_eq!(read_small_range.len(), 1024);
        assert_eq!(read_small_range.to_vec(), small_data[0..1024]);
    }

    /// An OpenDAL error wrapped in a foyer error must round-trip with its kind.
    #[test]
    fn test_error() {
        let e = Error::new(ErrorKind::NotFound, "not found");
        let fe = FoyerError::other(e);
        let oe = extract_err(fe);
        assert_eq!(oe.kind(), ErrorKind::NotFound);
    }

    /// `version: None` and `version: Some("")` must serialize to different
    /// bytes, since they identify different cache entries.
    #[test]
    fn test_foyer_key_version_none_vs_empty() {
        let key_none = FoyerKey {
            path: "test/path".to_string(),
            version: None,
        };

        let key_empty = FoyerKey {
            path: "test/path".to_string(),
            version: Some("".to_string()),
        };

        let mut buf_none = Vec::new();
        key_none.encode(&mut buf_none).unwrap();

        let mut buf_empty = Vec::new();
        key_empty.encode(&mut buf_empty).unwrap();

        assert_ne!(
            buf_none, buf_empty,
            "Serialization of version=None and version=\"\" should be different"
        );

        let decoded_none = FoyerKey::decode(&mut Cursor::new(&buf_none)).unwrap();
        assert_eq!(decoded_none, key_none);
        let decoded_empty = FoyerKey::decode(&mut Cursor::new(&buf_empty)).unwrap();
        assert_eq!(decoded_empty, key_empty);
    }

    /// Round-trip `decode(encode(key)) == key` over a spread of key shapes.
    #[test]
    fn test_foyer_key_serde() {
        let test_cases = vec![
            FoyerKey {
                path: "simple".to_string(),
                version: None,
            },
            FoyerKey {
                path: "with/slash/path".to_string(),
                version: None,
            },
            FoyerKey {
                path: "versioned".to_string(),
                version: Some("v1.0.0".to_string()),
            },
            FoyerKey {
                path: "empty-version".to_string(),
                version: Some("".to_string()),
            },
            FoyerKey {
                path: "".to_string(),
                version: None,
            },
            FoyerKey {
                path: "unicode/θ·―εΎ/π".to_string(),
                version: Some("ηζ¬-1".to_string()),
            },
            FoyerKey {
                path: "long/".to_string().repeat(100),
                version: Some("long-version-".to_string().repeat(50)),
            },
        ];

        for original in test_cases {
            let mut buffer = Vec::new();
            original
                .encode(&mut buffer)
                .expect("encoding should succeed");

            let decoded =
                FoyerKey::decode(&mut Cursor::new(&buffer)).expect("decoding should succeed");

            assert_eq!(
                decoded, original,
                "decode(encode(key)) should equal original key"
            );
        }
    }
}