blob: 6b633b40f6bef71ce89a42be4bf6c998b057b353 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Immutable index layer implementation for Apache OpenDAL.
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs)]
use std::collections::HashSet;
use std::vec::IntoIter;
use opendal_core::raw::*;
use opendal_core::*;
/// Add an immutable in-memory index for underlying storage services.
///
/// Especially useful for services without list capability like HTTP.
///
/// Listing is answered entirely from the keys inserted here. A key ending
/// with `/` is reported as a directory, and every other key is reported as a
/// file. Keys are matched against the listed path by plain string prefix, so
/// they should be written in the same form the operator lists with, e.g.
/// `dir/file` without a leading `/`.
///
/// The index is copied when the layer is applied to an accessor. Keys inserted
/// afterwards are not seen by that accessor. Writes and deletes through the
/// operator do not update the index either.
///
/// # Examples
///
/// ```no_run
/// # use std::collections::HashMap;
/// #
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
/// # use opendal_layer_immutable_index::ImmutableIndexLayer;
/// #
/// # fn main() -> Result<()> {
/// let mut iil = ImmutableIndexLayer::new();
///
/// for i in ["file", "dir/", "dir/file", "dir_without_prefix/file"] {
///     iil.insert(i.to_string())
/// }
///
/// let op = Operator::from_iter::<services::Memory>(HashMap::<_, _>::default())?
///     .layer(iil)
///     .finish();
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Default)]
pub struct ImmutableIndexLayer {
    vec: Vec<String>,
}

impl ImmutableIndexLayer {
    /// Create a new, empty [`ImmutableIndexLayer`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Insert a key into the index.
    ///
    /// Keys ending with `/` are listed as directories; all others as files.
    pub fn insert(&mut self, key: String) {
        self.vec.push(key);
    }

    /// Insert every key yielded by `iter` into the index.
    pub fn extend_iter<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = String>,
    {
        self.vec.extend(iter);
    }
}
impl<A: Access> Layer<A> for ImmutableIndexLayer {
    type LayeredAccess = ImmutableIndexAccessor<A>;

    /// Wrap `inner` so that listing (plain and recursive) is served from a
    /// copy of this layer's index.
    ///
    /// The wrapped accessor's full capability is patched to advertise `list`
    /// and `list_with_recursive`; everything else is left untouched.
    fn layer(&self, inner: A) -> Self::LayeredAccess {
        inner.info().update_full_capability(|mut capability| {
            capability.list = true;
            capability.list_with_recursive = true;
            capability
        });

        let vec = self.vec.clone();
        ImmutableIndexAccessor { inner, vec }
    }
}
/// Accessor produced by applying [`ImmutableIndexLayer`].
///
/// Reads, writes and deletes are forwarded to `inner`; `list` is answered
/// from `vec` without consulting `inner`.
#[doc(hidden)]
#[derive(Debug)]
pub struct ImmutableIndexAccessor<A: Access> {
    // The wrapped accessor that serves every operation except listing.
    inner: A,
    // Copy of the layer's keys, taken when the layer was applied.
    vec: Vec<String>,
}
impl<A: Access> ImmutableIndexAccessor<A> {
    /// Return every indexed key strictly below `path`, for recursive listing.
    ///
    /// `path` itself is excluded. The result is sorted and de-duplicated, so a
    /// key inserted more than once is yielded only once and the order is
    /// deterministic.
    fn children_flat(&self, path: &str) -> Vec<String> {
        let mut res: Vec<String> = self
            .vec
            .iter()
            .filter(|v| v.starts_with(path) && v.as_str() != path)
            .cloned()
            .collect();
        res.sort_unstable();
        res.dedup();
        res
    }

    /// Return the direct children of `path`, for non-recursive listing.
    ///
    /// Keys with no further `/` below `path`, or whose only further `/` is
    /// the trailing one, are returned as-is. For deeper keys only the first
    /// component below `path` is returned, including its trailing `/`, so
    /// callers can walk down into it. The result is sorted and de-duplicated.
    fn children_hierarchy(&self, path: &str) -> Vec<String> {
        let mut res = HashSet::new();
        for i in self.vec.iter() {
            // `/xyz` should not belong to `/abc`.
            let Some(rest) = i.strip_prefix(path) else {
                continue;
            };
            // Skip `/abc` itself.
            if rest.is_empty() {
                continue;
            }
            match rest.find('/') {
                // `/abc/def/xyz` doesn't belong to `/abc/` directly, but we
                // list `/abc/def/` so that callers can walk down into it.
                // `/` is ASCII, so `idx + 1` is always a char boundary.
                Some(idx) if idx + 1 < rest.len() => {
                    res.insert(i[..path.len() + idx + 1].to_string());
                }
                // File `/abc/def.csv` or dir `/abc/def/` is a direct child.
                _ => {
                    res.insert(i.clone());
                }
            }
        }
        // `HashSet` iteration order is unspecified; sort for stable listings.
        let mut res: Vec<String> = res.into_iter().collect();
        res.sort_unstable();
        res
    }
}
impl<A: Access> LayeredAccess for ImmutableIndexAccessor<A> {
    type Inner = A;
    type Reader = A::Reader;
    type Writer = A::Writer;
    type Lister = ImmutableDir;
    type Deleter = A::Deleter;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    /// Delegated unchanged to the wrapped accessor.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        self.inner.read(path, args).await
    }

    /// Delegated unchanged to the wrapped accessor; the index is not updated.
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        self.inner.write(path, args).await
    }

    /// Answer the listing from the in-memory index only.
    ///
    /// A root path of `/` is treated as the empty prefix. Recursive listings
    /// return every key below the prefix; non-recursive ones return only the
    /// direct children.
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        let prefix = if path == "/" { "" } else { path };

        let entries = if args.recursive() {
            self.children_flat(prefix)
        } else {
            self.children_hierarchy(prefix)
        };

        let lister = ImmutableDir::new(entries);
        Ok((RpList::default(), lister))
    }

    /// Delegated unchanged to the wrapped accessor; the index is not updated.
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        self.inner.delete().await
    }
}
/// Lister that yields a precomputed list of index keys, in order.
#[doc(hidden)]
pub struct ImmutableDir {
    // Keys not yet handed out to the caller.
    keys: IntoIter<String>,
}

