Add Attributes API (#5329) (#5650)

* Add Attributes API (#5329)

* Clippy

* Emulator test tweaks
diff --git a/object_store/src/attributes.rs b/object_store/src/attributes.rs
new file mode 100644
index 0000000..9b90b53
--- /dev/null
+++ b/object_store/src/attributes.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::ops::Deref;
+
+/// Additional object attribute types
+#[non_exhaustive]
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub enum Attribute {
+    /// Specifies the MIME type of the object
+    ///
+    /// This takes precedence over any [ClientOptions](crate::ClientOptions) configuration
+    ///
+    /// See [Content-Type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type)
+    ContentType,
+    /// Overrides cache control policy of the object
+    ///
+    /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control)
+    CacheControl,
+}
+
+/// The value of an [`Attribute`]
+///
+/// Provides efficient conversion from both static and owned strings
+///
+/// ```
+/// # use object_store::AttributeValue;
+/// // Can use static strings without needing an allocation
+/// let value = AttributeValue::from("bar");
+/// // Can also store owned strings
+/// let value = AttributeValue::from("foo".to_string());
+/// ```
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub struct AttributeValue(Cow<'static, str>);
+
+impl AsRef<str> for AttributeValue {
+    fn as_ref(&self) -> &str {
+        &self.0
+    }
+}
+
+impl From<&'static str> for AttributeValue {
+    fn from(value: &'static str) -> Self {
+        Self(Cow::Borrowed(value))
+    }
+}
+
+impl From<String> for AttributeValue {
+    fn from(value: String) -> Self {
+        Self(Cow::Owned(value))
+    }
+}
+
+impl Deref for AttributeValue {
+    type Target = str;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.as_ref()
+    }
+}
+
+/// Additional attributes of an object
+///
+/// Attributes can be specified in [PutOptions](crate::PutOptions) and retrieved
+/// from APIs returning [GetResult](crate::GetResult).
+///
+/// Unlike [`ObjectMeta`](crate::ObjectMeta), [`Attributes`] are not returned by
+/// listing APIs
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
+pub struct Attributes(HashMap<Attribute, AttributeValue>);
+
+impl Attributes {
+    /// Create a new empty [`Attributes`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create a new [`Attributes`] with space for `capacity` [`Attribute`]
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self(HashMap::with_capacity(capacity))
+    }
+
+    /// Insert a new [`Attribute`], [`AttributeValue`] pair
+    ///
+    /// Returns the previous value for `key` if any
+    pub fn insert(&mut self, key: Attribute, value: AttributeValue) -> Option<AttributeValue> {
+        self.0.insert(key, value)
+    }
+
+    /// Returns the [`AttributeValue`] for `key` if any
+    pub fn get(&self, key: &Attribute) -> Option<&AttributeValue> {
+        self.0.get(key)
+    }
+
+    /// Removes the [`AttributeValue`] for `key` if any
+    pub fn remove(&mut self, key: &Attribute) -> Option<AttributeValue> {
+        self.0.remove(key)
+    }
+
+    /// Returns an [`AttributesIter`] over this
+    pub fn iter(&self) -> AttributesIter<'_> {
+        self.into_iter()
+    }
+
+    /// Returns the number of [`Attribute`] in this collection
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Returns true if this contains no [`Attribute`]
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
+impl<K, V> FromIterator<(K, V)> for Attributes
+where
+    K: Into<Attribute>,
+    V: Into<AttributeValue>,
+{
+    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+        Self(
+            iter.into_iter()
+                .map(|(k, v)| (k.into(), v.into()))
+                .collect(),
+        )
+    }
+}
+
+impl<'a> IntoIterator for &'a Attributes {
+    type Item = (&'a Attribute, &'a AttributeValue);
+    type IntoIter = AttributesIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        AttributesIter(self.0.iter())
+    }
+}
+
+/// Iterator over [`Attributes`]
+#[derive(Debug)]
+pub struct AttributesIter<'a>(std::collections::hash_map::Iter<'a, Attribute, AttributeValue>);
+
+impl<'a> Iterator for AttributesIter<'a> {
+    type Item = (&'a Attribute, &'a AttributeValue);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_attributes_basic() {
+        let mut attributes = Attributes::from_iter([
+            (Attribute::ContentType, "test"),
+            (Attribute::CacheControl, "control"),
+        ]);
+
+        assert!(!attributes.is_empty());
+        assert_eq!(attributes.len(), 2);
+
+        assert_eq!(
+            attributes.get(&Attribute::ContentType),
+            Some(&"test".into())
+        );
+
+        let metav = "control".into();
+        assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+        assert_eq!(
+            attributes.insert(Attribute::CacheControl, "v1".into()),
+            Some(metav)
+        );
+        assert_eq!(attributes.len(), 2);
+
+        assert_eq!(
+            attributes.remove(&Attribute::CacheControl).unwrap(),
+            "v1".into()
+        );
+        assert_eq!(attributes.len(), 1);
+
+        let metav: AttributeValue = "v2".into();
+        attributes.insert(Attribute::CacheControl, metav.clone());
+        assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+        assert_eq!(attributes.len(), 2);
+    }
+}
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index c1789ed..e81ef6a 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -35,23 +35,21 @@
 use crate::multipart::PartId;
 use crate::path::DELIMITER;
 use crate::{
-    ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result,
-    RetryConfig,
+    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload,
+    PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
 use hyper::http;
 use hyper::http::HeaderName;
 use itertools::Itertools;
 use md5::{Digest, Md5};
 use percent_encoding::{utf8_percent_encode, PercentEncode};
 use quick_xml::events::{self as xml_events};
-use reqwest::{
-    header::{CONTENT_LENGTH, CONTENT_TYPE},
-    Client as ReqwestClient, Method, RequestBuilder, Response,
-};
+use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, RequestBuilder, Response};
 use ring::digest;
 use ring::digest::Context;
 use serde::{Deserialize, Serialize};
