blob: 64c1fdbee7057c8db805009d98ec4f401b086979 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
use pyo3::types::PyTuple;
use pyo3::IntoPyObjectExt;
use pyo3_async_runtimes::tokio::future_into_py;
use crate::*;
fn build_operator(
scheme: ocore::Scheme,
map: HashMap<String, String>,
) -> PyResult<ocore::Operator> {
let mut op = ocore::Operator::via_iter(scheme, map).map_err(format_pyerr)?;
if !op.info().full_capability().blocking {
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _guard = runtime.enter();
op = op
.layer(ocore::layers::BlockingLayer::create().expect("blocking layer must be created"));
}
Ok(op)
}
/// `Operator` is the entry for all public blocking APIs
///
/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct Operator {
core: ocore::BlockingOperator,
__scheme: ocore::Scheme,
__map: HashMap<String, String>,
}
#[pymethods]
impl Operator {
#[new]
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&Bound<PyDict>>) -> PyResult<Self> {
let scheme = ocore::Scheme::from_str(scheme)
.map_err(|err| {
ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme")
.set_source(err)
})
.map_err(format_pyerr)?;
let map = map
.map(|v| {
v.extract::<HashMap<String, String>>()
.expect("must be valid hashmap")
})
.unwrap_or_default();
Ok(Operator {
core: build_operator(scheme, map.clone())?.blocking(),
__scheme: scheme,
__map: map,
})
}
/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.core.clone().into());
Ok(Self {
core: op.blocking(),
__scheme: self.__scheme,
__map: self.__map.clone(),
})
}
/// Open a file-like reader for the given path.
pub fn open(&self, path: PathBuf, mode: String) -> PyResult<File> {
let path = path.to_string_lossy().to_string();
let this = self.core.clone();
if mode == "rb" {
let r = this
.reader(&path)
.map_err(format_pyerr)?
.into_std_read(..)
.map_err(format_pyerr)?;
Ok(File::new_reader(r))
} else if mode == "wb" {
let w = this.writer(&path).map_err(format_pyerr)?;
Ok(File::new_writer(w))
} else {
Err(Unsupported::new_err(format!(
"OpenDAL doesn't support mode: {mode}"
)))
}
}
/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let path = path.to_string_lossy().to_string();
let buffer = self.core.read(&path).map_err(format_pyerr)?.to_vec();
Buffer::new(buffer).into_bytes_ref(py)
}
/// Write bytes into given path.
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: PathBuf, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
let path = path.to_string_lossy().to_string();
let mut kwargs = kwargs.unwrap_or_default();
let mut write = self
.core
.write_with(&path, bs)
.append(kwargs.append.unwrap_or(false));
if let Some(chunk) = kwargs.chunk {
write = write.chunk(chunk);
}
if let Some(content_type) = &kwargs.content_type {
write = write.content_type(content_type);
}
if let Some(content_disposition) = &kwargs.content_disposition {
write = write.content_disposition(content_disposition);
}
if let Some(cache_control) = &kwargs.cache_control {
write = write.cache_control(cache_control);
}
if let Some(user_metadata) = kwargs.user_metadata.take() {
write = write.user_metadata(user_metadata);
}
write.call().map(|_| ()).map_err(format_pyerr)
}
/// Get current path's metadata **without cache** directly.
pub fn stat(&self, path: PathBuf) -> PyResult<Metadata> {
let path = path.to_string_lossy().to_string();
self.core
.stat(&path)
.map_err(format_pyerr)
.map(Metadata::new)
}
/// Copy source to target.
pub fn copy(&self, source: PathBuf, target: PathBuf) -> PyResult<()> {
let source = source.to_string_lossy().to_string();
let target = target.to_string_lossy().to_string();
self.core.copy(&source, &target).map_err(format_pyerr)
}
/// Rename filename.
pub fn rename(&self, source: PathBuf, target: PathBuf) -> PyResult<()> {
let source = source.to_string_lossy().to_string();
let target = target.to_string_lossy().to_string();
self.core.rename(&source, &target).map_err(format_pyerr)
}
/// Remove all file
pub fn remove_all(&self, path: PathBuf) -> PyResult<()> {
let path = path.to_string_lossy().to_string();
self.core.remove_all(&path).map_err(format_pyerr)
}
/// Create a dir at given path.
///
/// # Notes
///
/// To indicate that a path is a directory, it is compulsory to include
/// a trailing / in the path. Failure to do so may result in
/// `NotADirectory` error being returned by OpenDAL.
///
/// # Behavior
///
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir(&self, path: PathBuf) -> PyResult<()> {
let path = path.to_string_lossy().to_string();
self.core.create_dir(&path).map_err(format_pyerr)
}
/// Delete given path.
///
/// # Notes
///
/// - Delete not existing error won't return errors.
pub fn delete(&self, path: PathBuf) -> PyResult<()> {
let path = path.to_string_lossy().to_string();
self.core.delete(&path).map_err(format_pyerr)
}
/// Check given path is exists.
///
/// # Notes
///
/// - Check not existing path won't return errors.
pub fn exists(&self, path: PathBuf) -> PyResult<bool> {
let path = path.to_string_lossy().to_string();
self.core.exists(&path).map_err(format_pyerr)
}
/// List current dir path.
#[pyo3(signature = (path, *, start_after=None))]
pub fn list(&self, path: PathBuf, start_after: Option<String>) -> PyResult<BlockingLister> {
let path = path.to_string_lossy().to_string();
let mut builder = self.core.lister_with(&path);
if let Some(start_after) = start_after {
builder = builder.start_after(&start_after);
}
let l = builder.call().map_err(format_pyerr)?;
Ok(BlockingLister::new(l))
}
/// List dir in flat way.
pub fn scan(&self, path: PathBuf) -> PyResult<BlockingLister> {
let path = path.to_string_lossy().to_string();
let l = self
.core
.lister_with(&path)
.recursive(true)
.call()
.map_err(format_pyerr)?;
Ok(BlockingLister::new(l))
}
pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}
/// Check if this operator can work correctly.
pub fn check(&self) -> PyResult<()> {
self.core.check().map_err(format_pyerr)
}
pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
Ok(AsyncOperator {
core: self.core.clone().into(),
__scheme: self.__scheme,
__map: self.__map.clone(),
})
}
fn __repr__(&self) -> String {
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root())
} else {
format!(
"Operator(\"{}\", root=\"{}\", name=\"{name}\")",
info.scheme(),
info.root()
)
}
}
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
let args = vec![self.__scheme.to_string()];
let args = PyTuple::new(py, args)?.into_py_any(py)?;
let kwargs = self.__map.clone().into_py_any(py)?;
PyTuple::new(py, [args, kwargs])?.into_py_any(py)
}
}
/// `AsyncOperator` is the entry for all public async APIs
///
/// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct AsyncOperator {
core: ocore::Operator,
__scheme: ocore::Scheme,
__map: HashMap<String, String>,
}
#[pymethods]
impl AsyncOperator {
#[new]
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&Bound<PyDict>>) -> PyResult<Self> {
let scheme = ocore::Scheme::from_str(scheme)
.map_err(|err| {
ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme")
.set_source(err)
})
.map_err(format_pyerr)?;
let map = map
.map(|v| {
v.extract::<HashMap<String, String>>()
.expect("must be valid hashmap")
})
.unwrap_or_default();
Ok(AsyncOperator {
core: build_operator(scheme, map.clone())?,
__scheme: scheme,
__map: map,
})
}
/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.core.clone());
Ok(Self {
core: op,
__scheme: self.__scheme,
__map: self.__map.clone(),
})
}
/// Open a file-like reader for the given path.
pub fn open<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
mode: String,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
if mode == "rb" {
let r = this
.reader(&path)
.await
.map_err(format_pyerr)?
.into_futures_async_read(..)
.await
.map_err(format_pyerr)?;
Ok(AsyncFile::new_reader(r))
} else if mode == "wb" {
let w = this.writer(&path).await.map_err(format_pyerr)?;
Ok(AsyncFile::new_writer(w))
} else {
Err(Unsupported::new_err(format!(
"OpenDAL doesn't support mode: {mode}"
)))
}
})
}
/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
Python::with_gil(|py| Buffer::new(res).into_bytes(py))
})
}
/// Write bytes into given path.
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
bs: &Bound<PyBytes>,
kwargs: Option<WriteOptions>,
) -> PyResult<Bound<'p, PyAny>> {
let mut kwargs = kwargs.unwrap_or_default();
let this = self.core.clone();
let bs = bs.as_bytes().to_vec();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let mut write = this
.write_with(&path, bs)
.append(kwargs.append.unwrap_or(false));
if let Some(buffer) = kwargs.chunk {
write = write.chunk(buffer);
}
if let Some(content_type) = &kwargs.content_type {
write = write.content_type(content_type);
}
if let Some(content_disposition) = &kwargs.content_disposition {
write = write.content_disposition(content_disposition);
}
if let Some(cache_control) = &kwargs.cache_control {
write = write.cache_control(cache_control);
}
if let Some(user_metadata) = kwargs.user_metadata.take() {
write = write.user_metadata(user_metadata);
}
write.await.map(|_| ()).map_err(format_pyerr)
})
}
/// Get current path's metadata **without cache** directly.
pub fn stat<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res: Metadata = this
.stat(&path)
.await
.map_err(format_pyerr)
.map(Metadata::new)?;
Ok(res)
})
}
/// Copy source to target.``
pub fn copy<'p>(
&'p self,
py: Python<'p>,
source: PathBuf,
target: PathBuf,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let source = source.to_string_lossy().to_string();
let target = target.to_string_lossy().to_string();
future_into_py(py, async move {
this.copy(&source, &target).await.map_err(format_pyerr)
})
}
/// Rename filename
pub fn rename<'p>(
&'p self,
py: Python<'p>,
source: PathBuf,
target: PathBuf,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let source = source.to_string_lossy().to_string();
let target = target.to_string_lossy().to_string();
future_into_py(py, async move {
this.rename(&source, &target).await.map_err(format_pyerr)
})
}
/// Remove all file
pub fn remove_all<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
this.remove_all(&path).await.map_err(format_pyerr)
})
}
/// Check if this operator can work correctly.
pub fn check<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
future_into_py(py, async move { this.check().await.map_err(format_pyerr) })
}
/// Create a dir at given path.
///
/// # Notes
///
/// To indicate that a path is a directory, it is compulsory to include
/// a trailing / in the path. Failure to do so may result in
/// `NotADirectory` error being returned by OpenDAL.
///
/// # Behavior
///
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
this.create_dir(&path).await.map_err(format_pyerr)
})
}
/// Delete given path.
///
/// # Notes
///
/// - Delete not existing error won't return errors.
pub fn delete<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(
py,
async move { this.delete(&path).await.map_err(format_pyerr) },
)
}
/// Check given path is exists.
///
/// # Notes
///
/// - Check not existing path won't return errors.
pub fn exists<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(
py,
async move { this.exists(&path).await.map_err(format_pyerr) },
)
}
/// List current dir path.
#[pyo3(signature = (path, *, start_after=None))]
pub fn list<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
start_after: Option<String>,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let mut builder = this.lister_with(&path);
if let Some(start_after) = start_after {
builder = builder.start_after(&start_after);
}
let lister = builder.await.map_err(format_pyerr)?;
let pylister = Python::with_gil(|py| AsyncLister::new(lister).into_py_any(py))?;
Ok(pylister)
})
}
/// List dir in flat way.
pub fn scan<'p>(&'p self, py: Python<'p>, path: PathBuf) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let builder = this.lister_with(&path).recursive(true);
let lister = builder.await.map_err(format_pyerr)?;
let pylister: PyObject =
Python::with_gil(|py| AsyncLister::new(lister).into_py_any(py))?;
Ok(pylister)
})
}
/// Presign an operation for stat(head) which expires after `expire_second` seconds.
pub fn presign_stat<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
expire_second: u64,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res = this
.presign_stat(&path, Duration::from_secs(expire_second))
.await
.map_err(format_pyerr)
.map(PresignedRequest)?;
Ok(res)
})
}
/// Presign an operation for read which expires after `expire_second` seconds.
pub fn presign_read<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
expire_second: u64,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res = this
.presign_read(&path, Duration::from_secs(expire_second))
.await
.map_err(format_pyerr)
.map(PresignedRequest)?;
Ok(res)
})
}
/// Presign an operation for write which expires after `expire_second` seconds.
pub fn presign_write<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
expire_second: u64,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res = this
.presign_write(&path, Duration::from_secs(expire_second))
.await
.map_err(format_pyerr)
.map(PresignedRequest)?;
Ok(res)
})
}
/// Presign an operation for delete which expires after `expire_second` seconds.
pub fn presign_delete<'p>(
&'p self,
py: Python<'p>,
path: PathBuf,
expire_second: u64,
) -> PyResult<Bound<'p, PyAny>> {
let this = self.core.clone();
let path = path.to_string_lossy().to_string();
future_into_py(py, async move {
let res = this
.presign_delete(&path, Duration::from_secs(expire_second))
.await
.map_err(format_pyerr)
.map(PresignedRequest)?;
Ok(res)
})
}
pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}
pub fn to_operator(&self) -> PyResult<Operator> {
Ok(Operator {
core: self.core.clone().blocking(),
__scheme: self.__scheme,
__map: self.__map.clone(),
})
}
fn __repr__(&self) -> String {
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!(
"AsyncOperator(\"{}\", root=\"{}\")",
info.scheme(),
info.root()
)
} else {
format!(
"AsyncOperator(\"{}\", root=\"{}\", name=\"{name}\")",
info.scheme(),
info.root()
)
}
}
fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
let args = vec![self.__scheme.to_string()];
let args = PyTuple::new(py, args)?.into_py_any(py)?;
let kwargs = self.__map.clone().into_py_any(py)?;
PyTuple::new(py, [args, kwargs])?.into_py_any(py)
}
}
#[pyclass(module = "opendal")]
pub struct PresignedRequest(ocore::raw::PresignedRequest);
#[pymethods]
impl PresignedRequest {
/// Return the URL of this request.
#[getter]
pub fn url(&self) -> String {
self.0.uri().to_string()
}
/// Return the HTTP method of this request.
#[getter]
pub fn method(&self) -> &str {
self.0.method().as_str()
}
/// Return the HTTP headers of this request.
#[getter]
pub fn headers(&self) -> PyResult<HashMap<&str, &str>> {
let mut headers = HashMap::new();
for (k, v) in self.0.header().iter() {
let k = k.as_str();
let v = v
.to_str()
.map_err(|err| Unexpected::new_err(err.to_string()))?;
if headers.insert(k, v).is_some() {
return Err(Unexpected::new_err("duplicate header"));
}
}
Ok(headers)
}
}