blob: 10aa24d800ee06ee3b61f9f368e0688db542234b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::io::BufRead;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::ops::DerefMut;
use std::sync::Arc;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;
use pyo3::buffer::PyBuffer;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::IntoPyObjectExt;
use pyo3_async_runtimes::tokio::future_into_py;
use tokio::sync::Mutex;
use crate::*;
/// A file-like object.
/// Can be used as a context manager.
#[pyclass(module = "opendal")]
pub struct File(FileState);
enum FileState {
Reader(ocore::blocking::StdReader),
Writer(ocore::blocking::StdWriter),
Closed,
}
impl File {
pub fn new_reader(reader: ocore::blocking::StdReader) -> Self {
Self(FileState::Reader(reader))
}
pub fn new_writer(writer: ocore::blocking::Writer) -> Self {
Self(FileState::Writer(writer.into_std_write()))
}
}
#[pymethods]
impl File {
/// Read and return at most size bytes, or if size is not given, until EOF.
#[pyo3(signature = (size=None,))]
pub fn read<'p>(
&'p mut self,
py: Python<'p>,
size: Option<usize>,
) -> PyResult<Bound<'p, PyAny>> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let buffer = match size {
Some(size) => {
let mut bs = vec![0; size];
let n = reader
.read(&mut bs)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
bs.truncate(n);
bs
}
None => {
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
}
};
Buffer::new(buffer).into_bytes_ref(py)
}
/// Read a single line from the file.
/// A newline character (`\n`) is left at the end of the string, and is only omitted on the last line of the file if the file doesn’t end in a newline.
/// If size is specified, at most size bytes will be read.
#[pyo3(signature = (size=None,))]
pub fn readline<'p>(
&'p mut self,
py: Python<'p>,
size: Option<usize>,
) -> PyResult<Bound<'p, PyAny>> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let buffer = match size {
None => {
let mut buffer = Vec::new();
reader
.read_until(b'\n', &mut buffer)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
}
Some(size) => {
let mut bs = vec![0; size];
let mut reader = reader.take(size as u64);
let n = reader
.read_until(b'\n', &mut bs)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
bs.truncate(n);
bs
}
};
Buffer::new(buffer).into_bytes_ref(py)
}
/// Read bytes into a pre-allocated, writable buffer
pub fn readinto(&mut self, buffer: PyBuffer<u8>) -> PyResult<usize> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
if buffer.readonly() {
return Err(PyIOError::new_err("Buffer is not writable."));
}
if !buffer.is_c_contiguous() {
return Err(PyIOError::new_err("Buffer is not C contiguous."));
}
Python::with_gil(|_py| {
let ptr = buffer.buf_ptr();
let nbytes = buffer.len_bytes();
unsafe {
let view: &mut [u8] = std::slice::from_raw_parts_mut(ptr as *mut u8, nbytes);
let z = Read::read(reader, view)?;
Ok(z)
}
})
}
/// Write bytes into the file.
pub fn write(&mut self, bs: &[u8]) -> PyResult<usize> {
let writer = match &mut self.0 {
FileState::Reader(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
));
}
FileState::Writer(w) => w,
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
writer
.write_all(bs)
.map(|_| bs.len())
.map_err(|err| PyIOError::new_err(err.to_string()))
}
/// Change the stream position to the given byte offset.
/// Offset is interpreted relative to the position indicated by `whence`.
/// The default value for whence is `SEEK_SET`. Values for `whence` are:
///
/// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive
/// * `SEEK_CUR` or `1` – current stream position; offset may be negative
/// * `SEEK_END` or `2` – end of the stream; offset is usually negative
///
/// Return the new absolute position.
#[pyo3(signature = (pos, whence = 0))]
pub fn seek(&mut self, pos: i64, whence: u8) -> PyResult<u64> {
if !self.seekable()? {
return Err(PyIOError::new_err(
"Seek operation is not supported by the backing service.",
));
}
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let whence = match whence {
0 => SeekFrom::Start(pos as u64),
1 => SeekFrom::Current(pos),
2 => SeekFrom::End(pos),
_ => return Err(PyValueError::new_err("invalid whence")),
};
reader
.seek(whence)
.map_err(|err| PyIOError::new_err(err.to_string()))
}
/// Return the current stream position.
pub fn tell(&mut self) -> PyResult<u64> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
reader
.stream_position()
.map_err(|err| PyIOError::new_err(err.to_string()))
}
fn close(&mut self) -> PyResult<()> {
if let FileState::Writer(w) = &mut self.0 {
w.close().map_err(format_pyerr_from_io_error)?;
};
self.0 = FileState::Closed;
Ok(())
}
pub fn __enter__(slf: Py<Self>) -> Py<Self> {
slf
}
pub fn __exit__(
&mut self,
_exc_type: PyObject,
_exc_value: PyObject,
_traceback: PyObject,
) -> PyResult<()> {
self.close()
}
/// Flush the underlying writer. Is a no-op if the file is opened in reading mode.
pub fn flush(&mut self) -> PyResult<()> {
if matches!(self.0, FileState::Reader(_)) {
Ok(())
} else if let FileState::Writer(w) = &mut self.0 {
match w.flush() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
} else {
Ok(())
}
}
/// Return True if the stream can be read from.
pub fn readable(&self) -> PyResult<bool> {
Ok(matches!(self.0, FileState::Reader(_)))
}
/// Return True if the stream can be written to.
pub fn writable(&self) -> PyResult<bool> {
Ok(matches!(self.0, FileState::Writer(_)))
}
/// Return True if the stream can be repositioned.
///
/// In OpenDAL this is limited to only *readable* streams.
pub fn seekable(&self) -> PyResult<bool> {
match &self.0 {
FileState::Reader(_) => Ok(true),
_ => Ok(false),
}
}
/// Return True if the stream is closed.
#[getter]
pub fn closed(&self) -> PyResult<bool> {
Ok(matches!(self.0, FileState::Closed))
}
}
/// A file-like async reader.
/// Can be used as an async context manager.
#[pyclass(module = "opendal")]
pub struct AsyncFile(Arc<Mutex<AsyncFileState>>);
enum AsyncFileState {
Reader(ocore::FuturesAsyncReader),
Writer(ocore::FuturesAsyncWriter),
Closed,
}
impl AsyncFile {
pub fn new_reader(reader: ocore::FuturesAsyncReader) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))))
}
pub fn new_writer(writer: ocore::FuturesAsyncWriter) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))))
}
}
#[pymethods]
impl AsyncFile {
/// Read and return at most size bytes, or if size is not given, until EOF.
#[pyo3(signature = (size=None))]
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let mut guard = state.lock().await;
let reader = match guard.deref_mut() {
AsyncFileState::Reader(r) => r,
AsyncFileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let buffer = match size {
Some(size) => {
// TODO: optimize here by using uninit slice.
let mut bs = vec![0; size];
let n = reader
.read(&mut bs)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
bs.truncate(n);
bs
}
None => {
let mut buffer = Vec::new();
reader
.read_to_end(&mut buffer)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
buffer
}
};
Python::with_gil(|py| Buffer::new(buffer).into_bytes(py))
})
}
/// Write bytes into the file.
pub fn write<'p>(&'p mut self, py: Python<'p>, bs: &'p [u8]) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
// FIXME: can we avoid this clone?
let bs = bs.to_vec();
future_into_py(py, async move {
let mut guard = state.lock().await;
let writer = match guard.deref_mut() {
AsyncFileState::Reader(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
));
}
AsyncFileState::Writer(w) => w,
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let len = bs.len();
writer
.write_all(&bs)
.await
.map(|_| len)
.map_err(|err| PyIOError::new_err(err.to_string()))
})
}
/// Change the stream position to the given byte offset.
/// offset is interpreted relative to the position indicated by `whence`.
/// The default value for whence is `SEEK_SET`. Values for `whence` are:
///
/// * `SEEK_SET` or `0` – start of the stream (the default); offset should be zero or positive
/// * `SEEK_CUR` or `1` – current stream position; offset may be negative
/// * `SEEK_END` or `2` – end of the stream; offset is usually negative
///
/// Return the new absolute position.
#[pyo3(signature = (pos, whence = 0))]
pub fn seek<'p>(
&'p mut self,
py: Python<'p>,
pos: i64,
whence: u8,
) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
let whence = match whence {
0 => SeekFrom::Start(pos as u64),
1 => SeekFrom::Current(pos),
2 => SeekFrom::End(pos),
_ => return Err(PyValueError::new_err("invalid whence")),
};
future_into_py(py, async move {
let mut guard = state.lock().await;
let reader = match guard.deref_mut() {
AsyncFileState::Reader(r) => r,
AsyncFileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let pos = reader
.seek(whence)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
Ok(pos)
})
.and_then(|pos| pos.into_bound_py_any(py))
}
/// Return the current stream position.
pub fn tell<'p>(&'p mut self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let mut guard = state.lock().await;
let reader = match guard.deref_mut() {
AsyncFileState::Reader(r) => r,
AsyncFileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
}
};
let pos = reader
.stream_position()
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
Ok(pos)
})
.and_then(|pos| pos.into_bound_py_any(py))
}
fn close<'p>(&'p mut self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let mut state = state.lock().await;
if let AsyncFileState::Writer(w) = &mut *state {
w.close().await.map_err(format_pyerr_from_io_error)?;
}
*state = AsyncFileState::Closed;
Ok(())
})
}
fn __aenter__<'a>(slf: PyRef<'a, Self>, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
let slf = slf.into_py_any(py)?;
future_into_py(py, async move { Ok(slf) })
}
fn __aexit__<'a>(
&'a mut self,
py: Python<'a>,
_exc_type: &Bound<'a, PyAny>,
_exc_value: &Bound<'a, PyAny>,
_traceback: &Bound<'a, PyAny>,
) -> PyResult<Bound<'a, PyAny>> {
self.close(py)
}
/// Check if the stream may be read from.
pub fn readable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Reader(_)))
})
}
/// Check if the stream may be written to.
pub fn writable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Writer(_)))
})
}
/// Check if the stream reader may be re-located.
pub fn seekable<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
if true {
self.readable(py)
} else {
future_into_py(py, async move { Ok(false) })
}
}
/// Check if the stream is closed.
#[getter]
pub fn closed<'p>(&'p self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
let state = self.0.clone();
future_into_py(py, async move {
let state = state.lock().await;
Ok(matches!(*state, AsyncFileState::Closed))
})
}
}