// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! An object store implementation for S3
//!
//! ## Multipart uploads
//!
//! Multipart uploads can be initiated with the [`ObjectStore::put_multipart`] method.
//!
//! If the writer fails for any reason, parts may remain uploaded to AWS but
//! never used, and you will be charged for storing them. [`MultipartUpload::abort`]
//! may be invoked to drop these unneeded parts; however, it is recommended that
//! you consider implementing [automatic cleanup] of unused parts that are older
//! than some threshold.
//!
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
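//!
//! A minimal sketch of a multipart upload, assuming a bucket named
//! `my-bucket` and credentials available in the environment:
//!
//! ```
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use object_store::aws::AmazonS3Builder;
//! use object_store::path::Path;
//! use object_store::{MultipartUpload, ObjectStore, PutPayload};
//!
//! let s3 = AmazonS3Builder::from_env()
//!     .with_bucket_name("my-bucket")
//!     .build()?;
//!
//! let path = Path::from("data/large-file.bin");
//! let mut upload = s3.put_multipart(&path).await?;
//! // Every part except the last must be at least 5 MiB
//! upload.put_part(PutPayload::from(vec![0u8; 5 * 1024 * 1024])).await?;
//! upload.complete().await?;
//! # Ok(())
//! # }
//! ```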
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;
use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
mod builder;
mod checksum;
mod client;
mod credential;
mod dynamo;
mod precondition;
mod resolve;
pub use builder::{AmazonS3Builder, AmazonS3ConfigKey, S3EncryptionHeaders};
pub use checksum::Checksum;
pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
pub use resolve::resolve_bucket_region;
/// The [`percent_encoding::AsciiSet`] used when encoding URI paths: [`STRICT_ENCODE_SET`]
/// with `/` removed, since it must be preserved as the path separator
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
const STORE: &str = "S3";
/// [`CredentialProvider`] for [`AmazonS3`]
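///
/// A minimal sketch of supplying pre-resolved credentials through
/// [`StaticCredentialProvider`](crate::StaticCredentialProvider); the bucket,
/// region, and key values here are illustrative:
///
/// ```
/// # use std::sync::Arc;
/// use object_store::aws::{AmazonS3Builder, AwsCredential};
/// use object_store::StaticCredentialProvider;
///
/// let credentials = Arc::new(StaticCredentialProvider::new(AwsCredential {
///     key_id: "my-access-key-id".to_string(),
///     secret_key: "my-secret-access-key".to_string(),
///     token: None,
/// }));
///
/// let s3 = AmazonS3Builder::new()
///     .with_bucket_name("my-bucket")
///     .with_region("us-east-1")
///     .with_credentials(credentials)
///     .build()
///     .unwrap();
/// ```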
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
#[derive(Debug)]
pub struct AmazonS3 {
client: Arc<S3Client>,
}
impl std::fmt::Display for AmazonS3 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "AmazonS3({})", self.client.config.bucket)
}
}
impl AmazonS3 {
/// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
pub fn credentials(&self) -> &AwsCredentialProvider {
&self.client.config.credentials
}
/// Create a full URL to the resource specified by `path` with this instance's configuration.
fn path_url(&self, path: &Path) -> String {
self.client.config.path_url(path)
}
}
#[async_trait]
impl Signer for AmazonS3 {
/// Create a URL containing the relevant [AWS SigV4] query parameters that authorize a request
    /// via `method` to the resource at `path`, valid for the duration specified in `expires_in`.
///
/// [AWS SigV4]: https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
///
/// # Example
///
    /// This example returns a URL that allows a user to upload a file to
    /// `some-folder/some-file.txt` at any time within the next hour.
///
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
/// # use reqwest::Method;
/// # use std::time::Duration;
/// #
/// let region = "us-east-1";
/// let s3 = AmazonS3Builder::new()
/// .with_region(region)
/// .with_bucket_name("my-bucket")
/// .with_access_key_id("my-access-key-id")
/// .with_secret_access_key("my-secret-access-key")
/// .build()?;
///
/// let url = s3.signed_url(
/// Method::PUT,
/// &Path::from("some-folder/some-file.txt"),
/// Duration::from_secs(60 * 60)
/// ).await?;
/// # Ok(())
/// # }
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region);
let path_url = self.path_url(path);
let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
store: STORE,
source: format!("Unable to parse url {path_url}: {e}").into(),
})?;
authorizer.sign(method, &mut url, expires_in);
Ok(url)
}
}
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let mut request = self.client.put_request(location, payload, true);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config.disable_tagging {
request = request.header(&TAGS_HEADER, tags);
}
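        // Dispatch on the requested `PutMode` and the configured conditional put
        // mechanism: plain overwrites can safely be retried as idempotent, while
        // create and update semantics need either HTTP preconditions or a DynamoDB lock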
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
                // A `PUT` with `If-None-Match: *` against an existing object surfaces
                // as `NotModified` from some stores, while others, such as R2, instead
                // return `PreconditionFailed`; treat both as "already exists"
                // https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject
Err(e @ Error::NotModified { .. } | e @ Error::Precondition { .. }) => {
Err(Error::AlreadyExists {
path: location.to_string(),
source: Box::new(e),
})
}
r => r,
}
}
(PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
d.conditional_op(&self.client, location, None, move || request.do_put())
.await
}
(PutMode::Update(v), Some(put)) => {
let etag = v.e_tag.ok_or_else(|| Error::Generic {
store: STORE,
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
request.do_put()
})
.await
}
}
}
}
}
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
let upload_id = self.client.create_multipart(location).await?;
Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
upload_id: upload_id.clone(),
parts: Default::default(),
}),
}))
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.client.get_opts(location, options).await
}
async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location, &()).await
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
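        // `DeleteObjects` accepts at most 1,000 keys per request, so chunk the
        // incoming stream and issue up to 20 bulk delete requests concurrently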
locations
.try_chunks(1_000)
.map(move |locations| async {
// Early return the error. We ignore the paths that have already been
// collected into the chunk.
let locations = locations.map_err(|e| e.1)?;
self.client
.bulk_delete_request(locations)
.await
.map(futures::stream::iter)
})
.buffered(20)
.try_flatten()
.boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
if self.client.config.is_s3_express() {
let offset = offset.clone();
// S3 Express does not support start-after
return self
.client
.list(prefix)
.try_filter(move |f| futures::future::ready(f.location > offset))
.boxed();
}
self.client.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.client.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.client
.copy_request(from, to)
.idempotent(true)
.send()
.await?;
Ok(())
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
Some(S3CopyIfNotExists::Dynamo(lock)) => {
return lock.copy_if_not_exists(&self.client, from, to).await
}
None => {
return Err(Error::NotSupported {
source: "S3 does not support copy-if-not-exists".to_string().into(),
})
}
};
let req = self.client.copy_request(from, to);
match req.header(k, v).send().await {
Err(RequestError::Retry { source, path }) if source.status() == Some(status) => {
Err(Error::AlreadyExists {
source: Box::new(source),
path,
})
}
Err(e) => Err(e.into()),
Ok(_) => Ok(()),
}
}
}
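
/// An in-progress multipart upload, tracking the next part index locally
/// and publishing completed parts through the shared [`UploadState`]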
#[derive(Debug)]
struct S3MultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
}
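
/// State shared between an [`S3MultiPartUpload`] and its in-flight part uploads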
#[derive(Debug)]
struct UploadState {
parts: Parts,
location: Path,
upload_id: String,
client: Arc<S3Client>,
}
#[async_trait]
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
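        // Parts may complete in any order; `Parts` records each `PartId`
        // against its index so `complete` can reassemble them in sequence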
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
state.parts.put(idx, part);
Ok(())
})
}
async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;
self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.await
}
async fn abort(&mut self) -> Result<()> {
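        // `AbortMultipartUpload` is a `DELETE` on the object path with the
        // `uploadId` query parameter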
self.state
.client
.delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)])
.await
}
}
#[async_trait]
impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.create_multipart(path).await
}
async fn put_part(
&self,
path: &Path,
id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
self.client.put_part(path, id, part_idx, data).await
}
async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.complete_multipart(path, id, parts).await
}
async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
self.client.delete_request(path, &[("uploadId", id)]).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{client::get::GetClient, tests::*};
use hyper::HeaderMap;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn s3_test() {
crate::test_util::maybe_skip_integration!();
let config = AmazonS3Builder::from_env();
let integration = config.build().unwrap();
let config = &integration.client.config;
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();
put_get_delete_list_opts(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;
signing(&integration).await;
s3_encryption(&integration).await;
// Object tagging is not supported by S3 Express One Zone
if config.session_provider.is_none() {
tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
.await;
}
if test_not_exists {
copy_if_not_exists(&integration).await;
}
if test_conditional_put {
put_opts(&integration, true).await;
}
// run integration test with unsigned payload enabled
let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
let integration = builder.build().unwrap();
put_get_delete_list_opts(&integration).await;
// run integration test with checksum set to sha256
let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
put_get_delete_list_opts(&integration).await;
match &integration.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
_ => eprintln!("Skipping dynamo integration test - dynamo not configured"),
};
}
#[tokio::test]
async fn s3_test_get_nonexistent_location() {
crate::test_util::maybe_skip_integration!();
let integration = AmazonS3Builder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() {
crate::test_util::maybe_skip_integration!();
let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() {
crate::test_util::maybe_skip_integration!();
let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let data = PutPayload::from("arbitrary data");
let err = integration.put(&location, data).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}
#[tokio::test]
async fn s3_test_delete_nonexistent_location() {
crate::test_util::maybe_skip_integration!();
let integration = AmazonS3Builder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
integration.delete(&location).await.unwrap();
}
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() {
crate::test_util::maybe_skip_integration!();
let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
let err = integration.delete(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}
#[tokio::test]
#[ignore = "Tests shouldn't call use remote services by default"]
async fn test_disable_creds() {
// https://registry.opendata.aws/daylight-osm/
let v1 = AmazonS3Builder::new()
.with_bucket_name("daylight-map-distribution")
.with_region("us-west-1")
.with_access_key_id("local")
.with_secret_access_key("development")
.build()
.unwrap();
let prefix = Path::from("release");
v1.list_with_delimiter(Some(&prefix)).await.unwrap_err();
let v2 = AmazonS3Builder::new()
.with_bucket_name("daylight-map-distribution")
.with_region("us-west-1")
.with_skip_signature(true)
.build()
.unwrap();
v2.list_with_delimiter(Some(&prefix)).await.unwrap();
}
async fn s3_encryption(store: &AmazonS3) {
crate::test_util::maybe_skip_integration!();
let data = PutPayload::from(vec![3u8; 1024]);
let encryption_headers: HeaderMap = store.client.config.encryption_headers.clone().into();
let expected_encryption =
if let Some(encryption_type) = encryption_headers.get("x-amz-server-side-encryption") {
encryption_type
} else {
eprintln!("Skipping S3 encryption test - encryption not configured");
return;
};
let locations = [
Path::from("test-encryption-1"),
Path::from("test-encryption-2"),
Path::from("test-encryption-3"),
];
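        // Write via put, server-side copy, and multipart upload, then verify
        // each resulting object reports the expected encryption header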
store.put(&locations[0], data.clone()).await.unwrap();
store.copy(&locations[0], &locations[1]).await.unwrap();
let mut upload = store.put_multipart(&locations[2]).await.unwrap();
upload.put_part(data.clone()).await.unwrap();
upload.complete().await.unwrap();
for location in &locations {
let res = store
.client
.get_request(location, GetOptions::default())
.await
.unwrap();
let headers = res.headers();
assert_eq!(
headers
.get("x-amz-server-side-encryption")
.expect("object is not encrypted"),
expected_encryption
);
store.delete(location).await.unwrap();
}
}
}