blob: 7eaa6aa6d50f7c6af29006fca9f57a5764f64adc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use opendal::Operator;
use url::Url;
use super::storage::Storage;
use crate::{Error, ErrorKind, Result};
/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
///
/// All paths passed to `FileIO` must be absolute paths starting with the scheme string used to
/// construct `FileIO`. For example, if you construct `FileIO` with the `s3a` scheme, then all
/// paths passed to `FileIO` must start with `s3a://`.
///
/// Supported storages:
///
/// | Storage           | Feature Flag     | Schemes     |
/// |-------------------|------------------|-------------|
/// | Local file system | `storage-fs`     | `file`      |
/// | Memory            | `storage-memory` | `memory`    |
/// | S3                | `storage-s3`     | `s3`, `s3a` |
/// | GCS               | `storage-gcs`    | `gcs`       |
#[derive(Clone, Debug)]
pub struct FileIO {
    // The builder this `FileIO` was built from; handed back by `FileIO::into_builder`.
    builder: FileIOBuilder,
    // Storage backend used to create an operator per path. Kept behind an `Arc`, so clones
    // of this `FileIO` share the same storage instance.
    inner: Arc<Storage>,
}
impl FileIO {
/// Convert FileIO into [`FileIOBuilder`] which used to build this FileIO.
///
/// This function is useful when you want serialize and deserialize FileIO across
/// distributed systems.
pub fn into_builder(self) -> FileIOBuilder {
self.builder
}
/// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
///
/// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
/// - If it's not a valid url, will try to detect if it's a file path.
///
/// Otherwise will return parsing error.
pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
let url = Url::parse(path.as_ref())
.map_err(Error::from)
.or_else(|e| {
Url::from_file_path(path.as_ref()).map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
"Input is neither a valid url nor path",
)
.with_context("input", path.as_ref().to_string())
.with_source(e)
})
})?;
Ok(FileIOBuilder::new(url.scheme()))
}
/// Deletes file.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.delete(relative_path).await?)
}
/// Remove the path and all nested dirs and files recursively.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.remove_all(relative_path).await?)
}
/// Check file exists.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.exists(relative_path).await?)
}
/// Creates input file.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(InputFile {
op,
path,
relative_path_pos,
})
}
/// Creates output file.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(OutputFile {
op,
path,
relative_path_pos,
})
}
}
/// Builder for [`FileIO`].
///
/// Holds only configuration (an optional scheme plus string properties), so it can be
/// cloned and recovered from a built `FileIO` via `FileIO::into_builder`.
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
    /// This is used to infer scheme of operator.
    ///
    /// Set to `Some(..)` by [`FileIOBuilder::new`] and to `None` by
    /// [`FileIOBuilder::new_fs_io`].
    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
    scheme_str: Option<String>,
    /// Arguments for operator, keyed by property name.
    props: HashMap<String, String>,
}
impl FileIOBuilder {
    /// Creates a new builder with the given scheme, e.g. `"s3"` or `"memory"`.
    ///
    /// See [`FileIO`] for supported schemes.
    pub fn new(scheme_str: impl ToString) -> Self {
        Self {
            scheme_str: Some(scheme_str.to_string()),
            props: HashMap::default(),
        }
    }

    /// Creates a new builder for local file io.
    ///
    /// The resulting builder has no scheme set.
    pub fn new_fs_io() -> Self {
        Self {
            scheme_str: None,
            props: HashMap::default(),
        }
    }

    /// Decomposes the builder into its scheme string and its operator properties.
    ///
    /// The returned scheme string is empty when no scheme was set (i.e. the builder was
    /// created with [`FileIOBuilder::new_fs_io`]).
    pub fn into_parts(self) -> (String, HashMap<String, String>) {
        (self.scheme_str.unwrap_or_default(), self.props)
    }

