blob: 39f68ece65a32247d87f434db74155f0d8e8de76 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
use async_trait::async_trait;
use bytes::Buf;
use chrono::{DateTime, Utc};
use hyper::header::CONTENT_LENGTH;
use percent_encoding::percent_decode_str;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, Response, StatusCode};
use serde::Deserialize;
use snafu::{OptionExt, ResultExt, Snafu};
use url::Url;
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Request error: {}", source))]
Request { source: retry::Error },
#[snafu(display("Request error: {}", source))]
Reqwest { source: reqwest::Error },
#[snafu(display("Range request not supported by {}", href))]
RangeNotSupported { href: String },
#[snafu(display("Error decoding PROPFIND response: {}", source))]
InvalidPropFind { source: quick_xml::de::DeError },
#[snafu(display("Missing content size for {}", href))]
MissingSize { href: String },
#[snafu(display("Error getting properties of \"{}\" got \"{}\"", href, status))]
PropStatus { href: String, status: String },
#[snafu(display("Failed to parse href \"{}\": {}", href, source))]
InvalidHref {
href: String,
source: url::ParseError,
},
#[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
NonUnicode {
path: String,
source: std::str::Utf8Error,
},
#[snafu(display("Encountered invalid path \"{}\": {}", path, source))]
InvalidPath {
path: String,
source: crate::path::Error,
},
}
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
Self::Generic {
store: "HTTP",
source: Box::new(err),
}
}
}
/// Internal client for HttpStore
#[derive(Debug)]
pub struct Client {
url: Url,
client: reqwest::Client,
retry_config: RetryConfig,
client_options: ClientOptions,
}
impl Client {
pub fn new(url: Url, client_options: ClientOptions, retry_config: RetryConfig) -> Result<Self> {
let client = client_options.client()?;
Ok(Self {
url,
retry_config,
client_options,
client,
})
}
pub fn base_url(&self) -> &Url {
&self.url
}
fn path_url(&self, location: &Path) -> Url {
let mut url = self.url.clone();
url.path_segments_mut().unwrap().extend(location.parts());
url
}
/// Create a directory with `path` using MKCOL
async fn make_directory(&self, path: &str) -> Result<(), Error> {
let method = Method::from_bytes(b"MKCOL").unwrap();
let mut url = self.url.clone();
url.path_segments_mut()
.unwrap()
.extend(path.split(DELIMITER));
self.client
.request(method, url)
.send_retry(&self.retry_config)
.await
.context(RequestSnafu)?;
Ok(())
}
/// Recursively create parent directories
async fn create_parent_directories(&self, location: &Path) -> Result<()> {
let mut stack = vec![];
// Walk backwards until a request succeeds
let mut last_prefix = location.as_ref();
while let Some((prefix, _)) = last_prefix.rsplit_once(DELIMITER) {
last_prefix = prefix;
match self.make_directory(prefix).await {
Ok(_) => break,
Err(Error::Request { source })
if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
{
// Need to create parent
stack.push(prefix)
}
Err(e) => return Err(e.into()),
}
}
// Retry the failed requests, which should now succeed
for prefix in stack.into_iter().rev() {
self.make_directory(prefix).await?;
}
Ok(())
}
pub async fn put(&self, location: &Path, payload: PutPayload) -> Result<Response> {
let mut retry = false;
loop {
let url = self.path_url(location);
let mut builder = self.client.put(url);
if let Some(value) = self.client_options.get_content_type(location) {
builder = builder.header(CONTENT_TYPE, value);
}
let resp = builder
.header(CONTENT_LENGTH, payload.content_length())
.retryable(&self.retry_config)
.idempotent(true)
.payload(Some(payload.clone()))
.send()
.await;
match resp {
Ok(response) => return Ok(response),
Err(source) => match source.status() {
// Some implementations return 404 instead of 409
Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
retry = true;
self.create_parent_directories(location).await?
}
_ => return Err(Error::Request { source }.into()),
},
}
}
}
pub async fn list(&self, location: Option<&Path>, depth: &str) -> Result<MultiStatus> {
let url = location
.map(|path| self.path_url(path))
.unwrap_or_else(|| self.url.clone());
let method = Method::from_bytes(b"PROPFIND").unwrap();
let result = self
.client
.request(method, url)
.header("Depth", depth)
.retryable(&self.retry_config)
.idempotent(true)
.send()
.await;
let response = match result {
Ok(result) => result.bytes().await.context(ReqwestSnafu)?,
Err(e) if matches!(e.status(), Some(StatusCode::NOT_FOUND)) => {
return match depth {
"0" => {
let path = location.map(|x| x.as_ref()).unwrap_or("");
Err(crate::Error::NotFound {
path: path.to_string(),
source: Box::new(e),
})
}
_ => {
// If prefix not found, return empty result set
Ok(Default::default())
}
};
}
Err(source) => return Err(Error::Request { source }.into()),
};
let status = quick_xml::de::from_reader(response.reader()).context(InvalidPropFindSnafu)?;
Ok(status)
}
pub async fn delete(&self, path: &Path) -> Result<()> {
let url = self.path_url(path);
self.client
.delete(url)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
Some(StatusCode::NOT_FOUND) => crate::Error::NotFound {
source: Box::new(source),
path: path.to_string(),
},
_ => Error::Request { source }.into(),
})?;
Ok(())
}
pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let mut retry = false;
loop {
let method = Method::from_bytes(b"COPY").unwrap();
let mut builder = self
.client
.request(method, self.path_url(from))
.header("Destination", self.path_url(to).as_str());
if !overwrite {
// While the Overwrite header appears to duplicate
// the functionality of the If-Match: * header of HTTP/1.1, If-Match
// applies only to the Request-URI, and not to the Destination of a COPY
// or MOVE.
builder = builder.header("Overwrite", "F");
}
return match builder.send_retry(&self.retry_config).await {
Ok(_) => Ok(()),
Err(source) => Err(match source.status() {
Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
crate::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(source),
}
}
// Some implementations return 404 instead of 409
Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
retry = true;
self.create_parent_directories(to).await?;
continue;
}
_ => Error::Request { source }.into(),
}),
};
}
}
}
#[async_trait]
impl GetClient for Client {
const STORE: &'static str = "HTTP";
/// Override the [`HeaderConfig`] to be less strict to support a
/// broader range of HTTP servers (#4831)
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: None,
};
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let url = self.path_url(path);
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
};
let has_range = options.range.is_some();
let builder = self.client.request(method, url);
let res = builder
.with_get_options(options)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: path.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;
// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if has_range && res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: path.to_string(),
}),
});
}
Ok(res)
}
}
/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
pub response: Vec<MultiStatusResponse>,
}
#[derive(Deserialize)]
pub struct MultiStatusResponse {
href: String,
#[serde(rename = "propstat")]
prop_stat: PropStat,
}
impl MultiStatusResponse {
/// Returns an error if this response is not OK
pub fn check_ok(&self) -> Result<()> {
match self.prop_stat.status.contains("200 OK") {
true => Ok(()),
false => Err(Error::PropStatus {
href: self.href.clone(),
status: self.prop_stat.status.clone(),
}
.into()),
}
}
/// Returns the resolved path of this element relative to `base_url`
pub fn path(&self, base_url: &Url) -> Result<Path> {
let url = Url::options()
.base_url(Some(base_url))
.parse(&self.href)
.context(InvalidHrefSnafu { href: &self.href })?;
// Reverse any percent encoding
let path = percent_decode_str(url.path())
.decode_utf8()
.context(NonUnicodeSnafu { path: url.path() })?;
Ok(Path::parse(path.as_ref()).context(InvalidPathSnafu { path })?)
}
fn size(&self) -> Result<usize> {
let size = self
.prop_stat
.prop
.content_length
.context(MissingSizeSnafu { href: &self.href })?;
Ok(size)
}
/// Returns this objects metadata as [`ObjectMeta`]
pub fn object_meta(&self, base_url: &Url) -> Result<ObjectMeta> {
let last_modified = self.prop_stat.prop.last_modified;
Ok(ObjectMeta {
location: self.path(base_url)?,
last_modified,
size: self.size()?,
e_tag: self.prop_stat.prop.e_tag.clone(),
version: None,
})
}
/// Returns true if this is a directory / collection
pub fn is_dir(&self) -> bool {
self.prop_stat.prop.resource_type.collection.is_some()
}
}
#[derive(Deserialize)]
pub struct PropStat {
prop: Prop,
status: String,
}
#[derive(Deserialize)]
pub struct Prop {
#[serde(deserialize_with = "deserialize_rfc1123", rename = "getlastmodified")]
last_modified: DateTime<Utc>,
#[serde(rename = "getcontentlength")]
content_length: Option<usize>,
#[serde(rename = "resourcetype")]
resource_type: ResourceType,
#[serde(rename = "getetag")]
e_tag: Option<String>,
}
#[derive(Deserialize)]
pub struct ResourceType {
collection: Option<()>,
}