@@ -344,6 +342,7 @@
         &'a self,
         path: &'a Path,
         payload: PutPayload,
+        attributes: Attributes,
         with_encryption_headers: bool,
     ) -> Request<'a> {
         let url = self.config.path_url(path);
@@ -363,8 +362,21 @@
             )
         }
 
-        if let Some(value) = self.config.client_options.get_content_type(path) {
-            builder = builder.header(CONTENT_TYPE, value);
+        let mut has_content_type = false;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
+
+        if !has_content_type {
+            if let Some(value) = self.config.client_options.get_content_type(path) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
         }
 
         Request {
@@ -556,7 +568,7 @@
         let part = (part_idx + 1).to_string();
 
         let response = self
-            .put_request(path, data, false)
+            .put_request(path, data, Attributes::default(), false)
             .query(&[("partNumber", &part), ("uploadId", upload_id)])
             .idempotent(true)
             .send()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 9e741c9..43bd38a 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -156,7 +156,8 @@
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let mut request = self.client.put_request(location, payload, true);
+        let attrs = opts.attributes;
+        let mut request = self.client.put_request(location, payload, attrs, true);
         let tags = opts.tags.encoded();
         if !tags.is_empty() && !self.client.config.disable_tagging {
             request = request.header(&TAGS_HEADER, tags);
@@ -403,7 +404,7 @@
         let test_not_exists = config.copy_if_not_exists.is_some();
         let test_conditional_put = config.conditional_put.is_some();
 
-        put_get_delete_list_opts(&integration).await;
+        put_get_delete_list(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
@@ -412,6 +413,7 @@
         multipart(&integration, &integration).await;
         signing(&integration).await;
         s3_encryption(&integration).await;
+        put_get_attributes(&integration).await;
 
         // Object tagging is not supported by S3 Express One Zone
         if config.session_provider.is_none() {
@@ -432,12 +434,12 @@
         // run integration test with unsigned payload enabled
         let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
         let integration = builder.build().unwrap();
-        put_get_delete_list_opts(&integration).await;
+        put_get_delete_list(&integration).await;
 
         // run integration test with checksum set to sha256
         let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
-        put_get_delete_list_opts(&integration).await;
+        put_get_delete_list(&integration).await;
 
         match &integration.client.config.copy_if_not_exists {
             Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index d5972d0..134609e 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -27,14 +27,15 @@
 use crate::path::DELIMITER;
 use crate::util::{deserialize_rfc1123, GetRange};
 use crate::{
-    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutPayload,
-    PutResult, Result, RetryConfig,
+    Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode,
+    PutOptions, PutPayload, PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
+use hyper::header::CACHE_CONTROL;
 use hyper::http::HeaderName;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{
@@ -187,9 +188,8 @@
         Self { builder, ..self }
     }
 
-    fn set_idempotent(mut self, idempotent: bool) -> Self {
-        self.idempotent = idempotent;
-        self
+    fn set_idempotent(self, idempotent: bool) -> Self {
+        Self { idempotent, ..self }
     }
 
     async fn send(self) -> Result<Response> {
@@ -199,7 +199,7 @@
             .header(CONTENT_LENGTH, self.payload.content_length())
             .with_azure_authorization(&credential, &self.config.account)
             .retryable(&self.config.retry_config)
-            .idempotent(true)
+            .idempotent(self.idempotent)
             .payload(Some(self.payload))
             .send()
             .await
@@ -233,13 +233,31 @@
         self.config.get_credential().await
     }
 
-    fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> {
+    fn put_request<'a>(
+        &'a self,
+        path: &'a Path,
+        payload: PutPayload,
+        attributes: Attributes,
+    ) -> PutRequest<'a> {
         let url = self.config.path_url(path);
 
         let mut builder = self.client.request(Method::PUT, url);
 
-        if let Some(value) = self.config().client_options.get_content_type(path) {
-            builder = builder.header(CONTENT_TYPE, value);
+        let mut has_content_type = false;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
+
+        if !has_content_type {
+            if let Some(value) = self.config.client_options.get_content_type(path) {
+                builder = builder.header(CONTENT_TYPE, value);
+            }
         }
 
         PutRequest {
@@ -258,7 +276,7 @@
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let builder = self.put_request(path, payload);
+        let builder = self.put_request(path, payload, opts.attributes);
 
         let builder = match &opts.mode {
             PutMode::Overwrite => builder.set_idempotent(true),
@@ -288,7 +306,7 @@
         let content_id = format!("{part_idx:20}");
         let block_id = BASE64_STANDARD.encode(&content_id);
 
-        self.put_request(path, payload)
+        self.put_request(path, payload, Attributes::default())
             .query(&[("comp", "block"), ("blockid", &block_id)])
             .set_idempotent(true)
             .send()
@@ -304,8 +322,9 @@
             .map(|part| BlockId::from(part.content_id))
             .collect();
 
+        let payload = BlockList { blocks }.to_xml().into();
         let response = self
-            .put_request(path, BlockList { blocks }.to_xml().into())
+            .put_request(path, payload, Attributes::default())
             .query(&[("comp", "blocklist")])
             .set_idempotent(true)
             .send()
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 8dc5242..3bb57c4 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -276,7 +276,7 @@
         crate::test_util::maybe_skip_integration!();
         let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
 
-        put_get_delete_list_opts(&integration).await;
+        put_get_delete_list(&integration).await;
         get_opts(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
@@ -292,7 +292,12 @@
             let client = Arc::clone(&integration.client);
             async move { client.get_blob_tagging(&p).await }
         })
-        .await
+        .await;
+
+        // Azurite doesn't support attributes properly
+        if !integration.client.config().is_emulator {
+            put_get_attributes(&integration).await;
+        }
     }
 
     #[ignore = "Used for manual testing against a real storage account."]
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 2e399e5..f700457 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -19,10 +19,10 @@
 
 use crate::client::header::{header_meta, HeaderConfig};
 use crate::path::Path;
-use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
+use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result};
 use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
-use hyper::header::CONTENT_RANGE;
+use hyper::header::{CACHE_CONTROL, CONTENT_RANGE, CONTENT_TYPE};
 use hyper::StatusCode;
 use reqwest::header::ToStrError;
 use reqwest::Response;
@@ -117,6 +117,12 @@
     #[snafu(display("Content-Range header contained non UTF-8 characters"))]
     InvalidContentRange { source: ToStrError },
 
+    #[snafu(display("Cache-Control header contained non UTF-8 characters"))]
+    InvalidCacheControl { source: ToStrError },
+
+    #[snafu(display("Content-Type header contained non UTF-8 characters"))]
+    InvalidContentType { source: ToStrError },
+
     #[snafu(display("Requested {expected:?}, got {actual:?}"))]
     UnexpectedRange {
         expected: Range<usize>,
@@ -161,6 +167,16 @@
         0..meta.size
     };
 
+    let mut attributes = Attributes::new();
+    if let Some(x) = response.headers().get(CACHE_CONTROL) {
+        let x = x.to_str().context(InvalidCacheControlSnafu)?;
+        attributes.insert(Attribute::CacheControl, x.to_string().into());
+    }
+    if let Some(x) = response.headers().get(CONTENT_TYPE) {
+        let x = x.to_str().context(InvalidContentTypeSnafu)?;
+        attributes.insert(Attribute::ContentType, x.to_string().into());
+    }
+
     let stream = response
         .bytes_stream()
         .map_err(|source| crate::Error::Generic {
@@ -172,6 +188,7 @@
     Ok(GetResult {
         range,
         meta,
+        attributes,
         payload: GetResultPayload::Stream(stream),
     })
 }
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 7728f38..3fefbb5 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -485,7 +485,7 @@
     /// mime type if it was defined initially through
     /// `ClientOptions::with_content_type_for_suffix`
     ///
-    /// Otherwise returns the default mime type if it was defined
+    /// Otherwise, returns the default mime type if it was defined
     /// earlier through `ClientOptions::with_default_content_type`
     pub fn get_content_type(&self, path: &Path) -> Option<&str> {
         match path.extension() {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index f91217f..4ee03ea 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,14 +29,14 @@
 use crate::path::{Path, DELIMITER};
 use crate::util::hex_encode;
 use crate::{
-    ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutPayload, PutResult,
-    Result, RetryConfig,
+    Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions,
+    PutPayload, PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::Buf;
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE};
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::header::HeaderName;
 use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
@@ -45,6 +45,7 @@
 use std::sync::Arc;
 
 const VERSION_HEADER: &str = "x-goog-generation";
+const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
 
 static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match");
 
@@ -323,19 +324,31 @@
     /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     ///
     /// Returns the new ETag
-    pub fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> {
+    pub fn put_request<'a>(
+        &'a self,
+        path: &'a Path,
+        payload: PutPayload,
+        attributes: Attributes,
+    ) -> PutRequest<'a> {
         let url = self.object_url(path);
+        let mut builder = self.client.request(Method::PUT, url);
 
-        let content_type = self
-            .config
-            .client_options
-            .get_content_type(path)
-            .unwrap_or("application/octet-stream");
+        let mut has_content_type = false;
+        for (k, v) in &attributes {
+            builder = match k {
+                Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+                Attribute::ContentType => {
+                    has_content_type = true;
+                    builder.header(CONTENT_TYPE, v.as_ref())
+                }
+            };
+        }
 
-        let builder = self
-            .client
-            .request(Method::PUT, url)
-            .header(header::CONTENT_TYPE, content_type);
+        if !has_content_type {
+            let opts = &self.config.client_options;
+            let value = opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE);
+            builder = builder.header(CONTENT_TYPE, value)
+        }
 
         PutRequest {
             path,
@@ -352,7 +365,7 @@
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
-        let builder = self.put_request(path, payload);
+        let builder = self.put_request(path, payload, opts.attributes);
 
         let builder = match &opts.mode {
             PutMode::Overwrite => builder.set_idempotent(true),
@@ -386,7 +399,7 @@
             ("uploadId", upload_id),
         ];
         let result = self
-            .put_request(path, data)
+            .put_request(path, data, Attributes::new())
             .query(query)
             .set_idempotent(true)
             .send()
@@ -459,7 +472,7 @@
         if completed_parts.is_empty() {
             // GCS doesn't allow empty multipart uploads
             let result = self
-                .put_request(path, Default::default())
+                .put_request(path, Default::default(), Attributes::new())
                 .set_idempotent(true)
                 .send()
                 .await?;
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 149da76..af6e671 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -292,6 +292,8 @@
             // Fake GCS server doesn't currently honor preconditions
             get_opts(&integration).await;
             put_opts(&integration, true).await;
+            // Fake GCS server doesn't currently support attributes
+            put_get_attributes(&integration).await;
         }
     }
 
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 39f68ec..cf25919 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -21,11 +21,11 @@
 use crate::client::GetOptionsExt;
 use crate::path::{Path, DELIMITER};
 use crate::util::deserialize_rfc1123;
-use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
+use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
 use async_trait::async_trait;
 use bytes::Buf;
 use chrono::{DateTime, Utc};
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
 use percent_encoding::percent_decode_str;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, Response, StatusCode};
@@ -157,13 +157,32 @@
         Ok(())
     }
 
-    pub async fn put(&self, location: &Path, payload: PutPayload) -> Result<Response> {
+    pub async fn put(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        attributes: Attributes,
+    ) -> Result<Response> {
         let mut retry = false;
         loop {
             let url = self.path_url(location);
             let mut builder = self.client.put(url);
-            if let Some(value) = self.client_options.get_content_type(location) {
-                builder = builder.header(CONTENT_TYPE, value);
+
+            let mut has_content_type = false;
+            for (k, v) in &attributes {
+                builder = match k {
+                    Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+                    Attribute::ContentType => {
+                        has_content_type = true;
+                        builder.header(CONTENT_TYPE, v.as_ref())
+                    }
+                };
+            }
+
+            if !has_content_type {
+                if let Some(value) = self.client_options.get_content_type(location) {
+                    builder = builder.header(CONTENT_TYPE, value);
+                }
             }
 
             let resp = builder
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index a838a0f..d6ba4f4 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -105,7 +105,7 @@
             return Err(crate::Error::NotImplemented);
         }
 
-        let response = self.client.put(location, payload).await?;
+        let response = self.client.put(location, payload, opts.attributes).await?;
         let e_tag = match get_etag(response.headers()) {
             Ok(e_tag) => Some(e_tag),
             Err(crate::client::header::Error::MissingEtag) => None,
@@ -260,7 +260,7 @@
             .build()
             .unwrap();
 
-        put_get_delete_list_opts(&integration).await;
+        put_get_delete_list(&integration).await;
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 157852f..b492d93 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -543,6 +543,10 @@
 mod upload;
 mod util;
 
+mod attributes;
+
+pub use attributes::*;
+
 pub use parse::{parse_url, parse_url_opts};
 pub use payload::*;
 pub use upload::*;
@@ -989,6 +993,8 @@
     pub meta: ObjectMeta,
     /// The range of bytes returned by this request
     pub range: Range<usize>,
+    /// Additional object attributes
+    pub attributes: Attributes,
 }
 
 /// The kind of a [`GetResult`]
@@ -1114,6 +1120,10 @@
     ///
     /// Implementations that don't support object tagging should ignore this
     pub tags: TagSet,
+    /// Provide a set of [`Attributes`]
+    ///
+    /// Implementations that don't support an attribute should return an error
+    pub attributes: Attributes,
 }
 
 impl From<PutMode> for PutOptions {
@@ -1251,10 +1261,6 @@
     use rand::{thread_rng, Rng};
 
     pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
-        put_get_delete_list_opts(storage).await
-    }
-
-    pub(crate) async fn put_get_delete_list_opts(storage: &DynObjectStore) {
         delete_fixtures(storage).await;
 
         let content_list = flatten_list_stream(storage, None).await.unwrap();
@@ -1674,6 +1680,28 @@
         storage.delete(&path).await.unwrap();
     }
 
+    pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) {
+        // Test handling of attributes
+        let attributes = Attributes::from_iter([
+            (Attribute::ContentType, "text/html; charset=utf-8"),
+            (Attribute::CacheControl, "max-age=604800"),
+        ]);
+
+        let path = Path::from("attributes");
+        let opts = PutOptions {
+            attributes: attributes.clone(),
+            ..Default::default()
+        };
+        match integration.put_opts(&path, "foo".into(), opts).await {
+            Ok(_) => {
+                let r = integration.get(&path).await.unwrap();
+                assert_eq!(r.attributes, attributes);
+            }
+            Err(Error::NotImplemented) => {}
+            Err(e) => panic!("{e}"),
+        }
+    }
+
     pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
         let path = Path::from("test");
         storage.put(&path, "foo".into()).await.unwrap();
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index d5581cdc..a3695ad 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -38,8 +38,8 @@
     maybe_spawn_blocking,
     path::{absolute_path_to_url, Path},
     util::InvalidGetRange,
-    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
-    PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
+    Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
+    ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
 };
 
 /// A specialized `Error` for filesystem object store-related errors
@@ -346,6 +346,10 @@
             return Err(crate::Error::NotImplemented);
         }
 
+        if !opts.attributes.is_empty() {
+            return Err(crate::Error::NotImplemented);
+        }
+
         let path = self.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
             let (mut file, staging_path) = new_staged_upload(&path)?;
@@ -421,6 +425,7 @@
 
             Ok(GetResult {
                 payload: GetResultPayload::File(file, path),
+                attributes: Attributes::default(),
                 range,
                 meta,
             })
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index d42e6f2..e34b28f 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -30,8 +30,9 @@
 use crate::multipart::{MultipartStore, PartId};
 use crate::util::InvalidGetRange;
 use crate::{
-    path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
-    ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart,
+    path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
+    MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result,
+    UpdateVersion, UploadPart,
 };
 use crate::{GetOptions, PutPayload};
 
@@ -88,15 +89,22 @@
 struct Entry {
     data: Bytes,
     last_modified: DateTime<Utc>,
+    attributes: Attributes,
     e_tag: usize,
 }
 
 impl Entry {
-    fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self {
+    fn new(
+        data: Bytes,
+        last_modified: DateTime<Utc>,
+        e_tag: usize,
+        attributes: Attributes,
+    ) -> Self {
         Self {
             data,
             last_modified,
             e_tag,
+            attributes,
         }
     }
 }
@@ -116,10 +124,10 @@
 type SharedStorage = Arc<RwLock<Storage>>;
 
 impl Storage {
-    fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
+    fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
         let etag = self.next_etag;
         self.next_etag += 1;
-        let entry = Entry::new(bytes, Utc::now(), etag);
+        let entry = Entry::new(bytes, Utc::now(), etag, attributes);
         self.overwrite(location, entry);
         etag
     }
@@ -200,7 +208,7 @@
     ) -> Result<PutResult> {
         let mut storage = self.storage.write();
         let etag = storage.next_etag;
-        let entry = Entry::new(payload.into(), Utc::now(), etag);
+        let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
 
         match opts.mode {
             PutMode::Overwrite => storage.overwrite(location, entry),
@@ -247,6 +255,7 @@
 
         Ok(GetResult {
             payload: GetResultPayload::Stream(stream.boxed()),
+            attributes: entry.attributes,
             meta,
             range,
         })
@@ -363,7 +372,9 @@
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         let entry = self.entry(from).await?;
-        self.storage.write().insert(to, entry.data);
+        self.storage
+            .write()
+            .insert(to, entry.data, entry.attributes);
         Ok(())
     }
 
@@ -376,7 +387,7 @@
             }
             .into());
         }
-        storage.insert(to, entry.data);
+        storage.insert(to, entry.data, entry.attributes);
         Ok(())
     }
 }
@@ -426,7 +437,7 @@
         for x in &upload.parts {
             buf.extend_from_slice(x.as_ref().unwrap())
         }
-        let etag = storage.insert(path, buf.into());
+        let etag = storage.insert(path, buf.into(), Default::default());
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
             version: None,
@@ -492,7 +503,11 @@
         let mut buf = Vec::with_capacity(cap);
         let parts = self.parts.iter().flatten();
         parts.for_each(|x| buf.extend_from_slice(x));
-        let etag = self.storage.write().insert(&self.location, buf.into());
+        let etag = self
+            .storage
+            .write()
+            .insert(&self.location, buf.into(), Attributes::new());
+
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
             version: None,
@@ -523,6 +538,7 @@
         stream_get(&integration).await;
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
+        put_get_attributes(&integration).await;
     }
 
     #[tokio::test]