blob: 95772d01214d8bc15712f10d3afb32c8b2d74cd9 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use foyer::Error as FoyerError;
use opendal_core::Buffer;
use opendal_core::Result;
use opendal_core::raw::Access;
use opendal_core::raw::BytesContentRange;
use opendal_core::raw::BytesRange;
use opendal_core::raw::OpRead;
use opendal_core::raw::OpStat;
use opendal_core::raw::RpRead;
use opendal_core::raw::oio::Read;
use crate::FoyerKey;
use crate::FoyerValue;
use crate::Inner;
use crate::error::{FetchSizeTooLarge, extract_err};
/// Reader that serves whole objects through the cache held by [`Inner`].
///
/// Each read looks up (or populates) a cache entry holding the complete
/// object and then slices the requested byte range out of it. Objects whose
/// size is outside `size_limit` are not cached; reads for them are forwarded
/// to the underlying accessor.
pub struct FullReader<A: Access> {
/// Shared state providing the cache (`inner.cache`) and the wrapped
/// accessor (`inner.accessor`).
inner: Arc<Inner<A>>,
/// Object sizes (in bytes) that are eligible for caching. The range is
/// half-open, so an object whose size equals `size_limit.end` is not cached.
size_limit: std::ops::Range<usize>,
}
impl<A: Access> FullReader<A> {
    /// Creates a reader backed by `inner` that caches objects whose size
    /// falls within `size_limit` (half-open, in bytes).
    pub fn new(inner: Arc<Inner<A>>, size_limit: std::ops::Range<usize>) -> Self {
        Self { inner, size_limit }
    }

    /// Read data from cache or underlying storage.
    ///
    /// Caches the ENTIRE object (keyed by path and version), then slices it
    /// to the range requested in `args`.
    ///
    /// The requested range is clamped to the cached object's length: a range
    /// that ends past the object is truncated, and a range that starts at or
    /// past the end yields an empty buffer.
    ///
    /// If the object's size is outside `size_limit`, nothing is cached and
    /// the request is forwarded unchanged to the underlying accessor.
    ///
    /// # Errors
    ///
    /// Returns the underlying accessor's error if the `stat` or `read` issued
    /// while populating the cache fails, or if the forwarded read fails.
    pub async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Buffer)> {
        let path_str = path.to_string();
        let version = args.version().map(|v| v.to_string());

        // Only owned values are extracted here, so `args` itself stays
        // available for the uncached fallback without cloning it.
        let (range_offset, range_size) = {
            let r = args.range();
            (r.offset(), r.size())
        };

        // Use fetch to read data from cache or fallback to remote. fetch() can automatically
        // handle the thundering herd problem by ensuring only one request is made for a given
        // key.
        //
        // Please note that we only cache the object if its size is within size_limit. And we'll
        // fetch the ENTIRE object from remote to put it into cache, then slice it to the
        // requested range.
        let result = self
            .inner
            .cache
            .fetch(
                FoyerKey {
                    path: path_str.clone(),
                    version: version.clone(),
                },
                || {
                    let inner = self.inner.clone();
                    let size_limit = self.size_limit.clone();
                    let path_clone = path_str.clone();
                    let version = version.clone();
                    async move {
                        // The cache key includes the version, so the remote calls must
                        // target that same version; otherwise the default version's
                        // content would be stored under the versioned key.
                        let mut stat_args = OpStat::default();
                        let mut read_args =
                            OpRead::default().with_range(BytesRange::new(0, None));
                        if let Some(v) = version.as_deref() {
                            stat_args = stat_args.with_version(v);
                            read_args = read_args.with_version(v);
                        }

                        // Read the metadata first; if the object is outside the size
                        // limit, do not cache it.
                        let metadata = inner
                            .accessor
                            .stat(&path_clone, stat_args)
                            .await
                            .map_err(FoyerError::other)?
                            .into_metadata();

                        // A length that does not fit in `usize` maps to `usize::MAX`,
                        // which a half-open `Range<usize>` can never contain, so it is
                        // rejected as too large instead of being silently truncated.
                        let size =
                            usize::try_from(metadata.content_length()).unwrap_or(usize::MAX);
                        if !size_limit.contains(&size) {
                            return Err(FoyerError::other(FetchSizeTooLarge));
                        }

                        // Fetch the ENTIRE object from remote.
                        let (_, mut reader) = inner
                            .accessor
                            .read(&path_clone, read_args)
                            .await
                            .map_err(FoyerError::other)?;
                        let buffer = reader.read_all().await.map_err(FoyerError::other)?;
                        Ok(FoyerValue(buffer))
                    }
                },
            )
            .await;

        // If we got an entry from the cache, slice it to the requested range. If the object
        // was rejected by the size limit, simply forward the request to the underlying
        // accessor with the user's given arguments.
        match result {
            Ok(entry) => {
                let total = entry.len() as u64;
                let (start, end) = clamp_range(range_offset, range_size, total);

                // An inclusive `start-end` content range cannot describe zero bytes
                // (`end - 1` would underflow when `end == 0`), so only the total size
                // is reported for an empty result.
                let mut range = BytesContentRange::default().with_size(total);
                if start < end {
                    range = range.with_range(start, end - 1);
                }

                // `start` and `end` are bounded by `entry.len()`, so these casts are
                // lossless.
                let buffer = entry.slice(start as usize..end as usize);
                let rp = RpRead::new()
                    .with_size(Some(buffer.len() as _))
                    .with_range(Some(range));
                Ok((rp, buffer))
            }
            Err(e) => match e.downcast::<FetchSizeTooLarge>() {
                Ok(_) => {
                    let (rp, mut reader) = self.inner.accessor.read(path, args).await?;
                    let buffer = reader.read_all().await?;
                    Ok((rp, buffer))
                }
                Err(e) => Err(extract_err(e)),
            },
        }
    }
}

/// Clamps a requested byte range to an object of `len` bytes.
///
/// `offset` is the first requested byte and `size` the number of requested
/// bytes (`None` means "to the end of the object"). Returns a half-open
/// `(start, end)` pair that always satisfies `start <= end <= len`, so it can
/// be used to slice a buffer of `len` bytes without going out of bounds.
fn clamp_range(offset: u64, size: Option<u64>, len: u64) -> (u64, u64) {
    let start = offset.min(len);
    // `saturating_add` keeps a huge `offset + size` from wrapping around.
    let end = size.map_or(len, |s| offset.saturating_add(s).min(len));
    (start, end)
}