    /// Adds a single property for the operator, replacing any previous value for `key`.
    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
        self.props.insert(key.to_string(), value.to_string());
        self
    }

    /// Adds several properties for the operator.
    ///
    /// Entries replace existing values with the same key; if `args` repeats a key, the last
    /// occurrence wins.
    pub fn with_props(
        mut self,
        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
    ) -> Self {
        self.props.extend(
            args.into_iter()
                .map(|(key, value)| (key.to_string(), value.to_string())),
        );
        self
    }

    /// Builds [`FileIO`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned while constructing the storage backend from this
    /// builder's scheme and properties.
    pub fn build(self) -> Result<FileIO> {
        // `Storage::build` consumes a builder, but `FileIO` keeps its own copy so it can be
        // turned back into a builder with `FileIO::into_builder`.
        let storage = Storage::build(self.clone())?;
        Ok(FileIO {
            builder: self,
            inner: Arc::new(storage),
        })
    }
}
/// The struct that represents the metadata of a file.
///
/// TODO: we can add last modified time, content type, etc. in the future.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FileMetadata {
    /// The size of the file, as reported by the storage backend's content length.
    pub size: u64,
}
/// Trait for reading file contents by range.
///
/// Implemented below for `opendal::Reader`, which is what `InputFile::reader` returns.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileRead: Send + Unpin + 'static {
    /// Read file content with given range.
    ///
    /// `range` follows `Range` semantics: `start` is inclusive and `end` is exclusive.
    ///
    /// TODO: we can support reading non-contiguous bytes in the future.
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}
#[async_trait::async_trait]
impl FileRead for opendal::Reader {
    /// Delegates to opendal's own range read and copies the result into [`Bytes`].
    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
        // Path syntax picks opendal's inherent `read` (inherent methods win over trait
        // methods), so this does not recurse into the trait method being defined.
        let buffer = opendal::Reader::read(self, range).await?;
        Ok(buffer.to_bytes())
    }
}
/// Input file is used for reading from files.
#[derive(Debug)]
pub struct InputFile {
    // Operator for the storage that holds this file.
    op: Operator,
    // Absolute path of file.
    path: String,
    // Byte offset into `path` where the operator-relative path starts; every operator call
    // uses `path[relative_path_pos..]`.
    relative_path_pos: usize,
}
impl InputFile {
    /// Absolute location of this file, exactly as it was given when the handle was created.
    pub fn location(&self) -> &str {
        self.path.as_str()
    }

    /// The part of `path` that is relative to the operator root.
    fn relative_path(&self) -> &str {
        &self.path[self.relative_path_pos..]
    }

    /// Reports whether the file exists.
    pub async fn exists(&self) -> crate::Result<bool> {
        let found = self.op.exists(self.relative_path()).await?;
        Ok(found)
    }

    /// Fetches the file's metadata (currently only its size).
    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
        let stat = self.op.stat(self.relative_path()).await?;
        let size = stat.content_length();
        Ok(FileMetadata { size })
    }

    /// Reads the whole file into memory.
    ///
    /// For continuous reading, use [`Self::reader`] instead.
    pub async fn read(&self) -> crate::Result<Bytes> {
        let buffer = self.op.read(self.relative_path()).await?;
        Ok(buffer.to_bytes())
    }

    /// Creates a [`FileRead`] for continuous, ranged reading.
    ///
    /// For one-time reading, use [`Self::read`] instead.
    pub async fn reader(&self) -> crate::Result<impl FileRead> {
        let reader = self.op.reader(self.relative_path()).await?;
        Ok(reader)
    }
}
/// Trait for writing file.
///
/// `OutputFile::writer` hands out implementations of this trait as `Box<dyn FileWrite>`.
///
/// # TODO
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
pub trait FileWrite: Send + Unpin + 'static {
    /// Write bytes to file.
    ///
    /// TODO: we can support writing non-contiguous bytes in the future.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
    /// Close file.
    ///
    /// Calling close on closed file will generate an error.
    async fn close(&mut self) -> crate::Result<()>;
}
#[async_trait::async_trait]
impl FileWrite for opendal::Writer {
    /// Forwards `bs` to opendal's writer.
    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
        // Path syntax resolves to opendal's inherent method, not this trait method.
        opendal::Writer::write(self, bs).await?;
        Ok(())
    }

    /// Closes the underlying opendal writer.
    async fn close(&mut self) -> crate::Result<()> {
        opendal::Writer::close(self).await?;
        Ok(())
    }
}
/// Output file is used for writing to files.
#[derive(Debug)]
pub struct OutputFile {
    // Operator for the storage that will hold this file.
    op: Operator,
    // Absolute path of file.
    path: String,
    // Byte offset into `path` where the operator-relative path starts; every operator call
    // uses `path[relative_path_pos..]`.
    relative_path_pos: usize,
}
impl OutputFile {
    /// Absolute location of this file, exactly as it was given when the handle was created.
    pub fn location(&self) -> &str {
        &self.path
    }

    /// The part of `path` that is relative to the operator root.
    fn relative_path(&self) -> &str {
        &self.path[self.relative_path_pos..]
    }

    /// Checks if file exists.
    pub async fn exists(&self) -> crate::Result<bool> {
        Ok(self.op.exists(self.relative_path()).await?)
    }

    /// Converts into an [`InputFile`] for the same location, reusing this file's operator.
    pub fn to_input_file(self) -> InputFile {
        InputFile {
            op: self.op,
            path: self.path,
            relative_path_pos: self.relative_path_pos,
        }
    }

