blob: 59c802901daa6a4af5d0783ef99f43a56ffda0e3 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::Debug;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use monoio::fs::OpenOptions;
use opendal_core::raw::*;
use opendal_core::*;
use super::config::MonoiofsConfig;
use super::core::BUFFER_SIZE;
use super::core::MonoiofsCore;
use super::deleter::MonoiofsDeleter;
use super::reader::MonoiofsReader;
use super::writer::MonoiofsWriter;
/// File system support via [`monoio`].
#[doc = include_str!("docs.md")]
#[derive(Debug, Default)]
pub struct MonoiofsBuilder {
pub(super) config: MonoiofsConfig,
}
impl MonoiofsBuilder {
/// Set root of this backend.
///
/// All operations will happen under this root.
pub fn root(mut self, root: &str) -> Self {
self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
};
self
}
}
impl Builder for MonoiofsBuilder {
type Config = MonoiofsConfig;
fn build(self) -> Result<impl Access> {
let root = self.config.root.map(PathBuf::from).ok_or(
Error::new(ErrorKind::ConfigInvalid, "root is not specified")
.with_operation("Builder::build"),
)?;
if let Err(e) = std::fs::metadata(&root) {
if e.kind() == io::ErrorKind::NotFound {
std::fs::create_dir_all(&root).map_err(|e| {
Error::new(ErrorKind::Unexpected, "create root dir failed")
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
}
}
let root = root.canonicalize().map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"canonicalize of root directory failed",
)
.with_operation("Builder::build")
.with_context("root", root.to_string_lossy())
.set_source(e)
})?;
let worker_threads = 1; // TODO: test concurrency and default to available_parallelism and bind cpu
let io_uring_entries = 1024;
Ok(MonoiofsBackend {
core: Arc::new(MonoiofsCore::new(root, worker_threads, io_uring_entries)),
})
}
}
#[derive(Debug, Clone)]
pub struct MonoiofsBackend {
core: Arc<MonoiofsCore>,
}
impl Access for MonoiofsBackend {
type Reader = MonoiofsReader;
type Writer = MonoiofsWriter;
type Lister = ();
type Deleter = oio::OneShotDeleter<MonoiofsDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
self.core.info.clone()
}
async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
let path = self.core.prepare_path(path);
let meta = self
.core
.dispatch(move || monoio::fs::metadata(path))
.await
.map_err(new_std_io_error)?;
let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let m = Metadata::new(mode)
.with_content_length(meta.len())
.with_last_modified(Timestamp::try_from(
meta.modified().map_err(new_std_io_error)?,
)?);
Ok(RpStat::new(m))
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = self.core.prepare_path(path);
let reader = MonoiofsReader::new(self.core.clone(), path, args.range()).await?;
Ok((RpRead::default(), reader))
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path = self.core.prepare_write_path(path).await?;
let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?;
Ok((RpWrite::default(), writer))
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(MonoiofsDeleter::new(self.core.clone())),
))
}
async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch(move || monoio::fs::rename(from, to))
.await
.map_err(new_std_io_error)?;
Ok(RpRename::default())
}
async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let path = self.core.prepare_path(path);
self.core
.dispatch(move || monoio::fs::create_dir_all(path))
.await
.map_err(new_std_io_error)?;
Ok(RpCreateDir::default())
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let from = self.core.prepare_path(from);
// ensure file exists
self.core
.dispatch({
let from = from.clone();
move || monoio::fs::metadata(from)
})
.await
.map_err(new_std_io_error)?;
let to = self.core.prepare_write_path(to).await?;
self.core
.dispatch({
let core = self.core.clone();
move || async move {
let from = OpenOptions::new().read(true).open(from).await?;
let to = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(to)
.await?;
// AsyncReadRent and AsyncWriteRent is not implemented
// for File, so we can't write this:
// monoio::io::copy(&mut from, &mut to).await?;
let mut pos = 0;
// allocate and resize buffer
let mut buf = core.buf_pool.get();
// set capacity of buf to exact size to avoid excessive read
buf.reserve(BUFFER_SIZE);
let _ = buf.split_off(BUFFER_SIZE);
loop {
let result;
(result, buf) = from.read_at(buf, pos).await;
if result? == 0 {
// EOF
break;
}
let result;
(result, buf) = to.write_all_at(buf, pos).await;
result?;
pos += buf.len() as u64;
buf.clear();
}
core.buf_pool.put(buf);
Ok(())
}
})
.await
.map_err(new_std_io_error)?;
Ok(RpCopy::default())
}
}