blob: efce4d601b55aab5e53a05b9c2e02a96b2a6f426 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use futures::Stream;
use object_store::ObjectMeta;
use opendal::Metadata;
use std::future::IntoFuture;
use crate::timestamp_to_datetime;
/// Conditionally add the `Send` marker trait for the wrapped type.
/// Only take effect when the `send_wrapper` feature is enabled.
#[cfg(not(feature = "send_wrapper"))]
use noop_wrapper::NoopWrapper as SendWrapper;
#[cfg(feature = "send_wrapper")]
use send_wrapper::SendWrapper;
/// Format `opendal::Error` to `object_store::Error`.
pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
use opendal::ErrorKind;
match err.kind() {
ErrorKind::NotFound => object_store::Error::NotFound {
path: path.to_string(),
source: Box::new(err),
},
ErrorKind::Unsupported => object_store::Error::NotSupported {
source: Box::new(err),
},
ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
path: path.to_string(),
source: Box::new(err),
},
ErrorKind::ConditionNotMatch => object_store::Error::Precondition {
path: path.to_string(),
source: Box::new(err),
},
kind => object_store::Error::Generic {
store: kind.into_static(),
source: Box::new(err),
},
}
}
/// Format `opendal::Metadata` to `object_store::ObjectMeta`.
pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
ObjectMeta {
location: path.into(),
last_modified: meta
.last_modified()
.and_then(timestamp_to_datetime)
.unwrap_or_default(),
size: meta.content_length(),
e_tag: meta.etag().map(|x| x.to_string()),
version: meta.version().map(|x| x.to_string()),
}
}
/// Make given future `Send`.
pub trait IntoSendFuture {
type Output;
fn into_send(self) -> Self::Output;
}
impl<T> IntoSendFuture for T
where
T: IntoFuture,
{
type Output = SendWrapper<T::IntoFuture>;
fn into_send(self) -> Self::Output {
SendWrapper::new(self.into_future())
}
}
/// Make given Stream `Send`.
pub trait IntoSendStream {
type Output;
fn into_send(self) -> Self::Output;
}
impl<T> IntoSendStream for T
where
T: Stream,
{
type Output = SendWrapper<T>;
fn into_send(self) -> Self::Output {
SendWrapper::new(self)
}
}
#[cfg(not(feature = "send_wrapper"))]
mod noop_wrapper {
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use futures::Future;
use futures::Stream;
use pin_project::pin_project;
#[pin_project]
pub struct NoopWrapper<T> {
#[pin]
item: T,
}
impl<T> Future for NoopWrapper<T>
where
T: Future,
{
type Output = T::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.item.poll(cx)
}
}
impl<T> Stream for NoopWrapper<T>
where
T: Stream,
{
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.item.poll_next(cx)
}
}
impl<T> NoopWrapper<T> {
pub fn new(item: T) -> Self {
Self { item }
}
}
}