| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An object store implementation for S3 |
| use crate::util::format_http_range; |
| use crate::{ |
| collect_bytes, |
| path::{Path, DELIMITER}, |
| util::format_prefix, |
| GetResult, ListResult, ObjectMeta, ObjectStore, Result, |
| }; |
| use async_trait::async_trait; |
| use bytes::Bytes; |
| use chrono::{DateTime, Utc}; |
| use futures::{ |
| stream::{self, BoxStream}, |
| Future, Stream, StreamExt, TryStreamExt, |
| }; |
| use hyper::client::Builder as HyperBuilder; |
| use rusoto_core::ByteStream; |
| use rusoto_credential::{InstanceMetadataProvider, StaticProvider}; |
| use rusoto_s3::S3; |
| use rusoto_sts::WebIdentityProvider; |
| use snafu::{OptionExt, ResultExt, Snafu}; |
| use std::ops::Range; |
| use std::{ |
| convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration, |
| }; |
| use tokio::sync::{OwnedSemaphorePermit, Semaphore}; |
| use tracing::{debug, warn}; |
| |
| /// The maximum number of times a request will be retried in the case of an AWS server error |
| pub const MAX_NUM_RETRIES: u32 = 3; |
| |
| /// A specialized `Error` for object store-related errors |
| #[derive(Debug, Snafu)] |
| #[allow(missing_docs)] |
| enum Error { |
| #[snafu(display( |
| "Expected streamed data to have length {}, got {}", |
| expected, |
| actual |
| ))] |
| DataDoesNotMatchLength { expected: usize, actual: usize }, |
| |
| #[snafu(display( |
| "Did not receive any data. Bucket: {}, Location: {}", |
| bucket, |
| path |
| ))] |
| NoData { bucket: String, path: String }, |
| |
| #[snafu(display( |
| "Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})", |
| bucket, |
| path, |
| source, |
| source, |
| ))] |
| UnableToDeleteData { |
| source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>, |
| bucket: String, |
| path: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})", |
| bucket, |
| path, |
| source, |
| source, |
| ))] |
| UnableToGetData { |
| source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>, |
| bucket: String, |
| path: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})", |
| bucket, |
| path, |
| source, |
| source, |
| ))] |
| UnableToHeadData { |
| source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>, |
| bucket: String, |
| path: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})", |
| bucket, |
| path, |
| source, |
| source, |
| ))] |
| UnableToGetPieceOfData { |
| source: std::io::Error, |
| bucket: String, |
| path: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})", |
| bucket, |
| path, |
| source, |
| source, |
| ))] |
| UnableToPutData { |
| source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>, |
| bucket: String, |
| path: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to list data. Bucket: {}, Error: {} ({:?})", |
| bucket, |
| source, |
| source, |
| ))] |
| UnableToListData { |
| source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>, |
| bucket: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}", |
| bucket, |
| from, |
| to, |
| source, |
| ))] |
| UnableToCopyObject { |
| source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>, |
| bucket: String, |
| from: String, |
| to: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to parse last modified date. Bucket: {}, Error: {} ({:?})", |
| bucket, |
| source, |
| source, |
| ))] |
| UnableToParseLastModified { |
| source: chrono::ParseError, |
| bucket: String, |
| }, |
| |
| #[snafu(display( |
| "Unable to buffer data into temporary file, Error: {} ({:?})", |
| source, |
| source, |
| ))] |
| UnableToBufferStream { source: std::io::Error }, |
| |
| #[snafu(display( |
| "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})", |
| region, |
| source, |
| source, |
| ))] |
| InvalidRegion { |
| region: String, |
| source: rusoto_core::region::ParseRegionError, |
| }, |
| |
| #[snafu(display("Missing aws-access-key"))] |
| MissingAccessKey, |
| |
| #[snafu(display("Missing aws-secret-access-key"))] |
| MissingSecretAccessKey, |
| |
| NotFound { |
| path: String, |
| source: Box<dyn std::error::Error + Send + Sync + 'static>, |
| }, |
| } |
| |
| impl From<Error> for super::Error { |
| fn from(source: Error) -> Self { |
| match source { |
| Error::NotFound { path, source } => Self::NotFound { path, source }, |
| _ => Self::Generic { |
| store: "S3", |
| source: Box::new(source), |
| }, |
| } |
| } |
| } |
| |
| /// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/). |
| pub struct AmazonS3 { |
| /// S3 client w/o any connection limit. |
| /// |
| /// You should normally use [`Self::client`] instead. |
| client_unrestricted: rusoto_s3::S3Client, |
| |
| /// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted). |
| connection_semaphore: Arc<Semaphore>, |
| |
| /// Bucket name used by this object store client. |
| bucket_name: String, |
| } |
| |
| impl fmt::Debug for AmazonS3 { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("AmazonS3") |
| .field("client", &"rusoto_s3::S3Client") |
| .field("bucket_name", &self.bucket_name) |
| .finish() |
| } |
| } |
| |
| impl fmt::Display for AmazonS3 { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| write!(f, "AmazonS3({})", self.bucket_name) |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for AmazonS3 { |
| async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { |
| let bucket_name = self.bucket_name.clone(); |
| let request_factory = move || { |
| let bytes = bytes.clone(); |
| |
| let length = bytes.len(); |
| let stream_data = Ok(bytes); |
| let stream = futures::stream::once(async move { stream_data }); |
| let byte_stream = ByteStream::new_with_size(stream, length); |
| |
| rusoto_s3::PutObjectRequest { |
| bucket: bucket_name.clone(), |
| key: location.to_string(), |
| body: Some(byte_stream), |
| ..Default::default() |
| } |
| }; |
| |
| let s3 = self.client().await; |
| |
| s3_request(move || { |
| let (s3, request_factory) = (s3.clone(), request_factory.clone()); |
| |
| async move { s3.put_object(request_factory()).await } |
| }) |
| .await |
| .context(UnableToPutDataSnafu { |
| bucket: &self.bucket_name, |
| path: location.as_ref(), |
| })?; |
| |
| Ok(()) |
| } |
| |
| async fn get(&self, location: &Path) -> Result<GetResult> { |
| Ok(GetResult::Stream( |
| self.get_object(location, None).await?.boxed(), |
| )) |
| } |
| |
| async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> { |
| let size_hint = range.end - range.start; |
| let stream = self.get_object(location, Some(range)).await?; |
| collect_bytes(stream, Some(size_hint)).await |
| } |
| |
| async fn head(&self, location: &Path) -> Result<ObjectMeta> { |
| let key = location.to_string(); |
| let head_request = rusoto_s3::HeadObjectRequest { |
| bucket: self.bucket_name.clone(), |
| key: key.clone(), |
| ..Default::default() |
| }; |
| let s = self |
| .client() |
| .await |
| .head_object(head_request) |
| .await |
| .map_err(|e| match e { |
| rusoto_core::RusotoError::Service( |
| rusoto_s3::HeadObjectError::NoSuchKey(_), |
| ) => Error::NotFound { |
| path: key.clone(), |
| source: e.into(), |
| }, |
| rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => { |
| Error::NotFound { |
| path: key.clone(), |
| source: "resource not found".into(), |
| } |
| } |
| _ => Error::UnableToHeadData { |
| bucket: self.bucket_name.to_owned(), |
| path: key.clone(), |
| source: e, |
| }, |
| })?; |
| |
| // Note: GetObject and HeadObject return a different date format from ListObjects |
| // |
| // S3 List returns timestamps in the form |
| // <LastModified>2013-09-17T18:07:53.000Z</LastModified> |
| // S3 GetObject returns timestamps in the form |
| // Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT |
| let last_modified = match s.last_modified { |
| Some(lm) => DateTime::parse_from_rfc2822(&lm) |
| .context(UnableToParseLastModifiedSnafu { |
| bucket: &self.bucket_name, |
| })? |
| .with_timezone(&Utc), |
| None => Utc::now(), |
| }; |
| |
| Ok(ObjectMeta { |
| last_modified, |
| location: location.clone(), |
| size: usize::try_from(s.content_length.unwrap_or(0)) |
| .expect("unsupported size on this platform"), |
| }) |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| let bucket_name = self.bucket_name.clone(); |
| |
| let request_factory = move || rusoto_s3::DeleteObjectRequest { |
| bucket: bucket_name.clone(), |
| key: location.to_string(), |
| ..Default::default() |
| }; |
| |
| let s3 = self.client().await; |
| |
| s3_request(move || { |
| let (s3, request_factory) = (s3.clone(), request_factory.clone()); |
| |
| async move { s3.delete_object(request_factory()).await } |
| }) |
| .await |
| .context(UnableToDeleteDataSnafu { |
| bucket: &self.bucket_name, |
| path: location.as_ref(), |
| })?; |
| |
| Ok(()) |
| } |
| |
| async fn list( |
| &self, |
| prefix: Option<&Path>, |
| ) -> Result<BoxStream<'_, Result<ObjectMeta>>> { |
| Ok(self |
| .list_objects_v2(prefix, None) |
| .await? |
| .map_ok(move |list_objects_v2_result| { |
| let contents = list_objects_v2_result.contents.unwrap_or_default(); |
| let iter = contents |
| .into_iter() |
| .map(|object| convert_object_meta(object, &self.bucket_name)); |
| |
| futures::stream::iter(iter) |
| }) |
| .try_flatten() |
| .boxed()) |
| } |
| |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| Ok(self |
| .list_objects_v2(prefix, Some(DELIMITER.to_string())) |
| .await? |
| .try_fold( |
| ListResult { |
| common_prefixes: vec![], |
| objects: vec![], |
| }, |
| |acc, list_objects_v2_result| async move { |
| let mut res = acc; |
| let contents = list_objects_v2_result.contents.unwrap_or_default(); |
| let mut objects = contents |
| .into_iter() |
| .map(|object| convert_object_meta(object, &self.bucket_name)) |
| .collect::<Result<Vec<_>>>()?; |
| |
| res.objects.append(&mut objects); |
| |
| let prefixes = |
| list_objects_v2_result.common_prefixes.unwrap_or_default(); |
| res.common_prefixes.reserve(prefixes.len()); |
| |
| for p in prefixes { |
| let prefix = |
| p.prefix.expect("can't have a prefix without a value"); |
| res.common_prefixes.push(Path::parse(prefix)?); |
| } |
| |
| Ok(res) |
| }, |
| ) |
| .await?) |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| let from = from.as_ref(); |
| let to = to.as_ref(); |
| let bucket_name = self.bucket_name.clone(); |
| |
| let request_factory = move || rusoto_s3::CopyObjectRequest { |
| bucket: bucket_name.clone(), |
| copy_source: format!("{}/{}", &bucket_name, from), |
| key: to.to_string(), |
| ..Default::default() |
| }; |
| |
| let s3 = self.client().await; |
| |
| s3_request(move || { |
| let (s3, request_factory) = (s3.clone(), request_factory.clone()); |
| |
| async move { s3.copy_object(request_factory()).await } |
| }) |
| .await |
| .context(UnableToCopyObjectSnafu { |
| bucket: &self.bucket_name, |
| from, |
| to, |
| })?; |
| |
| Ok(()) |
| } |
| |
| async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> { |
| // Will need dynamodb_lock |
| Err(crate::Error::NotImplemented) |
| } |
| } |
| |
| fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result<ObjectMeta> { |
| let key = object.key.expect("object doesn't exist without a key"); |
| let location = Path::parse(key)?; |
| let last_modified = match object.last_modified { |
| Some(lm) => DateTime::parse_from_rfc3339(&lm) |
| .context(UnableToParseLastModifiedSnafu { bucket })? |
| .with_timezone(&Utc), |
| None => Utc::now(), |
| }; |
| let size = usize::try_from(object.size.unwrap_or(0)) |
| .expect("unsupported size on this platform"); |
| |
| Ok(ObjectMeta { |
| location, |
| last_modified, |
| size, |
| }) |
| } |
| |
| /// Configure a connection to Amazon S3 using the specified credentials in |
| /// the specified Amazon region and bucket. |
| #[allow(clippy::too_many_arguments)] |
| pub fn new_s3( |
| access_key_id: Option<impl Into<String>>, |
| secret_access_key: Option<impl Into<String>>, |
| region: impl Into<String>, |
| bucket_name: impl Into<String>, |
| endpoint: Option<impl Into<String>>, |
| session_token: Option<impl Into<String>>, |
| max_connections: NonZeroUsize, |
| allow_http: bool, |
| ) -> Result<AmazonS3> { |
| let region = region.into(); |
| let region: rusoto_core::Region = match endpoint { |
| None => region.parse().context(InvalidRegionSnafu { region })?, |
| Some(endpoint) => rusoto_core::Region::Custom { |
| name: region, |
| endpoint: endpoint.into(), |
| }, |
| }; |
| |
| let mut builder = HyperBuilder::default(); |
| builder.pool_max_idle_per_host(max_connections.get()); |
| |
| let connector = if allow_http { |
| hyper_rustls::HttpsConnectorBuilder::new() |
| .with_webpki_roots() |
| .https_or_http() |
| .enable_http1() |
| .enable_http2() |
| .build() |
| } else { |
| hyper_rustls::HttpsConnectorBuilder::new() |
| .with_webpki_roots() |
| .https_only() |
| .enable_http1() |
| .enable_http2() |
| .build() |
| }; |
| |
| let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector); |
| |
| let client = match (access_key_id, secret_access_key, session_token) { |
| (Some(access_key_id), Some(secret_access_key), Some(session_token)) => { |
| let credentials_provider = StaticProvider::new( |
| access_key_id.into(), |
| secret_access_key.into(), |
| Some(session_token.into()), |
| None, |
| ); |
| rusoto_s3::S3Client::new_with(http_client, credentials_provider, region) |
| } |
| (Some(access_key_id), Some(secret_access_key), None) => { |
| let credentials_provider = StaticProvider::new_minimal( |
| access_key_id.into(), |
| secret_access_key.into(), |
| ); |
| rusoto_s3::S3Client::new_with(http_client, credentials_provider, region) |
| } |
| (None, Some(_), _) => return Err(Error::MissingAccessKey.into()), |
| (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()), |
| _ if std::env::var_os("AWS_WEB_IDENTITY_TOKEN_FILE").is_some() => { |
| rusoto_s3::S3Client::new_with( |
| http_client, |
| WebIdentityProvider::from_k8s_env(), |
| region, |
| ) |
| } |
| _ => rusoto_s3::S3Client::new_with( |
| http_client, |
| InstanceMetadataProvider::new(), |
| region, |
| ), |
| }; |
| |
| Ok(AmazonS3 { |
| client_unrestricted: client, |
| connection_semaphore: Arc::new(Semaphore::new(max_connections.get())), |
| bucket_name: bucket_name.into(), |
| }) |
| } |
| |
| /// Create a new [`AmazonS3`] that always errors |
| pub fn new_failing_s3() -> Result<AmazonS3> { |
| new_s3( |
| Some("foo"), |
| Some("bar"), |
| "us-east-1", |
| "bucket", |
| None as Option<&str>, |
| None as Option<&str>, |
| NonZeroUsize::new(16).unwrap(), |
| true, |
| ) |
| } |
| |
| /// S3 client bundled w/ a semaphore permit. |
| #[derive(Clone)] |
| struct SemaphoreClient { |
| /// Permit for this specific use of the client. |
| /// |
| /// Note that this field is never read and therefore considered "dead code" by rustc. |
| #[allow(dead_code)] |
| permit: Arc<OwnedSemaphorePermit>, |
| |
| inner: rusoto_s3::S3Client, |
| } |
| |
| impl Deref for SemaphoreClient { |
| type Target = rusoto_s3::S3Client; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.inner |
| } |
| } |
| |
| impl AmazonS3 { |
| /// Get a client according to the current connection limit. |
| async fn client(&self) -> SemaphoreClient { |
| let permit = Arc::clone(&self.connection_semaphore) |
| .acquire_owned() |
| .await |
| .expect("semaphore shouldn't be closed yet"); |
| SemaphoreClient { |
| permit: Arc::new(permit), |
| inner: self.client_unrestricted.clone(), |
| } |
| } |
| |
| async fn get_object( |
| &self, |
| location: &Path, |
| range: Option<Range<usize>>, |
| ) -> Result<impl Stream<Item = Result<Bytes>>> { |
| let key = location.to_string(); |
| let get_request = rusoto_s3::GetObjectRequest { |
| bucket: self.bucket_name.clone(), |
| key: key.clone(), |
| range: range.map(format_http_range), |
| ..Default::default() |
| }; |
| let bucket_name = self.bucket_name.clone(); |
| let stream = self |
| .client() |
| .await |
| .get_object(get_request) |
| .await |
| .map_err(|e| match e { |
| rusoto_core::RusotoError::Service( |
| rusoto_s3::GetObjectError::NoSuchKey(_), |
| ) => Error::NotFound { |
| path: key.clone(), |
| source: e.into(), |
| }, |
| _ => Error::UnableToGetData { |
| bucket: self.bucket_name.to_owned(), |
| path: key.clone(), |
| source: e, |
| }, |
| })? |
| .body |
| .context(NoDataSnafu { |
| bucket: self.bucket_name.to_owned(), |
| path: key.clone(), |
| })? |
| .map_err(move |source| Error::UnableToGetPieceOfData { |
| source, |
| bucket: bucket_name.clone(), |
| path: key.clone(), |
| }) |
| .err_into(); |
| |
| Ok(stream) |
| } |
| |
| async fn list_objects_v2( |
| &self, |
| prefix: Option<&Path>, |
| delimiter: Option<String>, |
| ) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> { |
| enum ListState { |
| Start, |
| HasMore(String), |
| Done, |
| } |
| |
| let prefix = format_prefix(prefix); |
| let bucket = self.bucket_name.clone(); |
| |
| let request_factory = move || rusoto_s3::ListObjectsV2Request { |
| bucket, |
| prefix, |
| delimiter, |
| ..Default::default() |
| }; |
| let s3 = self.client().await; |
| |
| Ok(stream::unfold(ListState::Start, move |state| { |
| let request_factory = request_factory.clone(); |
| let s3 = s3.clone(); |
| |
| async move { |
| let continuation_token = match &state { |
| ListState::HasMore(continuation_token) => Some(continuation_token), |
| ListState::Done => { |
| return None; |
| } |
| // If this is the first request we've made, we don't need to make any |
| // modifications to the request |
| ListState::Start => None, |
| }; |
| |
| let resp = s3_request(move || { |
| let (s3, request_factory, continuation_token) = ( |
| s3.clone(), |
| request_factory.clone(), |
| continuation_token.cloned(), |
| ); |
| |
| async move { |
| s3.list_objects_v2(rusoto_s3::ListObjectsV2Request { |
| continuation_token, |
| ..request_factory() |
| }) |
| .await |
| } |
| }) |
| .await; |
| |
| let resp = match resp { |
| Ok(resp) => resp, |
| Err(e) => return Some((Err(e), state)), |
| }; |
| |
| // The AWS response contains a field named `is_truncated` as well as |
| // `next_continuation_token`, and we're assuming that `next_continuation_token` |
| // is only set when `is_truncated` is true (and therefore not |
| // checking `is_truncated`). |
| let next_state = if let Some(next_continuation_token) = |
| &resp.next_continuation_token |
| { |
| ListState::HasMore(next_continuation_token.to_string()) |
| } else { |
| ListState::Done |
| }; |
| |
| Some((Ok(resp), next_state)) |
| } |
| }) |
| .map_err(move |e| { |
| Error::UnableToListData { |
| source: e, |
| bucket: self.bucket_name.clone(), |
| } |
| .into() |
| }) |
| .boxed()) |
| } |
| } |
| |
| /// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors. |
| /// |
| /// The `future_factory` argument is a function `F` that takes no arguments and, when called, will |
| /// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through |
| /// `rusoto` and return a `Result` that returns some type `R` on success and some |
| /// `rusoto_core::RusotoError<E>` on error. |
| /// |
| /// If the executed `Future` returns success, this function will return that success. |
| /// If the executed `Future` returns a 5xx server error, this function will wait an amount of |
| /// time that increases exponentially with the number of times it has retried, get a new `Future` by |
| /// calling `future_factory` again, and retry the request by `await`ing the `Future` again. |
| /// The retries will continue until the maximum number of retries has been attempted. In that case, |
| /// this function will return the last encountered error. |
| /// |
| /// Client errors (4xx) will never be retried by this function. |
| async fn s3_request<E, F, G, R>( |
| future_factory: F, |
| ) -> Result<R, rusoto_core::RusotoError<E>> |
| where |
| E: std::error::Error + Send, |
| F: Fn() -> G + Send, |
| G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send, |
| R: Send, |
| { |
| let mut attempts = 0; |
| |
| loop { |
| let request = future_factory(); |
| |
| let result = request.await; |
| |
| match result { |
| Ok(r) => return Ok(r), |
| Err(error) => { |
| attempts += 1; |
| |
| let should_retry = matches!( |
| error, |
| rusoto_core::RusotoError::Unknown(ref response) |
| if response.status.is_server_error() |
| ); |
| |
| if attempts > MAX_NUM_RETRIES { |
| warn!( |
| ?error, |
| attempts, "maximum number of retries exceeded for AWS S3 request" |
| ); |
| return Err(error); |
| } else if !should_retry { |
| return Err(error); |
| } else { |
| debug!(?error, attempts, "retrying AWS S3 request"); |
| let wait_time = Duration::from_millis(2u64.pow(attempts) * 50); |
| tokio::time::sleep(wait_time).await; |
| } |
| } |
| } |
| } |
| } |
| |
| impl Error { |
| #[cfg(test)] |
| fn s3_error_due_to_credentials(&self) -> bool { |
| use rusoto_core::RusotoError; |
| use Error::*; |
| |
| matches!( |
| self, |
| UnableToPutData { |
| source: RusotoError::Credentials(_), |
| bucket: _, |
| path: _, |
| } | UnableToGetData { |
| source: RusotoError::Credentials(_), |
| bucket: _, |
| path: _, |
| } | UnableToDeleteData { |
| source: RusotoError::Credentials(_), |
| bucket: _, |
| path: _, |
| } | UnableToListData { |
| source: RusotoError::Credentials(_), |
| bucket: _, |
| } |
| ) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::{ |
| tests::{ |
| get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, |
| put_get_delete_list, rename_and_copy, |
| }, |
| Error as ObjectStoreError, ObjectStore, |
| }; |
| use bytes::Bytes; |
| use std::env; |
| |
| type TestError = Box<dyn std::error::Error + Send + Sync + 'static>; |
| type Result<T, E = TestError> = std::result::Result<T, E>; |
| |
| const NON_EXISTENT_NAME: &str = "nonexistentname"; |
| |
| #[derive(Debug)] |
| struct AwsConfig { |
| access_key_id: String, |
| secret_access_key: String, |
| region: String, |
| bucket: String, |
| endpoint: Option<String>, |
| token: Option<String>, |
| } |
| |
| // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. |
| macro_rules! maybe_skip_integration { |
| () => {{ |
| dotenv::dotenv().ok(); |
| |
| let required_vars = [ |
| "AWS_DEFAULT_REGION", |
| "OBJECT_STORE_BUCKET", |
| "AWS_ACCESS_KEY_ID", |
| "AWS_SECRET_ACCESS_KEY", |
| ]; |
| let unset_vars: Vec<_> = required_vars |
| .iter() |
| .filter_map(|&name| match env::var(name) { |
| Ok(_) => None, |
| Err(_) => Some(name), |
| }) |
| .collect(); |
| let unset_var_names = unset_vars.join(", "); |
| |
| let force = env::var("TEST_INTEGRATION"); |
| |
| if force.is_ok() && !unset_var_names.is_empty() { |
| panic!( |
| "TEST_INTEGRATION is set, \ |
| but variable(s) {} need to be set", |
| unset_var_names |
| ); |
| } else if force.is_err() { |
| eprintln!( |
| "skipping AWS integration test - set {}TEST_INTEGRATION to run", |
| if unset_var_names.is_empty() { |
| String::new() |
| } else { |
| format!("{} and ", unset_var_names) |
| } |
| ); |
| return; |
| } else { |
| AwsConfig { |
| access_key_id: env::var("AWS_ACCESS_KEY_ID") |
| .expect("already checked AWS_ACCESS_KEY_ID"), |
| secret_access_key: env::var("AWS_SECRET_ACCESS_KEY") |
| .expect("already checked AWS_SECRET_ACCESS_KEY"), |
| region: env::var("AWS_DEFAULT_REGION") |
| .expect("already checked AWS_DEFAULT_REGION"), |
| bucket: env::var("OBJECT_STORE_BUCKET") |
| .expect("already checked OBJECT_STORE_BUCKET"), |
| endpoint: env::var("AWS_ENDPOINT").ok(), |
| token: env::var("AWS_SESSION_TOKEN").ok(), |
| } |
| } |
| }}; |
| } |
| |
| fn check_credentials<T>(r: Result<T>) -> Result<T> { |
| if let Err(e) = &r { |
| let e = &**e; |
| if let Some(e) = e.downcast_ref::<Error>() { |
| if e.s3_error_due_to_credentials() { |
| eprintln!( |
| "Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \ |
| environment variables" |
| ); |
| } |
| } |
| } |
| |
| r |
| } |
| |
| fn make_integration(config: AwsConfig) -> AmazonS3 { |
| new_s3( |
| Some(config.access_key_id), |
| Some(config.secret_access_key), |
| config.region, |
| config.bucket, |
| config.endpoint, |
| config.token, |
| NonZeroUsize::new(16).unwrap(), |
| true, |
| ) |
| .expect("Valid S3 config") |
| } |
| |
| #[tokio::test] |
| async fn s3_test() { |
| let config = maybe_skip_integration!(); |
| let integration = make_integration(config); |
| |
| check_credentials(put_get_delete_list(&integration).await).unwrap(); |
| check_credentials(list_uses_directories_correctly(&integration).await).unwrap(); |
| check_credentials(list_with_delimiter(&integration).await).unwrap(); |
| check_credentials(rename_and_copy(&integration).await).unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_get_nonexistent_location() { |
| let config = maybe_skip_integration!(); |
| let integration = make_integration(config); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = get_nonexistent_object(&integration, Some(location)) |
| .await |
| .unwrap_err(); |
| if let ObjectStoreError::NotFound { path, source } = err { |
| let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>(); |
| assert!( |
| matches!( |
| source_variant, |
| Some(rusoto_core::RusotoError::Service( |
| rusoto_s3::GetObjectError::NoSuchKey(_) |
| )), |
| ), |
| "got: {:?}", |
| source_variant |
| ); |
| assert_eq!(path, NON_EXISTENT_NAME); |
| } else { |
| panic!("unexpected error type: {:?}", err); |
| } |
| } |
| |
| #[tokio::test] |
| async fn s3_test_get_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = make_integration(config); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.get(&location).await.unwrap_err().to_string(); |
| assert!( |
| err.contains("The specified bucket does not exist"), |
| "{}", |
| err |
| ) |
| } |
| |
| #[tokio::test] |
| async fn s3_test_put_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = make_integration(config); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| let data = Bytes::from("arbitrary data"); |
| |
| let err = integration |
| .put(&location, data) |
| .await |
| .unwrap_err() |
| .to_string(); |
| |
| assert!( |
| err.contains("The specified bucket does not exist") |
| && err.contains("Unable to PUT data"), |
| "{}", |
| err |
| ) |
| } |
| |
| #[tokio::test] |
| async fn s3_test_delete_nonexistent_location() { |
| let config = maybe_skip_integration!(); |
| let integration = make_integration(config); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| integration.delete(&location).await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_delete_nonexistent_bucket() { |
| let mut config = maybe_skip_integration!(); |
| config.bucket = NON_EXISTENT_NAME.into(); |
| let integration = make_integration(config); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.delete(&location).await.unwrap_err().to_string(); |
| assert!( |
| err.contains("The specified bucket does not exist") |
| && err.contains("Unable to DELETE data"), |
| "{}", |
| err |
| ) |
| } |
| } |