impl ImmutableDir {
    /// Build a lister over `idx`, yielding its keys in the given order.
    fn new(idx: Vec<String>) -> Self {
        let keys = idx.into_iter();
        Self { keys }
    }

    /// Pop the next key and describe it as an entry.
    ///
    /// A key ending with `/` becomes a directory entry; anything else is a
    /// file. Returns `None` once every key has been consumed.
    fn inner_next(&mut self) -> Option<oio::Entry> {
        let key = self.keys.next()?;
        let mode = match key.ends_with('/') {
            true => EntryMode::DIR,
            false => EntryMode::FILE,
        };
        Some(oio::Entry::with(key, Metadata::new(mode)))
    }
}

impl oio::List for ImmutableDir {
    async fn next(&mut self) -> Result<Option<oio::Entry>> {
        let entry = self.inner_next();
        Ok(entry)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::*;
    use futures::TryStreamExt;
    use log::debug;
    use logforth::append::Testing;
    use logforth::filter::env_filter::EnvFilterBuilder;
    use logforth::layout::TextLayout;

    /// Keys mixing top-level files, an explicit dir and an implicit dir.
    const MIXED_KEYS: [&str; 4] = ["file", "dir/", "dir/file", "dir_without_prefix/file"];

    /// Keys that live two levels deep with no explicit directory entries.
    const DATASET_KEYS: [&str; 3] = [
        "dataset/stateful/ontime_2007_200.csv",
        "dataset/stateful/ontime_2008_200.csv",
        "dataset/stateful/ontime_2009_200.csv",
    ];

    /// Backend that only implements `info`; every other operation uses the
    /// `Access` trait's default methods.
    #[derive(Debug)]
    struct MockService;

    impl Access for MockService {
        type Reader = oio::Reader;
        type Writer = oio::Writer;
        type Lister = oio::Lister;
        type Deleter = oio::Deleter;

        fn info(&self) -> Arc<AccessorInfo> {
            let info = AccessorInfo::default();
            info.set_scheme("mock");
            info.into()
        }
    }

    fn build_operator(layer: ImmutableIndexLayer) -> Operator {
        Operator::from_inner(Arc::new(MockService)).layer(layer)
    }

    fn setup() {
        let _ = logforth::starter_log::builder()
            .dispatch(|d| {
                d.filter(EnvFilterBuilder::from_default_env().build())
                    .append(Testing::default().with_layout(TextLayout::default()))
            })
            .try_apply();
    }

    /// Build an index layer containing exactly `keys`.
    fn index_of(keys: &[&str]) -> ImmutableIndexLayer {
        let mut iil = ImmutableIndexLayer::default();
        iil.extend_iter(keys.iter().map(|&key| key.to_owned()));
        iil
    }

    /// List `path` and map every yielded path to its mode.
    ///
    /// Panics if any path is yielded more than once.
    async fn collect_entries(
        op: &Operator,
        path: &str,
        recursive: bool,
    ) -> Result<HashMap<String, EntryMode>> {
        let mut map = HashMap::new();
        let mut ds = op.lister_with(path).recursive(recursive).await?;
        while let Some(entry) = ds.try_next().await? {
            debug!("got entry: {}", entry.path());
            let previous = map.insert(entry.path().to_string(), entry.metadata().mode());
            assert!(previous.is_none(), "duplicated value: {}", entry.path());
        }
        Ok(map)
    }

    #[tokio::test]
    async fn test_list() -> Result<()> {
        setup();
        let op = build_operator(index_of(&MIXED_KEYS));

        let map = collect_entries(&op, "", false).await?;

        assert_eq!(map["file"], EntryMode::FILE);
        assert_eq!(map["dir/"], EntryMode::DIR);
        assert_eq!(map["dir_without_prefix/"], EntryMode::DIR);
        Ok(())
    }

    #[tokio::test]
    async fn test_scan() -> Result<()> {
        setup();
        let op = build_operator(index_of(&MIXED_KEYS));

        let map = collect_entries(&op, "/", true).await?;
        debug!("current files: {map:?}");

        assert_eq!(map["file"], EntryMode::FILE);
        assert_eq!(map["dir/"], EntryMode::DIR);
        assert_eq!(map["dir_without_prefix/file"], EntryMode::FILE);
        Ok(())
    }

    #[tokio::test]
    async fn test_list_dir() -> Result<()> {
        setup();
        let op = build_operator(index_of(&DATASET_KEYS));

        // List /
        let map = collect_entries(&op, "/", false).await?;
        assert_eq!(map.len(), 1);
        assert_eq!(map["dataset/"], EntryMode::DIR);

        // List dataset/stateful/
        let map = collect_entries(&op, "dataset/stateful/", false).await?;
        assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }

    #[tokio::test]
    async fn test_walk_top_down_dir() -> Result<()> {
        setup();
        let op = build_operator(index_of(&DATASET_KEYS));

        let map = collect_entries(&op, "/", true).await?;
        debug!("current files: {map:?}");

        assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2008_200.csv"], EntryMode::FILE);
        assert_eq!(map["dataset/stateful/ontime_2009_200.csv"], EntryMode::FILE);
        Ok(())
    }
}