    /// Create a new output file with given bytes.
    ///
    /// # Notes
    ///
    /// Calling `write` will overwrite the file if it exists.
    /// For continuous writing, use [`Self::writer`].
    ///
    /// # Errors
    ///
    /// Returns an error if creating the writer, writing `bs`, or closing the writer fails.
    /// If the write itself fails, the writer is dropped without being closed.
    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
        let mut writer = self.writer().await?;
        writer.write(bs).await?;
        writer.close().await
    }

    /// Creates a [`FileWrite`] for continuous writing.
    ///
    /// # Notes
    ///
    /// For one-time writing, use [`Self::write`] instead.
    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
        Ok(Box::new(self.op.writer(self.relative_path()).await?))
    }
}
#[cfg(test)]
mod tests {
    use std::fs::{create_dir_all, File};
    use std::io::Write;
    use std::path::Path;

    use bytes::Bytes;
    use futures::io::AllowStdIo;
    use futures::AsyncReadExt;
    use tempfile::TempDir;

    use super::{FileIO, FileIOBuilder};

    fn create_local_file_io() -> FileIO {
        FileIOBuilder::new_fs_io().build().unwrap()
    }

    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
        create_dir_all(path.as_ref().parent().unwrap()).unwrap();
        let mut f = File::create(path).unwrap();
        write!(f, "{s}").unwrap();
    }

    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
        let mut f = AllowStdIo::new(File::open(path).unwrap());
        let mut s = String::new();
        f.read_to_string(&mut s).await.unwrap();
        s
    }

    #[tokio::test]
    async fn test_local_input_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let content = "Iceberg loves rust.";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
        write_to_file(content, &full_path);

        let file_io = create_local_file_io();
        let input_file = file_io.new_input(&full_path).unwrap();
        assert!(input_file.exists().await.unwrap());
        // `location` reports the path exactly as it was passed to `new_input`.
        assert_eq!(&full_path, input_file.location());

        let metadata = input_file.metadata().await.unwrap();
        assert_eq!(content.len() as u64, metadata.size);
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[tokio::test]
    async fn test_delete_local_file() {
        let tmp_dir = TempDir::new().unwrap();
        let a_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), "a.txt");
        let sub_dir_path = format!("{}/sub", tmp_dir.path().to_str().unwrap());
        let b_path = format!("{}/{}", sub_dir_path, "b.txt");
        let c_path = format!("{}/{}", sub_dir_path, "c.txt");
        write_to_file("Iceberg loves rust.", &a_path);
        write_to_file("Iceberg loves rust.", &b_path);
        write_to_file("Iceberg loves rust.", &c_path);

        let file_io = create_local_file_io();
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.remove_all(&sub_dir_path).await.unwrap();
        assert!(!file_io.exists(&b_path).await.unwrap());
        assert!(!file_io.exists(&c_path).await.unwrap());
        assert!(file_io.exists(&a_path).await.unwrap());

        file_io.delete(&a_path).await.unwrap();
        assert!(!file_io.exists(&a_path).await.unwrap());
    }

    #[tokio::test]
    async fn test_delete_non_exist_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        assert!(!file_io.exists(&full_path).await.unwrap());
        assert!(file_io.delete(&full_path).await.is_ok());
        assert!(file_io.remove_all(&full_path).await.is_ok());
    }

    #[tokio::test]
    async fn test_local_output_file() {
        let tmp_dir = TempDir::new().unwrap();
        let file_name = "a.txt";
        let content = "Iceberg loves rust.";
        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

        let file_io = create_local_file_io();
        let output_file = file_io.new_output(&full_path).unwrap();
        assert!(!output_file.exists().await.unwrap());

        output_file.write(content.into()).await.unwrap();
        assert_eq!(&full_path, output_file.location());

        // The converted input file must see exactly what was written.
        let input_file = output_file.to_input_file();
        assert_eq!(Bytes::from(content), input_file.read().await.unwrap());

        let read_content = read_from_file(full_path).await;
        assert_eq!(content, &read_content);
    }

    #[test]
    fn test_create_file_from_path() {
        let io = FileIO::from_path("/tmp/a").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:/tmp/b").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("file:///tmp/c").unwrap();
        assert_eq!("file", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("s3://bucket/a").unwrap();
        assert_eq!("s3", io.scheme_str.unwrap().as_str());

        let io = FileIO::from_path("tmp/||c");
        assert!(io.is_err());
    }

    #[tokio::test]
    async fn test_memory_io() {
        let io = FileIOBuilder::new("memory").build().unwrap();
        let path = format!("{}/1.txt", TempDir::new().unwrap().path().to_str().unwrap());

        let output_file = io.new_output(&path).unwrap();
        output_file.write("test".into()).await.unwrap();
        assert!(io.exists(&path).await.unwrap());

        let input_file = io.new_input(&path).unwrap();
        let content = input_file.read().await.unwrap();
        assert_eq!(content, Bytes::from("test"));

        io.delete(&path).await.unwrap();
        assert!(!io.exists(&path).await.unwrap());
    }
}