use crate::aws::builder::S3EncryptionHeaders;
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE,
use crate::client::get::GetClient;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result,
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use hyper::http;
use hyper::http::HeaderName;
use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
use reqwest::{
Client as ReqwestClient, Method, RequestBuilder, Response,
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
/// Response header from which an object's version is read
/// (passed to `get_put_result` and `get_version` in this file).
const VERSION_HEADER: &str = "x-amz-version-id";
/// Request header that carries the base64-encoded SHA-256 digest of the request body.
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
/// A specialized `Error` for object store-related errors
///
/// Every variant except `DeleteFailed` carries the underlying failure in its
/// `source` field.
#[derive(Debug, Snafu)]
pub(crate) enum Error {
/// The body of a GET response for `path` could not be read.
#[snafu(display("Error fetching get response body {}: {}", path, source))]
GetResponseBody {
source: reqwest::Error,
path: String,
/// The DeleteObjects request failed.
#[snafu(display("Error performing DeleteObjects request: {}", source))]
DeleteObjectsRequest { source: crate::client::retry::Error },
// Message template for `DeleteFailed`: a failure reported for a single key of a
// DeleteObjects request.
// NOTE(review): the enclosing `#[snafu(display(...))]` wrapper and the argument
// list for this template are not visible here — confirm they are present.
"DeleteObjects request failed for key {}: {} (code: {})",
DeleteFailed {
path: String,
code: String,
message: String,
/// The body of a DeleteObjects response could not be read.
#[snafu(display("Error getting DeleteObjects response body: {}", source))]
DeleteObjectsResponse { source: reqwest::Error },
/// The DeleteObjects response could not be interpreted (XML decoding or key parsing).
#[snafu(display("Got invalid DeleteObjects response: {}", source))]
InvalidDeleteObjectsResponse {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
/// The list request failed.
#[snafu(display("Error performing list request: {}", source))]
ListRequest { source: crate::client::retry::Error },
/// The body of a list response could not be read.
#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },
/// The request that creates a multipart upload failed.
#[snafu(display("Error performing create multipart request: {}", source))]
CreateMultipartRequest { source: crate::client::retry::Error },
/// The body of a create-multipart response could not be read.
#[snafu(display("Error getting create multipart response body: {}", source))]
CreateMultipartResponseBody { source: reqwest::Error },
/// The request that completes a multipart upload failed.
#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: crate::client::retry::Error },
/// The body of a complete-multipart response could not be read.
#[snafu(display("Error getting complete multipart response body: {}", source))]
CompleteMultipartResponseBody { source: reqwest::Error },
/// The list response XML could not be deserialized.
#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },
/// A multipart response XML could not be deserialized.
#[snafu(display("Got invalid multipart response: {}", source))]
InvalidMultipartResponse { source: quick_xml::de::DeError },
/// Response headers did not yield the expected metadata (e.g. ETag or version).
#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
/// Converts a client-level [`Error`] into the crate-level error by boxing it
/// inside `crate::Error::Generic`, tagged with the `STORE` identifier.
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
Self::Generic {
store: STORE,
source: Box::new(err),
/// Deserialized body of a DeleteObjects response (XML root element `DeleteResult`).
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
// `$value` gathers the root's child elements into `content`
// — presumably one entry per key reported by the service; confirm against the
// DeleteObjects response schema.
#[serde(rename = "$value")]
content: Vec<DeleteObjectResult>,
/// Outcome reported for a single key inside a [`BatchDeleteResponse`].
/// `bulk_delete_request` matches on the `Error` variant to surface per-key failures.
enum DeleteObjectResult {
/// A `Deleted` element of a DeleteObjects response.
#[serde(rename_all = "PascalCase", rename = "Deleted")]
struct DeletedObject {
/// Object key reported by the service (XML element `Key`)
key: String,
/// An `Error` element of a DeleteObjects response: a per-key failure.
/// Converted into [`Error::DeleteFailed`] by the `From` impl in this file.
#[serde(rename_all = "PascalCase", rename = "Error")]
struct DeleteError {
/// Object key the failure refers to (XML element `Key`)
key: String,
/// Error code reported by the service (XML element `Code`)
code: String,
/// Human-readable error message (XML element `Message`)
message: String,
/// Maps a per-key [`DeleteError`] from the service onto [`Error::DeleteFailed`],
/// moving the key into the `path` field and carrying `code` and `message` over as-is.
impl From<DeleteError> for Error {
fn from(err: DeleteError) -> Self {
Self::DeleteFailed {
path: err.key,
code: err.code,
message: err.message,
/// Configuration shared by every request issued through [`S3Client`].
pub struct S3Config {
/// Region handed to the SigV4 authorizer when signing
pub region: String,
pub endpoint: Option<String>,
/// Bucket name; used to build the `x-amz-copy-source` header value
pub bucket: String,
/// Base URL for bucket-level requests; object URLs are `{bucket_endpoint}/{encoded path}`
pub bucket_endpoint: String,
/// Provider of the regular signing credential
pub credentials: AwsCredentialProvider,
/// Optional provider of session credentials. When set it is preferred by
/// `get_session_credential`, and signing uses a session-token header.
pub session_provider: Option<AwsCredentialProvider>,
pub retry_config: RetryConfig,
/// HTTP client options; also supplies the content type to use for a given path
pub client_options: ClientOptions,
pub sign_payload: bool,
/// When true no credential is fetched, so requests are built without an authorizer
pub skip_signature: bool,
pub disable_tagging: bool,
/// When `Some(Checksum::SHA256)`, put requests gain an additional checksum header
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
/// Headers added to put requests that opt in via `with_encryption_headers`
pub encryption_headers: S3EncryptionHeaders,
impl S3Config {
/// Builds the request URL for `path`: `bucket_endpoint`, a `/`, then the
/// percent-encoded path.
pub(crate) fn path_url(&self, path: &Path) -> String {
format!("{}/{}", self.bucket_endpoint, encode_path(path))
/// Resolves the credential for requests that may use session credentials.
///
/// With `skip_signature` set, no credential is fetched (`None`). Otherwise the
/// `session_provider` is preferred, falling back to `credentials`. The returned
/// `session_token` flag records whether a session provider is configured.
async fn get_session_credential(&self) -> Result<SessionCredential<'_>> {
let credential = match self.skip_signature {
false => {
let provider = self.session_provider.as_ref().unwrap_or(&self.credentials);
true => None,
Ok(SessionCredential {
session_token: self.session_provider.is_some(),
config: self,
/// Fetches the regular (non-session) credential, or `None` when `skip_signature` is set.
pub(crate) async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
Ok(match self.skip_signature {
false => Some(self.credentials.get_credential().await?),
true => None,
/// Reports whether this configuration targets S3 Express.
// NOTE(review): presumably derived from `session_provider` — confirm against the body.
pub(crate) fn is_s3_express(&self) -> bool {
/// A resolved credential together with the information needed to build an authorizer.
struct SessionCredential<'a> {
/// The credential to sign with; `None` means the request is left unsigned
credential: Option<Arc<AwsCredential>>,
/// Whether signing should use the S3 session-token header
session_token: bool,
/// Configuration the credential was resolved from (supplies the region)
config: &'a S3Config,
impl<'a> SessionCredential<'a> {
/// Builds the SigV4 authorizer for service `"s3"` in the configured region.
///
/// Returns `None` when no credential is held. When `session_token` is set, the
/// authorizer is configured to use the `x-amz-s3session-token` header for the token.
fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
let mut authorizer =
AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
if self.session_token {
let token = HeaderName::from_static("x-amz-s3session-token");
authorizer = authorizer.with_token_header(token)
/// Error returned by [`Request::send`].
#[derive(Debug, Snafu)]
pub enum RequestError {
/// A crate-level error, passed through unchanged on conversion
Generic { source: crate::Error },
/// A failure from the retry layer, together with the path of the request
Retry {
source: crate::client::retry::Error,
path: String,
/// Flattens a [`RequestError`] into the crate-level error: `Generic` unwraps to its
/// source, `Retry` is converted through the retry error's `error(STORE, path)`.
impl From<RequestError> for crate::Error {
fn from(value: RequestError) -> Self {
match value {
RequestError::Generic { source } => source,
RequestError::Retry { source, path } => source.error(STORE, path),
/// A builder for a request allowing customisation of the headers and query string
pub(crate) struct Request<'a> {
/// Object path; reported alongside retry errors
path: &'a Path,
/// Configuration used to resolve credentials at send time
config: &'a S3Config,
/// Underlying HTTP request builder
builder: RequestBuilder,
/// SHA-256 of the payload, if one was computed; passed to the SigV4 signer
payload_sha256: Option<digest::Digest>,
/// Request body, if any
payload: Option<PutPayload>,
/// Whether `send` resolves the session credential rather than the regular one
use_session_creds: bool,
/// Idempotency flag set through `Request::idempotent`
idempotent: bool,
impl<'a> Request<'a> {
/// Adds the serialized `query` parameters to the underlying request builder.
pub fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
let builder = self.builder.query(query);
Self { builder, ..self }
/// Adds the header `k: v` to the underlying request builder.
pub fn header<K>(self, k: K, v: &str) -> Self
HeaderName: TryFrom<K>,
<HeaderName as TryFrom<K>>::Error: Into<http::Error>,
let builder = self.builder.header(k, v);
Self { builder, ..self }
/// Sets the idempotency flag of the request
/// (presumably consulted by the retry logic — confirm).
pub fn idempotent(mut self, idempotent: bool) -> Self {
self.idempotent = idempotent;
/// Signs the request with SigV4 and returns the response.
///
/// Uses the session credential when `use_session_creds` is set, otherwise the
/// regular credential with no session token. Retry-layer failures are wrapped in
/// [`RequestError::Retry`] together with the request path.
pub async fn send(self) -> Result<Response, RequestError> {
let credential = match self.use_session_creds {
true => self.config.get_session_credential().await?,
false => SessionCredential {
credential: self.config.get_credential().await?,
session_token: false,
config: self.config,
// Pre-computed payload SHA-256 (if any), handed to the signer as raw bytes
let sha = self.payload_sha256.as_ref().map(|x| x.as_ref());
let path = self.path.as_ref();
.with_aws_sigv4(credential.authorizer(), sha)
.context(RetrySnafu { path })
/// Sends the request and builds a [`PutResult`] from the response headers,
/// reading the version from `VERSION_HEADER`.
pub async fn do_put(self) -> Result<PutResult> {
let response = self.send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
/// S3 client: pairs the store configuration with the HTTP client built from it.
pub(crate) struct S3Client {
/// Store configuration (endpoints, credentials, options)
pub config: S3Config,
/// HTTP client created from `config.client_options`
pub client: ReqwestClient,
impl S3Client {
/// Creates a client for `config`, building the HTTP client from its `client_options`.
///
/// Returns an error if the HTTP client cannot be constructed.
pub fn new(config: S3Config) -> Result<Self> {
let client = config.client_options.client()?;
Ok(Self { config, client })
/// Make an S3 PUT request
///
/// Nothing is sent here: the returned [`Request`] is finished by the caller (for
/// example with `send` or `do_put`). The payload's SHA-256 is computed up front, and
/// encryption headers are attached only when `with_encryption_headers` is true.
pub fn put_request<'a>(
&'a self,
path: &'a Path,
payload: PutPayload,
with_encryption_headers: bool,
) -> Request<'a> {
let url = self.config.path_url(path);
let mut builder = self.client.request(Method::PUT, url);
if with_encryption_headers {
builder = builder.headers(self.config.encryption_headers.clone().into());
// Hash every chunk of the payload so the digest covers the whole body
let mut sha256 = Context::new(&digest::SHA256);
payload.iter().for_each(|x| sha256.update(x));
let payload_sha256 = sha256.finish();
// An extra header is added when SHA256 checksums are configured
if let Some(Checksum::SHA256) = self.config.checksum {
builder = builder.header(
// Content type, when the client options provide one for this path
if let Some(value) = self.config.client_options.get_content_type(path) {
builder = builder.header(CONTENT_TYPE, value);
// Puts use session credentials and start out not marked idempotent
Request {
builder: builder.header(CONTENT_LENGTH, payload.content_length()),
payload: Some(payload),
payload_sha256: Some(payload_sha256),
config: &self.config,
use_session_creds: true,
idempotent: false,
/// Make an S3 Delete request
///
/// Issues a SigV4-signed `DELETE` against the object's URL using the session
/// credential. Request errors are converted into store errors that carry the path.
pub async fn delete_request<T: Serialize + ?Sized + Sync>(
path: &Path,
query: &T,
) -> Result<()> {
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(path);
.request(Method::DELETE, url)
.with_aws_sigv4(credential.authorizer(), None)
.map_err(|e| e.error(STORE, path.to_string()))?;
/// Make an S3 Delete Objects request
///
/// Produces a vector of results, one for each path in the input vector. If
/// the delete was successful, the path is returned in the `Ok` variant. If
/// there was an error for a certain path, the error will be returned in the
/// vector. If there was an issue with making the overall request, an error
/// will be returned at the top level.
///
/// An empty `paths` returns an empty vector without contacting the service.
pub async fn bulk_delete_request(&self, paths: Vec<Path>) -> Result<Vec<Result<Path>>> {
if paths.is_empty() {
return Ok(Vec::new());
let credential = self.config.get_session_credential().await?;
let url = format!("{}?delete", self.config.bucket_endpoint);
// The XML request body is written into an in-memory buffer
let mut buffer = Vec::new();
let mut writer = quick_xml::Writer::new(&mut buffer);
.with_attributes([("xmlns", "")]),
for path in &paths {
// <Object><Key>{path}</Key></Object>
.map_err(|err| crate::Error::Generic {
store: STORE,
source: Box::new(err),
let body = Bytes::from(buffer);
let mut builder = self.client.request(Method::POST, url);
// The SHA-256 of the body is sent as a checksum header and reused for signing below
let digest = digest::digest(&digest::SHA256, &body);
builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest));
// S3 *requires* DeleteObjects to include a Content-MD5 header:
// > "The Content-MD5 request header is required for all Multi-Object Delete requests"
// Some platforms, like MinIO, enforce this requirement and fail requests without the header.
let mut hasher = Md5::new();
builder = builder.header("Content-MD5", BASE64_STANDARD.encode(hasher.finalize()));
let response = builder
.header(CONTENT_TYPE, "application/xml")
.with_aws_sigv4(credential.authorizer(), Some(digest.as_ref()))
.context(DeleteObjectsRequestSnafu {})?
.context(DeleteObjectsResponseSnafu {})?;
let response: BatchDeleteResponse =
quick_xml::de::from_reader(response.reader()).map_err(|err| {
Error::InvalidDeleteObjectsResponse {
source: Box::new(err),
// Assume all were ok, then fill in errors. This guarantees output order
// matches input order.
let mut results: Vec<Result<Path>> = paths.iter().cloned().map(Ok).collect();
for content in response.content.into_iter() {
if let DeleteObjectResult::Error(error) = content {
let path =
Path::parse(&error.key).map_err(|err| Error::InvalidDeleteObjectsResponse {
source: Box::new(err),
// NOTE(review): `unwrap()` panics if the service reports an error for a key
// that was not among `paths`; the lookup is also a linear scan per error.
let i = paths.iter().find_position(|&p| p == &path).unwrap().0;
results[i] = Err(Error::from(error).into());
/// Make an S3 Copy request
///
/// Builds (but does not send) a `PUT` to the destination URL whose
/// `x-amz-copy-source` header names the source as `{bucket}/{encoded from}`.
/// The returned [`Request`] records `from` as its path.
pub fn copy_request<'a>(&'a self, from: &'a Path, to: &Path) -> Request<'a> {
let url = self.config.path_url(to);
let source = format!("{}/{}", self.config.bucket, encode_path(from));
let builder = self
.request(Method::PUT, url)
.header("x-amz-copy-source", source)
// Copies carry no payload and use the regular credential, not the session one
Request {
path: from,
config: &self.config,
payload: None,
payload_sha256: None,
use_session_creds: false,
idempotent: false,
/// Starts a multipart upload for `location` and returns its [`MultipartId`].
///
/// Sends a signed `POST` to `{object url}?uploads=` and deserializes the response
/// as an `InitiateMultipartUploadResult`.
pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
let credential = self.config.get_session_credential().await?;
let url = format!("{}?uploads=", self.config.path_url(location),);
let response = self
.request(Method::POST, url)
.with_aws_sigv4(credential.authorizer(), None)
let response: InitiateMultipartUploadResult =
/// Uploads one part of a multipart upload and returns its [`PartId`].
///
/// `part_idx` is zero-based; it is sent as the 1-based `partNumber` query parameter.
/// The part's `content_id` is the ETag taken from the response headers. Encryption
/// headers are not attached to part uploads.
pub async fn put_part(
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: PutPayload,
) -> Result<PartId> {
// Convert the zero-based index into the 1-based part number
let part = (part_idx + 1).to_string();
let response = self
.put_request(path, data, false)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
Ok(PartId { content_id })
/// Completes the multipart upload `upload_id` for `location` from the given `parts`.
///
/// If `parts` is empty, an empty part is uploaded first. The version is read from
/// the response headers and the ETag from the deserialized response body.
pub async fn complete_multipart(
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
) -> Result<PutResult> {
let parts = if parts.is_empty() {
// If no parts were uploaded, upload an empty part
// otherwise the completion request will fail
let part = self
.put_part(location, &upload_id.to_string(), 0, PutPayload::default())
} else {
let request = CompleteMultipartUpload::from(parts);
// NOTE(review): `unwrap()` panics if XML serialization of the part list fails
let body = quick_xml::se::to_string(&request).unwrap();
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(location);
let response = self
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.with_aws_sigv4(credential.authorizer(), None)
// Read the version header before the response body is consumed
let version = get_version(response.headers(), VERSION_HEADER).context(MetadataSnafu)?;
let data = response
let response: CompleteMultipartUploadResult =
Ok(PutResult {
e_tag: Some(response.e_tag),
/// Fetches the tagging of the object at `path`.
///
/// Sends a signed `GET` to `{object url}?tagging` and returns the raw [`Response`];
/// request errors are converted into store errors that carry the path.
pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.config.get_session_credential().await?;
let url = format!("{}?tagging", self.config.path_url(path));
let response = self
.request(Method::GET, url)
.with_aws_sigv4(credential.authorizer(), None)
.map_err(|e| e.error(STORE, path.to_string()))?;
/// `GetClient` implementation: GET/HEAD requests for single objects.
impl GetClient for S3Client {
const STORE: &'static str = STORE;
// Neither ETag nor Last-Modified is required in responses; the version is read
// from `VERSION_HEADER`.
const HEADER_CONFIG: HeaderConfig = HeaderConfig {
etag_required: false,
last_modified_required: false,
version_header: Some(VERSION_HEADER),
/// Make an S3 GET request
///
/// Uses `HEAD` instead of `GET` when `options.head` is set, and adds a `versionId`
/// query parameter when `options.version` is present. Request errors are converted
/// into store errors that carry the path.
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
true => Method::HEAD,
false => Method::GET,
let mut builder = self.client.request(method, url);
if let Some(v) = &options.version {
builder = builder.query(&[("versionId", v)])
let response = builder
.with_aws_sigv4(credential.authorizer(), None)
.map_err(|e| e.error(STORE, path.to_string()))?;
/// `ListClient` implementation: paginated listing of the bucket.
impl ListClient for S3Client {
/// Make an S3 List request
///
/// Sends a signed `GET` to the bucket endpoint with `list-type=2` plus whichever of
/// `continuation-token`, `delimiter`, `prefix` and `start-after` apply. Returns the
/// converted [`ListResult`] and the next continuation token, if any.
async fn list_request(
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
let credential = self.config.get_session_credential().await?;
let url = self.config.bucket_endpoint.clone();
// NOTE(review): parameters are appended in alphabetical order — possibly relied
// upon when the request is signed; confirm before reordering.
let mut query = Vec::with_capacity(4);
if let Some(token) = token {
query.push(("continuation-token", token))
if delimiter {
query.push(("delimiter", DELIMITER))
query.push(("list-type", "2"));
if let Some(prefix) = prefix {
query.push(("prefix", prefix))
if let Some(offset) = offset {
query.push(("start-after", offset))
let response = self
.request(Method::GET, &url)
.with_aws_sigv4(credential.authorizer(), None)
let mut response: ListResponse =
// Take the token out first: `try_into` below consumes the response
let token = response.next_continuation_token.take();
Ok((response.try_into()?, token))
/// Percent-encodes `path` for use in a URL, escaping the characters in
/// `STRICT_PATH_ENCODE_SET`. The result is a lazy encoder that borrows from `path`.
fn encode_path(path: &Path) -> PercentEncode<'_> {
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)