| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::collections::HashMap; |
| use std::str::FromStr; |
| use std::time::Duration; |
| |
| use pyo3::exceptions::PyValueError; |
| use pyo3::prelude::*; |
| use pyo3::types::PyBytes; |
| use pyo3::types::PyDict; |
| use pyo3_asyncio::tokio::future_into_py; |
| |
| use crate::*; |
| |
| fn build_operator( |
| scheme: ocore::Scheme, |
| map: HashMap<String, String>, |
| ) -> PyResult<ocore::Operator> { |
| let mut op = ocore::Operator::via_map(scheme, map).map_err(format_pyerr)?; |
| if !op.info().full_capability().blocking { |
| let runtime = pyo3_asyncio::tokio::get_runtime(); |
| let _guard = runtime.enter(); |
| op = op |
| .layer(ocore::layers::BlockingLayer::create().expect("blocking layer must be created")); |
| } |
| |
| Ok(op) |
| } |
| |
| /// `Operator` is the entry for all public blocking APIs |
| /// |
| /// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`). |
| #[pyclass(module = "opendal")] |
| pub struct Operator(ocore::BlockingOperator); |
| |
| #[pymethods] |
| impl Operator { |
| #[new] |
| #[pyo3(signature = (scheme, *, **map))] |
| pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { |
| let scheme = ocore::Scheme::from_str(scheme) |
| .map_err(|err| { |
| ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") |
| .set_source(err) |
| }) |
| .map_err(format_pyerr)?; |
| let map = map |
| .map(|v| { |
| v.extract::<HashMap<String, String>>() |
| .expect("must be valid hashmap") |
| }) |
| .unwrap_or_default(); |
| |
| Ok(Operator(build_operator(scheme, map)?.blocking())) |
| } |
| |
| /// Add new layers upon existing operator |
| pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { |
| let op = layer.0.layer(self.0.clone().into()); |
| Ok(Self(op.blocking())) |
| } |
| |
| /// Open a file-like reader for the given path. |
| pub fn open(&self, path: String, mode: String) -> PyResult<File> { |
| let this = self.0.clone(); |
| |
| if mode == "rb" { |
| let r = this.reader(&path).map_err(format_pyerr)?; |
| Ok(File::new_reader(r)) |
| } else if mode == "wb" { |
| let w = this.writer(&path).map_err(format_pyerr)?; |
| Ok(File::new_writer(w)) |
| } else { |
| Err(UnsupportedError::new_err(format!( |
| "OpenDAL doesn't support mode: {mode}" |
| ))) |
| } |
| } |
| |
| /// Read the whole path into bytes. |
| pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<&'p PyAny> { |
| let buffer = self.0.read(path).map_err(format_pyerr)?; |
| Buffer::new(buffer).into_memory_view_ref(py) |
| } |
| |
| /// Write bytes into given path. |
| #[pyo3(signature = (path, bs, **kwargs))] |
| pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> PyResult<()> { |
| let opwrite = build_opwrite(kwargs)?; |
| let mut write = self.0.write_with(path, bs).append(opwrite.append()); |
| if let Some(buffer) = opwrite.buffer() { |
| write = write.buffer(buffer); |
| } |
| if let Some(content_type) = opwrite.content_type() { |
| write = write.content_type(content_type); |
| } |
| if let Some(content_disposition) = opwrite.content_disposition() { |
| write = write.content_disposition(content_disposition); |
| } |
| if let Some(cache_control) = opwrite.cache_control() { |
| write = write.cache_control(cache_control); |
| } |
| |
| write.call().map_err(format_pyerr) |
| } |
| |
| /// Get current path's metadata **without cache** directly. |
| pub fn stat(&self, path: &str) -> PyResult<Metadata> { |
| self.0.stat(path).map_err(format_pyerr).map(Metadata::new) |
| } |
| |
| /// Copy source to target. |
| pub fn copy(&self, source: &str, target: &str) -> PyResult<()> { |
| self.0.copy(source, target).map_err(format_pyerr) |
| } |
| |
| /// Rename filename. |
| pub fn rename(&self, source: &str, target: &str) -> PyResult<()> { |
| self.0.rename(source, target).map_err(format_pyerr) |
| } |
| |
| /// Remove all file |
| pub fn remove_all(&self, path: &str) -> PyResult<()> { |
| self.0.remove_all(path).map_err(format_pyerr) |
| } |
| |
| /// Create a dir at given path. |
| /// |
| /// # Notes |
| /// |
| /// To indicate that a path is a directory, it is compulsory to include |
| /// a trailing / in the path. Failure to do so may result in |
| /// `NotADirectory` error being returned by OpenDAL. |
| /// |
| /// # Behavior |
| /// |
| /// - Create on existing dir will succeed. |
| /// - Create dir is always recursive, works like `mkdir -p` |
| pub fn create_dir(&self, path: &str) -> PyResult<()> { |
| self.0.create_dir(path).map_err(format_pyerr) |
| } |
| |
| /// Delete given path. |
| /// |
| /// # Notes |
| /// |
| /// - Delete not existing error won't return errors. |
| pub fn delete(&self, path: &str) -> PyResult<()> { |
| self.0.delete(path).map_err(format_pyerr) |
| } |
| |
| /// List current dir path. |
| pub fn list(&self, path: &str) -> PyResult<BlockingLister> { |
| let l = self.0.lister(path).map_err(format_pyerr)?; |
| Ok(BlockingLister::new(l)) |
| } |
| |
| /// List dir in flat way. |
| pub fn scan(&self, path: &str) -> PyResult<BlockingLister> { |
| let l = self |
| .0 |
| .lister_with(path) |
| .recursive(true) |
| .call() |
| .map_err(format_pyerr)?; |
| Ok(BlockingLister::new(l)) |
| } |
| |
| pub fn capability(&self) -> PyResult<capability::Capability> { |
| Ok(capability::Capability::new(self.0.info().full_capability())) |
| } |
| |
| pub fn to_async_operator(&self) -> PyResult<AsyncOperator> { |
| Ok(AsyncOperator(self.0.clone().into())) |
| } |
| |
| fn __repr__(&self) -> String { |
| let info = self.0.info(); |
| let name = info.name(); |
| if name.is_empty() { |
| format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root()) |
| } else { |
| format!( |
| "Operator(\"{}\", root=\"{}\", name=\"{name}\")", |
| info.scheme(), |
| info.root() |
| ) |
| } |
| } |
| } |
| |
| /// `AsyncOperator` is the entry for all public async APIs |
| /// |
| /// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`). |
| #[pyclass(module = "opendal")] |
| pub struct AsyncOperator(ocore::Operator); |
| |
| #[pymethods] |
| impl AsyncOperator { |
| #[new] |
| #[pyo3(signature = (scheme, *, **map))] |
| pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> { |
| let scheme = ocore::Scheme::from_str(scheme) |
| .map_err(|err| { |
| ocore::Error::new(ocore::ErrorKind::Unexpected, "unsupported scheme") |
| .set_source(err) |
| }) |
| .map_err(format_pyerr)?; |
| let map = map |
| .map(|v| { |
| v.extract::<HashMap<String, String>>() |
| .expect("must be valid hashmap") |
| }) |
| .unwrap_or_default(); |
| |
| Ok(AsyncOperator(build_operator(scheme, map)?)) |
| } |
| |
| /// Add new layers upon existing operator |
| pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> { |
| let op = layer.0.layer(self.0.clone()); |
| Ok(Self(op)) |
| } |
| |
| /// Open a file-like reader for the given path. |
| pub fn open<'p>(&'p self, py: Python<'p>, path: String, mode: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| |
| future_into_py(py, async move { |
| if mode == "rb" { |
| let r = this.reader(&path).await.map_err(format_pyerr)?; |
| Ok(AsyncFile::new_reader(r)) |
| } else if mode == "wb" { |
| let w = this.writer(&path).await.map_err(format_pyerr)?; |
| Ok(AsyncFile::new_writer(w)) |
| } else { |
| Err(UnsupportedError::new_err(format!( |
| "OpenDAL doesn't support mode: {mode}" |
| ))) |
| } |
| }) |
| } |
| |
| /// Read the whole path into bytes. |
| pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?; |
| Python::with_gil(|py| Buffer::new(res).into_memory_view(py)) |
| }) |
| } |
| |
| /// Write bytes into given path. |
| #[pyo3(signature = (path, bs, **kwargs))] |
| pub fn write<'p>( |
| &'p self, |
| py: Python<'p>, |
| path: String, |
| bs: &PyBytes, |
| kwargs: Option<&PyDict>, |
| ) -> PyResult<&'p PyAny> { |
| let opwrite = build_opwrite(kwargs)?; |
| let this = self.0.clone(); |
| let bs = bs.as_bytes().to_vec(); |
| future_into_py(py, async move { |
| let mut write = this.write_with(&path, bs).append(opwrite.append()); |
| if let Some(buffer) = opwrite.buffer() { |
| write = write.buffer(buffer); |
| } |
| if let Some(content_type) = opwrite.content_type() { |
| write = write.content_type(content_type); |
| } |
| if let Some(content_disposition) = opwrite.content_disposition() { |
| write = write.content_disposition(content_disposition); |
| } |
| if let Some(cache_control) = opwrite.cache_control() { |
| write = write.cache_control(cache_control); |
| } |
| write.await.map_err(format_pyerr) |
| }) |
| } |
| |
| /// Get current path's metadata **without cache** directly. |
| pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let res: Metadata = this |
| .stat(&path) |
| .await |
| .map_err(format_pyerr) |
| .map(Metadata::new)?; |
| |
| Ok(res) |
| }) |
| } |
| |
| /// Copy source to target.`` |
| pub fn copy<'p>( |
| &'p self, |
| py: Python<'p>, |
| source: String, |
| target: String, |
| ) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| this.copy(&source, &target).await.map_err(format_pyerr) |
| }) |
| } |
| |
| /// Rename filename |
| pub fn rename<'p>( |
| &'p self, |
| py: Python<'p>, |
| source: String, |
| target: String, |
| ) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| this.rename(&source, &target).await.map_err(format_pyerr) |
| }) |
| } |
| |
| /// Remove all file |
| pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| this.remove_all(&path).await.map_err(format_pyerr) |
| }) |
| } |
| |
| /// Create a dir at given path. |
| /// |
| /// # Notes |
| /// |
| /// To indicate that a path is a directory, it is compulsory to include |
| /// a trailing / in the path. Failure to do so may result in |
| /// `NotADirectory` error being returned by OpenDAL. |
| /// |
| /// # Behavior |
| /// |
| /// - Create on existing dir will succeed. |
| /// - Create dir is always recursive, works like `mkdir -p` |
| pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| this.create_dir(&path).await.map_err(format_pyerr) |
| }) |
| } |
| |
| /// Delete given path. |
| /// |
| /// # Notes |
| /// |
| /// - Delete not existing error won't return errors. |
| pub fn delete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py( |
| py, |
| async move { this.delete(&path).await.map_err(format_pyerr) }, |
| ) |
| } |
| |
| /// List current dir path. |
| pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let lister = this.lister(&path).await.map_err(format_pyerr)?; |
| let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py)); |
| Ok(pylister) |
| }) |
| } |
| |
| /// List dir in flat way. |
| pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let lister = this |
| .lister_with(&path) |
| .recursive(true) |
| .await |
| .map_err(format_pyerr)?; |
| let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py)); |
| Ok(pylister) |
| }) |
| } |
| |
| /// Presign an operation for stat(head) which expires after `expire_second` seconds. |
| pub fn presign_stat<'p>( |
| &'p self, |
| py: Python<'p>, |
| path: String, |
| expire_second: u64, |
| ) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let res = this |
| .presign_stat(&path, Duration::from_secs(expire_second)) |
| .await |
| .map_err(format_pyerr) |
| .map(PresignedRequest)?; |
| |
| Ok(res) |
| }) |
| } |
| |
| /// Presign an operation for read which expires after `expire_second` seconds. |
| pub fn presign_read<'p>( |
| &'p self, |
| py: Python<'p>, |
| path: String, |
| expire_second: u64, |
| ) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let res = this |
| .presign_read(&path, Duration::from_secs(expire_second)) |
| .await |
| .map_err(format_pyerr) |
| .map(PresignedRequest)?; |
| |
| Ok(res) |
| }) |
| } |
| |
| /// Presign an operation for write which expires after `expire_second` seconds. |
| pub fn presign_write<'p>( |
| &'p self, |
| py: Python<'p>, |
| path: String, |
| expire_second: u64, |
| ) -> PyResult<&'p PyAny> { |
| let this = self.0.clone(); |
| future_into_py(py, async move { |
| let res = this |
| .presign_write(&path, Duration::from_secs(expire_second)) |
| .await |
| .map_err(format_pyerr) |
| .map(PresignedRequest)?; |
| |
| Ok(res) |
| }) |
| } |
| |
| pub fn capability(&self) -> PyResult<capability::Capability> { |
| Ok(capability::Capability::new(self.0.info().full_capability())) |
| } |
| |
| pub fn to_operator(&self) -> PyResult<Operator> { |
| Ok(Operator(self.0.clone().blocking())) |
| } |
| |
| fn __repr__(&self) -> String { |
| let info = self.0.info(); |
| let name = info.name(); |
| if name.is_empty() { |
| format!( |
| "AsyncOperator(\"{}\", root=\"{}\")", |
| info.scheme(), |
| info.root() |
| ) |
| } else { |
| format!( |
| "AsyncOperator(\"{}\", root=\"{}\", name=\"{name}\")", |
| info.scheme(), |
| info.root() |
| ) |
| } |
| } |
| } |
| |
| /// recognize OpWrite-equivalent options passed as python dict |
| pub(crate) fn build_opwrite(kwargs: Option<&PyDict>) -> PyResult<ocore::raw::OpWrite> { |
| use ocore::raw::OpWrite; |
| let mut op = OpWrite::new(); |
| |
| let dict = if let Some(kwargs) = kwargs { |
| kwargs |
| } else { |
| return Ok(op); |
| }; |
| |
| if let Some(append) = dict.get_item("append") { |
| let v = append |
| .extract::<bool>() |
| .map_err(|err| PyValueError::new_err(format!("append must be bool, got {}", err)))?; |
| op = op.with_append(v); |
| } |
| |
| if let Some(buffer) = dict.get_item("buffer") { |
| let v = buffer |
| .extract::<usize>() |
| .map_err(|err| PyValueError::new_err(format!("buffer must be usize, got {}", err)))?; |
| op = op.with_buffer(v); |
| } |
| |
| if let Some(content_type) = dict.get_item("content_type") { |
| let v = content_type.extract::<String>().map_err(|err| { |
| PyValueError::new_err(format!("content_type must be str, got {}", err)) |
| })?; |
| op = op.with_content_type(v.as_str()); |
| } |
| |
| if let Some(content_disposition) = dict.get_item("content_disposition") { |
| let v = content_disposition.extract::<String>().map_err(|err| { |
| PyValueError::new_err(format!("content_disposition must be str, got {}", err)) |
| })?; |
| op = op.with_content_disposition(v.as_str()); |
| } |
| |
| if let Some(cache_control) = dict.get_item("cache_control") { |
| let v = cache_control.extract::<String>().map_err(|err| { |
| PyValueError::new_err(format!("cache_control must be str, got {}", err)) |
| })?; |
| op = op.with_cache_control(v.as_str()); |
| } |
| |
| Ok(op) |
| } |
| |
| #[pyclass(module = "opendal")] |
| pub struct PresignedRequest(ocore::raw::PresignedRequest); |
| |
| #[pymethods] |
| impl PresignedRequest { |
| /// Return the URL of this request. |
| #[getter] |
| pub fn url(&self) -> String { |
| self.0.uri().to_string() |
| } |
| |
| /// Return the HTTP method of this request. |
| #[getter] |
| pub fn method(&self) -> &str { |
| self.0.method().as_str() |
| } |
| |
| /// Return the HTTP headers of this request. |
| #[getter] |
| pub fn headers(&self) -> PyResult<HashMap<&str, &str>> { |
| let mut headers = HashMap::new(); |
| for (k, v) in self.0.header().iter() { |
| let k = k.as_str(); |
| let v = v |
| .to_str() |
| .map_err(|err| UnexpectedError::new_err(err.to_string()))?; |
| if headers.insert(k, v).is_some() { |
| return Err(UnexpectedError::new_err("duplicate header")); |
| } |
| } |
| Ok(headers) |
| } |
| } |