blob: 1f21cf882596b547441d925452523ddfb32e0a6c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::{Debug, Formatter};
use std::io::SeekFrom;
use bytes::{Buf, Bytes, BytesMut};
use dav_server::fs::{DavFile, OpenOptions};
use dav_server::fs::{DavMetaData, FsResult};
use dav_server::fs::{FsError, FsFuture};
use futures::FutureExt;
use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use opendal::{FuturesAsyncReader, FuturesAsyncWriter, Operator};
use super::metadata::OpendalMetaData;
use super::utils::*;
/// OpendalFile is a `DavFile` implementation for opendal.
pub struct OpendalFile {
op: Operator,
path: String,
state: State,
buf: BytesMut,
}
impl Debug for OpendalFile {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpendalFile")
.field("path", &self.path)
.field(
"state",
match &self.state {
State::Read { .. } => &"read",
State::Write(_) => &"write",
},
)
.finish()
}
}
enum State {
Read(FuturesAsyncReader),
Write(FuturesAsyncWriter),
}
impl OpendalFile {
/// Create a new opendal file.
pub async fn open(op: Operator, path: String, options: OpenOptions) -> FsResult<Self> {
let state = if options.read {
let r = op
.reader(&path)
.await
.map_err(convert_error)?
.into_futures_async_read(..)
.await
.map_err(convert_error)?;
State::Read(r)
} else if options.write {
let w = op
.writer_with(&path)
.append(options.append)
.await
.map_err(convert_error)?
.into_futures_async_write();
State::Write(w)
} else {
return Err(FsError::NotImplemented);
};
Ok(Self {
op,
path,
state,
buf: BytesMut::new(),
})
}
}
impl DavFile for OpendalFile {
fn metadata(&mut self) -> FsFuture<'_, Box<dyn DavMetaData>> {
async move {
self.op
.stat(&self.path)
.await
.map(|opendal_metadata| {
Box::new(OpendalMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>
})
.map_err(convert_error)
}
.boxed()
}
fn write_buf(&mut self, mut buf: Box<dyn Buf + Send>) -> FsFuture<'_, ()> {
async move {
let State::Write(w) = &mut self.state else {
return Err(FsError::GeneralFailure);
};
w.write_all(&buf.copy_to_bytes(buf.remaining()))
.await
.map_err(|_| FsError::GeneralFailure)?;
Ok(())
}
.boxed()
}
fn write_bytes(&mut self, buf: Bytes) -> FsFuture<'_, ()> {
async move {
let State::Write(w) = &mut self.state else {
return Err(FsError::GeneralFailure);
};
w.write_all(&buf).await.map_err(|_| FsError::GeneralFailure)
}
.boxed()
}
fn read_bytes(&mut self, count: usize) -> FsFuture<'_, Bytes> {
async move {
let State::Read(r) = &mut self.state else {
return Err(FsError::GeneralFailure);
};
self.buf.resize(count, 0);
let len = r
.read(&mut self.buf)
.await
.map_err(|_| FsError::GeneralFailure)?;
Ok(self.buf.split_to(len).freeze())
}
.boxed()
}
fn seek(&mut self, pos: SeekFrom) -> FsFuture<'_, u64> {
async move {
let State::Read(r) = &mut self.state else {
return Err(FsError::GeneralFailure);
};
r.seek(pos).await.map_err(|_| FsError::GeneralFailure)
}
.boxed()
}
fn flush(&mut self) -> FsFuture<'_, ()> {
async move {
let State::Write(w) = &mut self.state else {
return Err(FsError::GeneralFailure);
};
w.flush().await.map_err(|_| FsError::GeneralFailure)?;
w.close().await.map_err(|_| FsError::GeneralFailure)
}
.boxed()
}
}