| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An object store implementation for generic HTTP servers |
| //! |
| //! This follows [rfc2518], commonly known as [WebDAV] |
| //! |
| //! Basic get support will work out of the box with most HTTP servers, |
| //! even those that don't explicitly support [rfc2518] |
| //! |
| //! Other operations such as list, delete, copy, etc... will likely |
| //! require server-side configuration. A list of HTTP servers with support |
| //! can be found [here](https://wiki.archlinux.org/title/WebDAV#Server) |
| //! |
| //! Multipart uploads are not currently supported |
| //! |
| //! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518 |
| //! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV |
| |
| use async_trait::async_trait; |
| use futures::stream::BoxStream; |
| use futures::{StreamExt, TryStreamExt}; |
| use itertools::Itertools; |
| use snafu::{OptionExt, ResultExt, Snafu}; |
| use url::Url; |
| |
| use crate::client::get::GetClientExt; |
| use crate::client::header::get_etag; |
| use crate::http::client::Client; |
| use crate::path::Path; |
| use crate::{ |
| ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, |
| ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, RetryConfig, |
| }; |
| |
| mod client; |
| |
| #[derive(Debug, Snafu)] |
| enum Error { |
| #[snafu(display("Must specify a URL"))] |
| MissingUrl, |
| |
| #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))] |
| UnableToParseUrl { |
| source: url::ParseError, |
| url: String, |
| }, |
| |
| #[snafu(display("Unable to extract metadata from headers: {}", source))] |
| Metadata { |
| source: crate::client::header::Error, |
| }, |
| |
| #[snafu(display("Request error: {}", source))] |
| Reqwest { source: reqwest::Error }, |
| } |
| |
| impl From<Error> for crate::Error { |
| fn from(err: Error) -> Self { |
| Self::Generic { |
| store: "HTTP", |
| source: Box::new(err), |
| } |
| } |
| } |
| |
| /// An [`ObjectStore`] implementation for generic HTTP servers |
| /// |
| /// See [`crate::http`] for more information |
| #[derive(Debug)] |
| pub struct HttpStore { |
| client: Client, |
| } |
| |
| impl std::fmt::Display for HttpStore { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "HttpStore") |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for HttpStore { |
| async fn put_opts( |
| &self, |
| location: &Path, |
| payload: PutPayload, |
| opts: PutOptions, |
| ) -> Result<PutResult> { |
| if opts.mode != PutMode::Overwrite { |
| // TODO: Add support for If header - https://datatracker.ietf.org/doc/html/rfc2518#section-9.4 |
| return Err(crate::Error::NotImplemented); |
| } |
| |
| let response = self.client.put(location, payload).await?; |
| let e_tag = match get_etag(response.headers()) { |
| Ok(e_tag) => Some(e_tag), |
| Err(crate::client::header::Error::MissingEtag) => None, |
| Err(source) => return Err(Error::Metadata { source }.into()), |
| }; |
| |
| Ok(PutResult { |
| e_tag, |
| version: None, |
| }) |
| } |
| |
| async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn MultipartUpload>> { |
| Err(crate::Error::NotImplemented) |
| } |
| |
| async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { |
| self.client.get_opts(location, options).await |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.client.delete(location).await |
| } |
| |
| fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> { |
| let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default(); |
| let prefix = prefix.cloned(); |
| futures::stream::once(async move { |
| let status = self.client.list(prefix.as_ref(), "infinity").await?; |
| |
| let iter = status |
| .response |
| .into_iter() |
| .filter(|r| !r.is_dir()) |
| .map(|response| { |
| response.check_ok()?; |
| response.object_meta(self.client.base_url()) |
| }) |
| // Filter out exact prefix matches |
| .filter_ok(move |r| r.location.as_ref().len() > prefix_len); |
| |
| Ok::<_, crate::Error>(futures::stream::iter(iter)) |
| }) |
| .try_flatten() |
| .boxed() |
| } |
| |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| let status = self.client.list(prefix, "1").await?; |
| let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or(0); |
| |
| let mut objects: Vec<ObjectMeta> = Vec::with_capacity(status.response.len()); |
| let mut common_prefixes = Vec::with_capacity(status.response.len()); |
| for response in status.response { |
| response.check_ok()?; |
| match response.is_dir() { |
| false => { |
| let meta = response.object_meta(self.client.base_url())?; |
| // Filter out exact prefix matches |
| if meta.location.as_ref().len() > prefix_len { |
| objects.push(meta); |
| } |
| } |
| true => { |
| let path = response.path(self.client.base_url())?; |
| // Exclude the current object |
| if path.as_ref().len() > prefix_len { |
| common_prefixes.push(path); |
| } |
| } |
| } |
| } |
| |
| Ok(ListResult { |
| common_prefixes, |
| objects, |
| }) |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| self.client.copy(from, to, true).await |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| self.client.copy(from, to, false).await |
| } |
| } |
| |
| /// Configure a connection to a generic HTTP server |
| #[derive(Debug, Default, Clone)] |
| pub struct HttpBuilder { |
| url: Option<String>, |
| client_options: ClientOptions, |
| retry_config: RetryConfig, |
| } |
| |
| impl HttpBuilder { |
| /// Create a new [`HttpBuilder`] with default values. |
| pub fn new() -> Self { |
| Default::default() |
| } |
| |
| /// Set the URL |
| pub fn with_url(mut self, url: impl Into<String>) -> Self { |
| self.url = Some(url.into()); |
| self |
| } |
| |
| /// Set the retry configuration |
| pub fn with_retry(mut self, retry_config: RetryConfig) -> Self { |
| self.retry_config = retry_config; |
| self |
| } |
| |
| /// Set individual client configuration without overriding the entire config |
| pub fn with_config(mut self, key: ClientConfigKey, value: impl Into<String>) -> Self { |
| self.client_options = self.client_options.with_config(key, value); |
| self |
| } |
| |
| /// Sets the client options, overriding any already set |
| pub fn with_client_options(mut self, options: ClientOptions) -> Self { |
| self.client_options = options; |
| self |
| } |
| |
| /// Build an [`HttpStore`] with the configured options |
| pub fn build(self) -> Result<HttpStore> { |
| let url = self.url.context(MissingUrlSnafu)?; |
| let parsed = Url::parse(&url).context(UnableToParseUrlSnafu { url })?; |
| |
| Ok(HttpStore { |
| client: Client::new(parsed, self.client_options, self.retry_config)?, |
| }) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use crate::tests::*;

    use super::*;

    /// Runs the shared object-store test helpers against the server named by
    /// the `HTTP_URL` environment variable.
    #[tokio::test]
    async fn http_test() {
        crate::test_util::maybe_skip_integration!();

        let endpoint = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
        let store = HttpBuilder::new()
            .with_url(endpoint)
            .with_client_options(ClientOptions::new().with_allow_http(true))
            .build()
            .unwrap();

        put_get_delete_list_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
    }
}