| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An object store implementation for S3 |
| //! |
| //! ## Multipart uploads |
| //! |
| //! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method. |
| //! |
| //! If the writer fails for any reason, you may have parts uploaded to AWS but not |
| //! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop |
| //! these unneeded parts, however, it is recommended that you consider implementing |
| //! [automatic cleanup] of unused parts that are older than some threshold. |
| //! |
| //! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/ |
| |
| use async_trait::async_trait; |
| use futures::stream::BoxStream; |
| use futures::{StreamExt, TryStreamExt}; |
| use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; |
| use reqwest::{Method, StatusCode}; |
| use std::{sync::Arc, time::Duration}; |
| use url::Url; |
| |
| use crate::aws::client::{CompleteMultipartMode, PutPartPayload, RequestError, S3Client}; |
| use crate::client::get::GetClientExt; |
| use crate::client::list::{ListClient, ListClientExt}; |
| use crate::client::CredentialProvider; |
| use crate::multipart::{MultipartStore, PartId}; |
| use crate::signer::Signer; |
| use crate::util::STRICT_ENCODE_SET; |
| use crate::{ |
| Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, |
| ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, |
| UploadPart, |
| }; |
| |
| static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); |
| static COPY_SOURCE_HEADER: HeaderName = HeaderName::from_static("x-amz-copy-source"); |
| |
| mod builder; |
| mod checksum; |
| mod client; |
| mod credential; |
| mod precondition; |
| |
| #[cfg(not(target_arch = "wasm32"))] |
| mod resolve; |
| |
| pub use builder::{AmazonS3Builder, AmazonS3ConfigKey}; |
| pub use checksum::Checksum; |
| pub use precondition::{S3ConditionalPut, S3CopyIfNotExists}; |
| |
| #[cfg(not(target_arch = "wasm32"))] |
| pub use resolve::resolve_bucket_region; |
| |
| /// This struct is used to maintain the URI path encoding |
| const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/'); |
| |
| const STORE: &str = "S3"; |
| |
| /// [`CredentialProvider`] for [`AmazonS3`] |
| pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>; |
| use crate::client::parts::Parts; |
| use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore}; |
| pub use credential::{AwsAuthorizer, AwsCredential}; |
| |
| /// Interface for [Amazon S3](https://aws.amazon.com/s3/). |
| #[derive(Debug, Clone)] |
| pub struct AmazonS3 { |
| client: Arc<S3Client>, |
| } |
| |
| impl std::fmt::Display for AmazonS3 { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "AmazonS3({})", self.client.config.bucket) |
| } |
| } |
| |
| impl AmazonS3 { |
| /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`] |
| pub fn credentials(&self) -> &AwsCredentialProvider { |
| &self.client.config.credentials |
| } |
| |
| /// Create a full URL to the resource specified by `path` with this instance's configuration. |
| fn path_url(&self, path: &Path) -> String { |
| self.client.config.path_url(path) |
| } |
| } |
| |
| #[async_trait] |
| impl Signer for AmazonS3 { |
| /// Create a URL containing the relevant [AWS SigV4] query parameters that authorize a request |
| /// via `method` to the resource at `path` valid for the duration specified in `expires_in`. |
| /// |
| /// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html |
| /// |
| /// # Example |
| /// |
| /// This example returns a URL that will enable a user to upload a file to |
| /// "some-folder/some-file.txt" in the next hour. |
| /// |
| /// ``` |
| /// # async fn example() -> Result<(), Box<dyn std::error::Error>> { |
| /// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer}; |
| /// # use reqwest::Method; |
| /// # use std::time::Duration; |
| /// # |
| /// let region = "us-east-1"; |
| /// let s3 = AmazonS3Builder::new() |
| /// .with_region(region) |
| /// .with_bucket_name("my-bucket") |
| /// .with_access_key_id("my-access-key-id") |
| /// .with_secret_access_key("my-secret-access-key") |
| /// .build()?; |
| /// |
| /// let url = s3.signed_url( |
| /// Method::PUT, |
| /// &Path::from("some-folder/some-file.txt"), |
| /// Duration::from_secs(60 * 60) |
| /// ).await?; |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> { |
| let credential = self.credentials().get_credential().await?; |
| let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region) |
| .with_request_payer(self.client.config.request_payer); |
| |
| let path_url = self.path_url(path); |
| let mut url = path_url.parse().map_err(|e| Error::Generic { |
| store: STORE, |
| source: format!("Unable to parse url {path_url}: {e}").into(), |
| })?; |
| |
| authorizer.sign(method, &mut url, expires_in); |
| |
| Ok(url) |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for AmazonS3 { |
| async fn put_opts( |
| &self, |
| location: &Path, |
| payload: PutPayload, |
| opts: PutOptions, |
| ) -> Result<PutResult> { |
| let PutOptions { |
| mode, |
| tags, |
| attributes, |
| extensions, |
| } = opts; |
| |
| let request = self |
| .client |
| .request(Method::PUT, location) |
| .with_payload(payload) |
| .with_attributes(attributes) |
| .with_tags(tags) |
| .with_extensions(extensions) |
| .with_encryption_headers(); |
| |
| match (mode, &self.client.config.conditional_put) { |
| (PutMode::Overwrite, _) => request.idempotent(true).do_put().await, |
| (PutMode::Create, S3ConditionalPut::Disabled) => Err(Error::NotImplemented), |
| (PutMode::Create, S3ConditionalPut::ETagMatch) => { |
| match request.header(&IF_NONE_MATCH, "*").do_put().await { |
| // Technically If-None-Match should return NotModified but some stores, |
| // such as R2, instead return PreconditionFailed |
| // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject |
| Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => { |
| Err(Error::AlreadyExists { |
| path: location.to_string(), |
| source: Box::new(e), |
| }) |
| } |
| r => r, |
| } |
| } |
| (PutMode::Update(v), put) => { |
| let etag = v.e_tag.ok_or_else(|| Error::Generic { |
| store: STORE, |
| source: "ETag required for conditional put".to_string().into(), |
| })?; |
| match put { |
| S3ConditionalPut::ETagMatch => { |
| match request |
| .header(&IF_MATCH, etag.as_str()) |
| // Real S3 will occasionally report 409 Conflict |
| // if there are concurrent `If-Match` requests |
| // in flight, so we need to be prepared to retry |
| // 409 responses. |
| .retry_on_conflict(true) |
| .do_put() |
| .await |
| { |
| // Real S3 reports NotFound rather than PreconditionFailed when the |
| // object doesn't exist. Convert to PreconditionFailed for |
| // consistency with R2. This also matches what the HTTP spec |
| // says the behavior should be. |
| Err(Error::NotFound { path, source }) => { |
| Err(Error::Precondition { path, source }) |
| } |
| r => r, |
| } |
| } |
| S3ConditionalPut::Disabled => Err(Error::NotImplemented), |
| } |
| } |
| } |
| } |
| |
| async fn put_multipart_opts( |
| &self, |
| location: &Path, |
| opts: PutMultipartOptions, |
| ) -> Result<Box<dyn MultipartUpload>> { |
| let upload_id = self.client.create_multipart(location, opts).await?; |
| |
| Ok(Box::new(S3MultiPartUpload { |
| part_idx: 0, |
| state: Arc::new(UploadState { |
| client: Arc::clone(&self.client), |
| location: location.clone(), |
| upload_id: upload_id.clone(), |
| parts: Default::default(), |
| }), |
| })) |
| } |
| |
| async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { |
| self.client.get_opts(location, options).await |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.client.request(Method::DELETE, location).send().await?; |
| Ok(()) |
| } |
| |
| fn delete_stream<'a>( |
| &'a self, |
| locations: BoxStream<'a, Result<Path>>, |
| ) -> BoxStream<'a, Result<Path>> { |
| locations |
| .try_chunks(1_000) |
| .map(move |locations| async { |
| // Early return the error. We ignore the paths that have already been |
| // collected into the chunk. |
| let locations = locations.map_err(|e| e.1)?; |
| self.client |
| .bulk_delete_request(locations) |
| .await |
| .map(futures::stream::iter) |
| }) |
| .buffered(20) |
| .try_flatten() |
| .boxed() |
| } |
| |
| fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { |
| self.client.list(prefix) |
| } |
| |
| fn list_with_offset( |
| &self, |
| prefix: Option<&Path>, |
| offset: &Path, |
| ) -> BoxStream<'static, Result<ObjectMeta>> { |
| if self.client.config.is_s3_express() { |
| let offset = offset.clone(); |
| // S3 Express does not support start-after |
| return self |
| .client |
| .list(prefix) |
| .try_filter(move |f| futures::future::ready(f.location > offset)) |
| .boxed(); |
| } |
| |
| self.client.list_with_offset(prefix, offset) |
| } |
| |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| self.client.list_with_delimiter(prefix).await |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| self.client |
| .copy_request(from, to) |
| .idempotent(true) |
| .send() |
| .await?; |
| Ok(()) |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| let (k, v, status) = match &self.client.config.copy_if_not_exists { |
| Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED), |
| Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status), |
| Some(S3CopyIfNotExists::Multipart) => { |
| let upload_id = self |
| .client |
| .create_multipart(to, PutMultipartOptions::default()) |
| .await?; |
| |
| let res = async { |
| let part_id = self |
| .client |
| .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) |
| .await?; |
| match self |
| .client |
| .complete_multipart( |
| to, |
| &upload_id, |
| vec![part_id], |
| CompleteMultipartMode::Create, |
| ) |
| .await |
| { |
| Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { |
| path: to.to_string(), |
| source: Box::new(e), |
| }), |
| Ok(_) => Ok(()), |
| Err(e) => Err(e), |
| } |
| } |
| .await; |
| |
| // If the multipart upload failed, make a best effort attempt to |
| // clean it up. It's the caller's responsibility to add a |
| // lifecycle rule if guaranteed cleanup is required, as we |
| // cannot protect against an ill-timed process crash. |
| if res.is_err() { |
| let _ = self.client.abort_multipart(to, &upload_id).await; |
| } |
| |
| return res; |
| } |
| None => { |
| return Err(Error::NotSupported { |
| source: "S3 does not support copy-if-not-exists".to_string().into(), |
| }) |
| } |
| }; |
| |
| let req = self.client.copy_request(from, to); |
| match req.header(k, v).send().await { |
| Err(RequestError::Retry { source, path }) if source.status() == Some(status) => { |
| Err(Error::AlreadyExists { |
| source: Box::new(source), |
| path, |
| }) |
| } |
| Err(e) => Err(e.into()), |
| Ok(_) => Ok(()), |
| } |
| } |
| } |
| |
| #[derive(Debug)] |
| struct S3MultiPartUpload { |
| part_idx: usize, |
| state: Arc<UploadState>, |
| } |
| |
| #[derive(Debug)] |
| struct UploadState { |
| parts: Parts, |
| location: Path, |
| upload_id: String, |
| client: Arc<S3Client>, |
| } |
| |
| #[async_trait] |
| impl MultipartUpload for S3MultiPartUpload { |
| fn put_part(&mut self, data: PutPayload) -> UploadPart { |
| let idx = self.part_idx; |
| self.part_idx += 1; |
| let state = Arc::clone(&self.state); |
| Box::pin(async move { |
| let part = state |
| .client |
| .put_part( |
| &state.location, |
| &state.upload_id, |
| idx, |
| PutPartPayload::Part(data), |
| ) |
| .await?; |
| state.parts.put(idx, part); |
| Ok(()) |
| }) |
| } |
| |
| async fn complete(&mut self) -> Result<PutResult> { |
| let parts = self.state.parts.finish(self.part_idx)?; |
| |
| self.state |
| .client |
| .complete_multipart( |
| &self.state.location, |
| &self.state.upload_id, |
| parts, |
| CompleteMultipartMode::Overwrite, |
| ) |
| .await |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| self.state |
| .client |
| .request(Method::DELETE, &self.state.location) |
| .query(&[("uploadId", &self.state.upload_id)]) |
| .idempotent(true) |
| .send() |
| .await?; |
| |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl MultipartStore for AmazonS3 { |
| async fn create_multipart(&self, path: &Path) -> Result<MultipartId> { |
| self.client |
| .create_multipart(path, PutMultipartOptions::default()) |
| .await |
| } |
| |
| async fn put_part( |
| &self, |
| path: &Path, |
| id: &MultipartId, |
| part_idx: usize, |
| data: PutPayload, |
| ) -> Result<PartId> { |
| self.client |
| .put_part(path, id, part_idx, PutPartPayload::Part(data)) |
| .await |
| } |
| |
| async fn complete_multipart( |
| &self, |
| path: &Path, |
| id: &MultipartId, |
| parts: Vec<PartId>, |
| ) -> Result<PutResult> { |
| self.client |
| .complete_multipart(path, id, parts, CompleteMultipartMode::Overwrite) |
| .await |
| } |
| |
| async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { |
| self.client |
| .request(Method::DELETE, path) |
| .query(&[("uploadId", id)]) |
| .send() |
| .await?; |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl PaginatedListStore for AmazonS3 { |
| async fn list_paginated( |
| &self, |
| prefix: Option<&str>, |
| opts: PaginatedListOptions, |
| ) -> Result<PaginatedListResult> { |
| self.client.list_request(prefix, opts).await |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use crate::client::get::GetClient; |
| use crate::client::retry::RetryContext; |
| use crate::client::SpawnedReqwestConnector; |
| use crate::integration::*; |
| use crate::tests::*; |
| use crate::ClientOptions; |
| use crate::ObjectStoreExt; |
| use base64::prelude::BASE64_STANDARD; |
| use base64::Engine; |
| use http::HeaderMap; |
| |
| const NON_EXISTENT_NAME: &str = "nonexistentname"; |
| |
| #[tokio::test] |
| async fn write_multipart_file_with_signature() { |
| maybe_skip_integration!(); |
| |
| let bucket = "test-bucket-for-checksum"; |
| let store = AmazonS3Builder::from_env() |
| .with_bucket_name(bucket) |
| .with_checksum_algorithm(Checksum::SHA256) |
| .build() |
| .unwrap(); |
| |
| let str = "test.bin"; |
| let path = Path::parse(str).unwrap(); |
| let opts = PutMultipartOptions::default(); |
| let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); |
| |
| upload |
| .put_part(PutPayload::from(vec![0u8; 10_000_000])) |
| .await |
| .unwrap(); |
| upload |
| .put_part(PutPayload::from(vec![0u8; 5_000_000])) |
| .await |
| .unwrap(); |
| |
| let res = upload.complete().await.unwrap(); |
| assert!(res.e_tag.is_some(), "Should have valid etag"); |
| |
| store.delete(&path).await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn write_multipart_file_with_signature_object_lock() { |
| maybe_skip_integration!(); |
| |
| let bucket = "test-object-lock"; |
| let store = AmazonS3Builder::from_env() |
| .with_bucket_name(bucket) |
| .with_checksum_algorithm(Checksum::SHA256) |
| .build() |
| .unwrap(); |
| |
| let str = "test.bin"; |
| let path = Path::parse(str).unwrap(); |
| let opts = PutMultipartOptions::default(); |
| let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); |
| |
| upload |
| .put_part(PutPayload::from(vec![0u8; 10_000_000])) |
| .await |
| .unwrap(); |
| upload |
| .put_part(PutPayload::from(vec![0u8; 5_000_000])) |
| .await |
| .unwrap(); |
| |
| let res = upload.complete().await.unwrap(); |
| assert!(res.e_tag.is_some(), "Should have valid etag"); |
| |
| store.delete(&path).await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn s3_test() { |
| maybe_skip_integration!(); |
| let config = AmazonS3Builder::from_env(); |
| |
| let integration = config.build().unwrap(); |
| let config = &integration.client.config; |
| let test_not_exists = config.copy_if_not_exists.is_some(); |
| let test_conditional_put = config.conditional_put != S3ConditionalPut::Disabled; |
| |
| put_get_delete_list(&integration).await; |
| get_opts(&integration).await; |
| list_uses_directories_correctly(&integration).await; |
| list_with_delimiter(&integration).await; |
| rename_and_copy(&integration).await; |
| stream_get(&integration).await; |
| multipart(&integration, &integration).await; |
| multipart_race_condition(&integration, true).await; |
| multipart_out_of_order(&integration).await; |
| signing(&integration).await; |
| s3_encryption(&integration).await; |
| put_get_attributes(&integration).await; |
| list_paginated(&integration, &integration).await; |
| |
| // Object tagging is not supported by S3 Express One Zone |
| if config.session_provider.is_none() { |
| tagging( |
| Arc::new(AmazonS3 { |
| client: Arc::clone(&integration.client), |
| }), |
| !config.disable_tagging, |
| |p| { |
| let client = Arc::clone(&integration.client); |
| async move { client.get_object_tagging(&p).await } |
| }, |
| ) |
| .await; |
| } |
| |
| if test_not_exists { |
| copy_if_not_exists(&integration).await; |
| } |
| if test_conditional_put { |
| put_opts(&integration, true).await; |
| } |
| |
| // run integration test with unsigned payload enabled |
| let builder = AmazonS3Builder::from_env().with_unsigned_payload(true); |
| let integration = builder.build().unwrap(); |
| put_get_delete_list(&integration).await; |
| |
| // run integration test with checksum set to sha256 |
| let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256); |
| let integration = builder.build().unwrap(); |
| put_get_delete_list(&integration).await; |
| } |
| |
| #[tokio::test] |
| async fn s3_test_get_nonexistent_location() { |
| maybe_skip_integration!(); |
| let integration = AmazonS3Builder::from_env().build().unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = get_nonexistent_object(&integration, Some(location)) |
| .await |
| .unwrap_err(); |
| assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_get_nonexistent_bucket() { |
| maybe_skip_integration!(); |
| let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME); |
| let integration = config.build().unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.get(&location).await.unwrap_err(); |
| assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_put_nonexistent_bucket() { |
| maybe_skip_integration!(); |
| let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME); |
| let integration = config.build().unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| let data = PutPayload::from("arbitrary data"); |
| |
| let err = integration.put(&location, data).await.unwrap_err(); |
| assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_delete_nonexistent_location() { |
| maybe_skip_integration!(); |
| let integration = AmazonS3Builder::from_env().build().unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| integration.delete(&location).await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn s3_test_delete_nonexistent_bucket() { |
| maybe_skip_integration!(); |
| let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME); |
| let integration = config.build().unwrap(); |
| |
| let location = Path::from_iter([NON_EXISTENT_NAME]); |
| |
| let err = integration.delete(&location).await.unwrap_err(); |
| assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err); |
| } |
| |
| #[tokio::test] |
| #[ignore = "Tests shouldn't call use remote services by default"] |
| async fn test_disable_creds() { |
| // https://registry.opendata.aws/daylight-osm/ |
| let v1 = AmazonS3Builder::new() |
| .with_bucket_name("daylight-map-distribution") |
| .with_region("us-west-1") |
| .with_access_key_id("local") |
| .with_secret_access_key("development") |
| .build() |
| .unwrap(); |
| |
| let prefix = Path::from("release"); |
| |
| v1.list_with_delimiter(Some(&prefix)).await.unwrap_err(); |
| |
| let v2 = AmazonS3Builder::new() |
| .with_bucket_name("daylight-map-distribution") |
| .with_region("us-west-1") |
| .with_skip_signature(true) |
| .build() |
| .unwrap(); |
| |
| v2.list_with_delimiter(Some(&prefix)).await.unwrap(); |
| } |
| |
| async fn s3_encryption(store: &AmazonS3) { |
| maybe_skip_integration!(); |
| |
| let data = PutPayload::from(vec![3u8; 1024]); |
| |
| let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into(); |
| let expected_encryption = |
| if let Some(encryption_type) = encryption_headers.get("x-amz-server-side-encryption") { |
| encryption_type |
| } else { |
| eprintln!("Skipping S3 encryption test - encryption not configured"); |
| return; |
| }; |
| |
| let locations = [ |
| Path::from("test-encryption-1"), |
| Path::from("test-encryption-2"), |
| Path::from("test-encryption-3"), |
| ]; |
| |
| store.put(&locations[0], data.clone()).await.unwrap(); |
| store.copy(&locations[0], &locations[1]).await.unwrap(); |
| |
| let mut upload = store.put_multipart(&locations[2]).await.unwrap(); |
| upload.put_part(data.clone()).await.unwrap(); |
| upload.complete().await.unwrap(); |
| |
| for location in &locations { |
| let mut context = RetryContext::new(&store.client.config.retry_config); |
| |
| let res = store |
| .client |
| .get_request(&mut context, location, GetOptions::default()) |
| .await |
| .unwrap(); |
| |
| let headers = res.headers(); |
| assert_eq!( |
| headers |
| .get("x-amz-server-side-encryption") |
| .expect("object is not encrypted"), |
| expected_encryption |
| ); |
| |
| store.delete(location).await.unwrap(); |
| } |
| } |
| |
| /// See CONTRIBUTING.md for the MinIO setup for this test. |
| #[tokio::test] |
| async fn test_s3_ssec_encryption_with_minio() { |
| if std::env::var("TEST_S3_SSEC_ENCRYPTION").is_err() { |
| eprintln!("Skipping S3 SSE-C encryption test"); |
| return; |
| } |
| eprintln!("Running S3 SSE-C encryption test"); |
| |
| let customer_key = "1234567890abcdef1234567890abcdef"; |
| let expected_md5 = "JMwgiexXqwuPqIPjYFmIZQ=="; |
| |
| let store = AmazonS3Builder::from_env() |
| .with_ssec_encryption(BASE64_STANDARD.encode(customer_key)) |
| .with_client_options(ClientOptions::default().with_allow_invalid_certificates(true)) |
| .build() |
| .unwrap(); |
| |
| let data = PutPayload::from(vec![3u8; 1024]); |
| |
| let locations = [ |
| Path::from("test-encryption-1"), |
| Path::from("test-encryption-2"), |
| Path::from("test-encryption-3"), |
| ]; |
| |
| // Test put with sse-c. |
| store.put(&locations[0], data.clone()).await.unwrap(); |
| |
| // Test copy with sse-c. |
| store.copy(&locations[0], &locations[1]).await.unwrap(); |
| |
| // Test multipart upload with sse-c. |
| let mut upload = store.put_multipart(&locations[2]).await.unwrap(); |
| upload.put_part(data.clone()).await.unwrap(); |
| upload.complete().await.unwrap(); |
| |
| // Test get with sse-c. |
| for location in &locations { |
| let mut context = RetryContext::new(&store.client.config.retry_config); |
| |
| let res = store |
| .client |
| .get_request(&mut context, location, GetOptions::default()) |
| .await |
| .unwrap(); |
| |
| let headers = res.headers(); |
| assert_eq!( |
| headers |
| .get("x-amz-server-side-encryption-customer-algorithm") |
| .expect("object is not encrypted with SSE-C"), |
| "AES256" |
| ); |
| |
| assert_eq!( |
| headers |
| .get("x-amz-server-side-encryption-customer-key-MD5") |
| .expect("object is not encrypted with SSE-C"), |
| expected_md5 |
| ); |
| |
| store.delete(location).await.unwrap(); |
| } |
| } |
| |
| /// Integration test that ensures I/O is done on an alternate threadpool |
| /// when using the `SpawnedReqwestConnector`. |
| #[test] |
| fn s3_alternate_threadpool_spawned_request_connector() { |
| maybe_skip_integration!(); |
| let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); |
| |
| // Runtime with I/O enabled |
| let io_runtime = tokio::runtime::Builder::new_current_thread() |
| .enable_all() // <-- turns on IO |
| .build() |
| .unwrap(); |
| |
| // Runtime without I/O enabled |
| let non_io_runtime = tokio::runtime::Builder::new_current_thread() |
| // note: no call to enable_all |
| .build() |
| .unwrap(); |
| |
| // run the io runtime in a different thread |
| let io_handle = io_runtime.handle().clone(); |
| let thread_handle = std::thread::spawn(move || { |
| io_runtime.block_on(async move { |
| shutdown_rx.await.unwrap(); |
| }); |
| }); |
| |
| let store = AmazonS3Builder::from_env() |
| // use different bucket to avoid collisions with other tests |
| .with_bucket_name("test-bucket-for-spawn") |
| .with_http_connector(SpawnedReqwestConnector::new(io_handle)) |
| .build() |
| .unwrap(); |
| |
| // run a request on the non io runtime -- will fail if the connector |
| // does not spawn the request to the io runtime |
| non_io_runtime |
| .block_on(async move { |
| let path = Path::from("alternate_threadpool/test.txt"); |
| store.delete(&path).await.ok(); // remove the file if it exists from prior runs |
| store.put(&path, "foo".into()).await?; |
| let res = store.get(&path).await?.bytes().await?; |
| assert_eq!(res.as_ref(), b"foo"); |
| store.delete(&path).await?; // cleanup |
| Ok(()) as Result<()> |
| }) |
| .expect("failed to run request on non io runtime"); |
| |
| // shutdown the io runtime and thread |
| shutdown_tx.send(()).ok(); |
| thread_handle.join().expect("runtime thread panicked"); |
| } |
| } |