// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! An object store implementation for Amazon S3, built on the rusoto crates.
use crate::util::format_http_range;
use crate::{
collect_bytes,
path::{Path, DELIMITER},
util::format_prefix,
GetResult, ListResult, ObjectMeta, ObjectStore, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{
stream::{self, BoxStream},
Future, Stream, StreamExt, TryStreamExt,
};
use hyper::client::Builder as HyperBuilder;
use rusoto_core::ByteStream;
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
use rusoto_s3::S3;
use rusoto_sts::WebIdentityProvider;
use snafu::{OptionExt, ResultExt, Snafu};
use std::ops::Range;
use std::{
convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration,
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, warn};
/// The maximum number of times a request will be retried in the case of an AWS server error
pub const MAX_NUM_RETRIES: u32 = 3;
/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display(
"Expected streamed data to have length {}, got {}",
expected,
actual
))]
DataDoesNotMatchLength { expected: usize, actual: usize },
#[snafu(display(
"Did not receive any data. Bucket: {}, Location: {}",
bucket,
path
))]
NoData { bucket: String, path: String },
#[snafu(display(
"Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToDeleteData {
source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToGetData {
source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToHeadData {
source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToGetPieceOfData {
source: std::io::Error,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
bucket,
path,
source,
source,
))]
UnableToPutData {
source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
bucket: String,
path: String,
},
#[snafu(display(
"Unable to list data. Bucket: {}, Error: {} ({:?})",
bucket,
source,
source,
))]
UnableToListData {
source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
bucket: String,
},
#[snafu(display(
"Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}",
bucket,
from,
to,
source,
))]
UnableToCopyObject {
source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>,
bucket: String,
from: String,
to: String,
},
#[snafu(display(
"Unable to parse last modified date. Bucket: {}, Error: {} ({:?})",
bucket,
source,
source,
))]
UnableToParseLastModified {
source: chrono::ParseError,
bucket: String,
},
#[snafu(display(
"Unable to buffer data into temporary file, Error: {} ({:?})",
source,
source,
))]
UnableToBufferStream { source: std::io::Error },
#[snafu(display(
"Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})",
region,
source,
source,
))]
InvalidRegion {
region: String,
source: rusoto_core::region::ParseRegionError,
},
#[snafu(display("Missing aws-access-key"))]
MissingAccessKey,
#[snafu(display("Missing aws-secret-access-key"))]
MissingSecretAccessKey,
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
}
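// Map this store's errors into the crate-wide error type, preserving NotFound so
// callers can match on missing objects.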
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NotFound { path, source } => Self::NotFound { path, source },
_ => Self::Generic {
store: "S3",
source: Box::new(source),
},
}
}
}
/// An [`ObjectStore`] implementation for [Amazon S3](https://aws.amazon.com/s3/),
/// constructed via [`new_s3`].
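///
/// A usage sketch (fenced as `ignore`, so it is not compiled as a doctest); it
/// assumes this module is exposed as `object_store::aws` and that `store` was
/// built with [`new_s3`] against a real bucket:
///
/// ```ignore
/// use object_store::{path::Path, ObjectStore};
///
/// async fn roundtrip(store: &object_store::aws::AmazonS3) -> object_store::Result<()> {
///     let path = Path::from("data/example.txt");
///     // PUT the object, read its metadata back with HEAD, then remove it.
///     store.put(&path, bytes::Bytes::from_static(b"hello")).await?;
///     let meta = store.head(&path).await?;
///     assert_eq!(meta.size, 5);
///     store.delete(&path).await?;
///     Ok(())
/// }
/// ```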
pub struct AmazonS3 {
/// S3 client w/o any connection limit.
///
/// You should normally use [`Self::client`] instead.
client_unrestricted: rusoto_s3::S3Client,
/// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
connection_semaphore: Arc<Semaphore>,
/// Bucket name used by this object store client.
bucket_name: String,
}
impl fmt::Debug for AmazonS3 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AmazonS3")
.field("client", &"rusoto_s3::S3Client")
.field("bucket_name", &self.bucket_name)
.finish()
}
}
impl fmt::Display for AmazonS3 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AmazonS3({})", self.bucket_name)
}
}
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let bucket_name = self.bucket_name.clone();
let request_factory = move || {
let bytes = bytes.clone();
let length = bytes.len();
let stream_data = Ok(bytes);
let stream = futures::stream::once(async move { stream_data });
let byte_stream = ByteStream::new_with_size(stream, length);
rusoto_s3::PutObjectRequest {
bucket: bucket_name.clone(),
key: location.to_string(),
body: Some(byte_stream),
..Default::default()
}
};
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
async move { s3.put_object(request_factory()).await }
})
.await
.context(UnableToPutDataSnafu {
bucket: &self.bucket_name,
path: location.as_ref(),
})?;
Ok(())
}
async fn get(&self, location: &Path) -> Result<GetResult> {
Ok(GetResult::Stream(
self.get_object(location, None).await?.boxed(),
))
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let size_hint = range.end - range.start;
let stream = self.get_object(location, Some(range)).await?;
collect_bytes(stream, Some(size_hint)).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let key = location.to_string();
let head_request = rusoto_s3::HeadObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
..Default::default()
};
let s = self
.client()
.await
.head_object(head_request)
.await
.map_err(|e| match e {
rusoto_core::RusotoError::Service(
rusoto_s3::HeadObjectError::NoSuchKey(_),
) => Error::NotFound {
path: key.clone(),
source: e.into(),
},
rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => {
Error::NotFound {
path: key.clone(),
source: "resource not found".into(),
}
}
_ => Error::UnableToHeadData {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
source: e,
},
})?;
// Note: GetObject and HeadObject return a different date format from ListObjects
//
// S3 List returns timestamps in the form
// <LastModified>2013-09-17T18:07:53.000Z</LastModified>
// S3 GetObject returns timestamps in the form
// Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT
let last_modified = match s.last_modified {
Some(lm) => DateTime::parse_from_rfc2822(&lm)
.context(UnableToParseLastModifiedSnafu {
bucket: &self.bucket_name,
})?
.with_timezone(&Utc),
None => Utc::now(),
};
Ok(ObjectMeta {
last_modified,
location: location.clone(),
size: usize::try_from(s.content_length.unwrap_or(0))
.expect("unsupported size on this platform"),
})
}
async fn delete(&self, location: &Path) -> Result<()> {
let bucket_name = self.bucket_name.clone();
let request_factory = move || rusoto_s3::DeleteObjectRequest {
bucket: bucket_name.clone(),
key: location.to_string(),
..Default::default()
};
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
async move { s3.delete_object(request_factory()).await }
})
.await
.context(UnableToDeleteDataSnafu {
bucket: &self.bucket_name,
path: location.as_ref(),
})?;
Ok(())
}
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
Ok(self
.list_objects_v2(prefix, None)
.await?
.map_ok(move |list_objects_v2_result| {
let contents = list_objects_v2_result.contents.unwrap_or_default();
let iter = contents
.into_iter()
.map(|object| convert_object_meta(object, &self.bucket_name));
futures::stream::iter(iter)
})
.try_flatten()
.boxed())
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Ok(self
.list_objects_v2(prefix, Some(DELIMITER.to_string()))
.await?
.try_fold(
ListResult {
common_prefixes: vec![],
objects: vec![],
},
|acc, list_objects_v2_result| async move {
let mut res = acc;
let contents = list_objects_v2_result.contents.unwrap_or_default();
let mut objects = contents
.into_iter()
.map(|object| convert_object_meta(object, &self.bucket_name))
.collect::<Result<Vec<_>>>()?;
res.objects.append(&mut objects);
let prefixes =
list_objects_v2_result.common_prefixes.unwrap_or_default();
res.common_prefixes.reserve(prefixes.len());
for p in prefixes {
let prefix =
p.prefix.expect("can't have a prefix without a value");
res.common_prefixes.push(Path::parse(prefix)?);
}
Ok(res)
},
)
.await?)
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = from.as_ref();
let to = to.as_ref();
let bucket_name = self.bucket_name.clone();
let request_factory = move || rusoto_s3::CopyObjectRequest {
bucket: bucket_name.clone(),
copy_source: format!("{}/{}", &bucket_name, from),
key: to.to_string(),
..Default::default()
};
let s3 = self.client().await;
s3_request(move || {
let (s3, request_factory) = (s3.clone(), request_factory.clone());
async move { s3.copy_object(request_factory()).await }
})
.await
.context(UnableToCopyObjectSnafu {
bucket: &self.bucket_name,
from,
to,
})?;
Ok(())
}
async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> {
// Will need dynamodb_lock, since S3 itself offers no atomic "put if absent"
Err(crate::Error::NotImplemented)
}
}
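/// Convert a [`rusoto_s3::Object`] into this crate's [`ObjectMeta`], parsing the
/// RFC 3339 `last_modified` timestamp that S3 List responses use.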
fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result<ObjectMeta> {
let key = object.key.expect("object doesn't exist without a key");
let location = Path::parse(key)?;
let last_modified = match object.last_modified {
Some(lm) => DateTime::parse_from_rfc3339(&lm)
.context(UnableToParseLastModifiedSnafu { bucket })?
.with_timezone(&Utc),
None => Utc::now(),
};
let size = usize::try_from(object.size.unwrap_or(0))
.expect("unsupported size on this platform");
Ok(ObjectMeta {
location,
last_modified,
size,
})
}
/// Configure a connection to Amazon S3, using the specified credentials, region,
/// and bucket.
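///
/// A construction sketch (fenced as `ignore`, so it is not compiled as a doctest);
/// the credentials, region, and bucket below are placeholders:
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// let s3 = new_s3(
///     Some("access-key-id"),          // static credentials; pass None to fall
///     Some("secret-access-key"),      // back to web identity / instance metadata
///     "us-east-2",
///     "my-bucket",
///     None as Option<&str>,           // no custom endpoint
///     None as Option<&str>,           // no session token
///     NonZeroUsize::new(16).unwrap(), // connection limit
///     false,                          // HTTPS only
/// ).expect("valid S3 config");
/// ```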
#[allow(clippy::too_many_arguments)]
pub fn new_s3(
access_key_id: Option<impl Into<String>>,
secret_access_key: Option<impl Into<String>>,
region: impl Into<String>,
bucket_name: impl Into<String>,
endpoint: Option<impl Into<String>>,
session_token: Option<impl Into<String>>,
max_connections: NonZeroUsize,
allow_http: bool,
) -> Result<AmazonS3> {
let region = region.into();
let region: rusoto_core::Region = match endpoint {
None => region.parse().context(InvalidRegionSnafu { region })?,
Some(endpoint) => rusoto_core::Region::Custom {
name: region,
endpoint: endpoint.into(),
},
};
let mut builder = HyperBuilder::default();
builder.pool_max_idle_per_host(max_connections.get());
let connector = if allow_http {
hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build()
} else {
hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_only()
.enable_http1()
.enable_http2()
.build()
};
let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector);
let client = match (access_key_id, secret_access_key, session_token) {
(Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
let credentials_provider = StaticProvider::new(
access_key_id.into(),
secret_access_key.into(),
Some(session_token.into()),
None,
);
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(Some(access_key_id), Some(secret_access_key), None) => {
let credentials_provider = StaticProvider::new_minimal(
access_key_id.into(),
secret_access_key.into(),
);
rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
}
(None, Some(_), _) => return Err(Error::MissingAccessKey.into()),
(Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
_ if std::env::var_os("AWS_WEB_IDENTITY_TOKEN_FILE").is_some() => {
rusoto_s3::S3Client::new_with(
http_client,
WebIdentityProvider::from_k8s_env(),
region,
)
}
_ => rusoto_s3::S3Client::new_with(
http_client,
InstanceMetadataProvider::new(),
region,
),
};
Ok(AmazonS3 {
client_unrestricted: client,
connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
bucket_name: bucket_name.into(),
})
}
/// Create a new [`AmazonS3`] that always errors
pub fn new_failing_s3() -> Result<AmazonS3> {
new_s3(
Some("foo"),
Some("bar"),
"us-east-1",
"bucket",
None as Option<&str>,
None as Option<&str>,
NonZeroUsize::new(16).unwrap(),
true,
)
}
/// S3 client bundled w/ a semaphore permit.
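///
/// Holding the permit ties up one slot of the connection limit: the permit is
/// returned to the [`Semaphore`] only once this client and every clone of it
/// have been dropped.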
#[derive(Clone)]
struct SemaphoreClient {
/// Permit for this specific use of the client.
///
/// Note that this field is never read and therefore considered "dead code" by rustc.
#[allow(dead_code)]
permit: Arc<OwnedSemaphorePermit>,
inner: rusoto_s3::S3Client,
}
impl Deref for SemaphoreClient {
type Target = rusoto_s3::S3Client;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl AmazonS3 {
/// Get a client according to the current connection limit.
async fn client(&self) -> SemaphoreClient {
let permit = Arc::clone(&self.connection_semaphore)
.acquire_owned()
.await
.expect("semaphore shouldn't be closed yet");
SemaphoreClient {
permit: Arc::new(permit),
inner: self.client_unrestricted.clone(),
}
}
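/// Issue a GetObject request, optionally restricted to a byte `range`, and
/// return the response body as a stream of [`Bytes`].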
async fn get_object(
&self,
location: &Path,
range: Option<Range<usize>>,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let key = location.to_string();
let get_request = rusoto_s3::GetObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
range: range.map(format_http_range),
..Default::default()
};
let bucket_name = self.bucket_name.clone();
let stream = self
.client()
.await
.get_object(get_request)
.await
.map_err(|e| match e {
rusoto_core::RusotoError::Service(
rusoto_s3::GetObjectError::NoSuchKey(_),
) => Error::NotFound {
path: key.clone(),
source: e.into(),
},
_ => Error::UnableToGetData {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
source: e,
},
})?
.body
.context(NoDataSnafu {
bucket: self.bucket_name.to_owned(),
path: key.clone(),
})?
.map_err(move |source| Error::UnableToGetPieceOfData {
source,
bucket: bucket_name.clone(),
path: key.clone(),
})
.err_into();
Ok(stream)
}
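/// Issue ListObjectsV2 requests, transparently following continuation tokens,
/// and yield each page of results as a stream item.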
async fn list_objects_v2(
&self,
prefix: Option<&Path>,
delimiter: Option<String>,
) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> {
// State machine tracking pagination through ListObjectsV2 responses
enum ListState {
// No request issued yet
Start,
// Last response was truncated; holds the continuation token for the next page
HasMore(String),
// Last response was complete
Done,
}
let prefix = format_prefix(prefix);
let bucket = self.bucket_name.clone();
let request_factory = move || rusoto_s3::ListObjectsV2Request {
bucket,
prefix,
delimiter,
..Default::default()
};
let s3 = self.client().await;
Ok(stream::unfold(ListState::Start, move |state| {
let request_factory = request_factory.clone();
let s3 = s3.clone();
async move {
let continuation_token = match &state {
ListState::HasMore(continuation_token) => Some(continuation_token),
ListState::Done => {
return None;
}
// If this is the first request we've made, we don't need to make any
// modifications to the request
ListState::Start => None,
};
let resp = s3_request(move || {
let (s3, request_factory, continuation_token) = (
s3.clone(),
request_factory.clone(),
continuation_token.cloned(),
);
async move {
s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
continuation_token,
..request_factory()
})
.await
}
})
.await;
let resp = match resp {
Ok(resp) => resp,
Err(e) => return Some((Err(e), state)),
};
// The AWS response contains a field named `is_truncated` as well as
// `next_continuation_token`, and we're assuming that `next_continuation_token`
// is only set when `is_truncated` is true (and therefore not
// checking `is_truncated`).
let next_state = if let Some(next_continuation_token) =
&resp.next_continuation_token
{
ListState::HasMore(next_continuation_token.to_string())
} else {
ListState::Done
};
Some((Ok(resp), next_state))
}
})
.map_err(move |e| {
Error::UnableToListData {
source: e,
bucket: self.bucket_name.clone(),
}
.into()
})
.boxed())
}
}
/// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors.
///
/// The `future_factory` argument is a function `F` that takes no arguments and, when called, will
/// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through
/// `rusoto` and return a `Result` that returns some type `R` on success and some
/// `rusoto_core::RusotoError<E>` on error.
///
/// If the executed `Future` returns success, this function will return that success.
/// If the executed `Future` returns a 5xx server error, this function will wait an amount of
/// time that increases exponentially with the number of times it has retried, get a new `Future` by
/// calling `future_factory` again, and retry the request by `await`ing the `Future` again.
/// If the maximum number of retries is exhausted, this function returns the last
/// encountered error.
///
/// Client errors (4xx) will never be retried by this function.
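///
/// The wait before retry `n` is `2.pow(n) * 50` milliseconds, i.e. 100 ms, 200 ms,
/// and 400 ms for the three retries allowed by [`MAX_NUM_RETRIES`].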
async fn s3_request<E, F, G, R>(
future_factory: F,
) -> Result<R, rusoto_core::RusotoError<E>>
where
E: std::error::Error + Send,
F: Fn() -> G + Send,
G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send,
R: Send,
{
let mut attempts = 0;
loop {
let request = future_factory();
let result = request.await;
match result {
Ok(r) => return Ok(r),
Err(error) => {
attempts += 1;
let should_retry = matches!(
error,
rusoto_core::RusotoError::Unknown(ref response)
if response.status.is_server_error()
);
if attempts > MAX_NUM_RETRIES {
warn!(
?error,
attempts, "maximum number of retries exceeded for AWS S3 request"
);
return Err(error);
} else if !should_retry {
return Err(error);
} else {
debug!(?error, attempts, "retrying AWS S3 request");
let wait_time = Duration::from_millis(2u64.pow(attempts) * 50);
tokio::time::sleep(wait_time).await;
}
}
}
}
}
impl Error {
#[cfg(test)]
fn s3_error_due_to_credentials(&self) -> bool {
use rusoto_core::RusotoError;
use Error::*;
matches!(
self,
UnableToPutData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToGetData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToDeleteData {
source: RusotoError::Credentials(_),
bucket: _,
path: _,
} | UnableToListData {
source: RusotoError::Credentials(_),
bucket: _,
}
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy,
},
Error as ObjectStoreError, ObjectStore,
};
use bytes::Bytes;
use std::env;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = TestError> = std::result::Result<T, E>;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[derive(Debug)]
struct AwsConfig {
access_key_id: String,
secret_access_key: String,
region: String,
bucket: String,
endpoint: Option<String>,
token: Option<String>,
}
// Helper macro to produce an AwsConfig from the environment: skips the test if
// TEST_INTEGRATION is not set, and panics if it is set but a required AWS
// variable is missing.
macro_rules! maybe_skip_integration {
() => {{
dotenv::dotenv().ok();
let required_vars = [
"AWS_DEFAULT_REGION",
"OBJECT_STORE_BUCKET",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
);
} else if force.is_err() {
eprintln!(
"skipping AWS integration test - set {}TEST_INTEGRATION to run",
if unset_var_names.is_empty() {
String::new()
} else {
format!("{} and ", unset_var_names)
}
);
return;
} else {
AwsConfig {
access_key_id: env::var("AWS_ACCESS_KEY_ID")
.expect("already checked AWS_ACCESS_KEY_ID"),
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
.expect("already checked AWS_SECRET_ACCESS_KEY"),
region: env::var("AWS_DEFAULT_REGION")
.expect("already checked AWS_DEFAULT_REGION"),
bucket: env::var("OBJECT_STORE_BUCKET")
.expect("already checked OBJECT_STORE_BUCKET"),
endpoint: env::var("AWS_ENDPOINT").ok(),
token: env::var("AWS_SESSION_TOKEN").ok(),
}
}
}};
}
fn check_credentials<T>(r: Result<T>) -> Result<T> {
if let Err(e) = &r {
let e = &**e;
if let Some(e) = e.downcast_ref::<Error>() {
if e.s3_error_due_to_credentials() {
eprintln!(
"Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \
environment variables"
);
}
}
}
r
}
fn make_integration(config: AwsConfig) -> AmazonS3 {
new_s3(
Some(config.access_key_id),
Some(config.secret_access_key),
config.region,
config.bucket,
config.endpoint,
config.token,
NonZeroUsize::new(16).unwrap(),
true,
)
.expect("Valid S3 config")
}
#[tokio::test]
async fn s3_test() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
check_credentials(put_get_delete_list(&integration).await).unwrap();
check_credentials(list_uses_directories_correctly(&integration).await).unwrap();
check_credentials(list_with_delimiter(&integration).await).unwrap();
check_credentials(rename_and_copy(&integration).await).unwrap();
}
#[tokio::test]
async fn s3_test_get_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let ObjectStoreError::NotFound { path, source } = err {
let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
assert!(
matches!(
source_variant,
Some(rusoto_core::RusotoError::Service(
rusoto_s3::GetObjectError::NoSuchKey(_)
)),
),
"got: {:?}",
source_variant
);
assert_eq!(path, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type: {:?}", err);
}
}
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.get(&location).await.unwrap_err().to_string();
assert!(
err.contains("The specified bucket does not exist"),
"{}",
err
)
}
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = Bytes::from("arbitrary data");
let err = integration
.put(&location, data)
.await
.unwrap_err()
.to_string();
assert!(
err.contains("The specified bucket does not exist")
&& err.contains("Unable to PUT data"),
"{}",
err
)
}
#[tokio::test]
async fn s3_test_delete_nonexistent_location() {
let config = maybe_skip_integration!();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
integration.delete(&location).await.unwrap();
}
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = make_integration(config);
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err().to_string();
assert!(
err.contains("The specified bucket does not exist")
&& err.contains("Unable to DELETE data"),
"{}",
err
)
}
}