Add Attributes API (#5329) (#5650)
* Add Attributes API (#5329)
* Clippy
* Emulator test tweaks
diff --git a/object_store/src/attributes.rs b/object_store/src/attributes.rs
new file mode 100644
index 0000000..9b90b53
--- /dev/null
+++ b/object_store/src/attributes.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::ops::Deref;
+
+/// Additional object attribute types
+#[non_exhaustive]
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub enum Attribute {
+ /// Specifies the MIME type of the object
+ ///
+ /// This takes precedence over any [ClientOptions](crate::ClientOptions) configuration
+ ///
+ /// See [Content-Type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type)
+ ContentType,
+ /// Overrides cache control policy of the object
+ ///
+ /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control)
+ CacheControl,
+}
+
+/// The value of an [`Attribute`]
+///
+/// Provides efficient conversion from both static and owned strings
+///
+/// ```
+/// # use object_store::AttributeValue;
+/// // Can use static strings without needing an allocation
+/// let value = AttributeValue::from("bar");
+/// // Can also store owned strings
+/// let value = AttributeValue::from("foo".to_string());
+/// ```
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub struct AttributeValue(Cow<'static, str>);
+
+impl AsRef<str> for AttributeValue {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
+impl From<&'static str> for AttributeValue {
+ fn from(value: &'static str) -> Self {
+ Self(Cow::Borrowed(value))
+ }
+}
+
+impl From<String> for AttributeValue {
+ fn from(value: String) -> Self {
+ Self(Cow::Owned(value))
+ }
+}
+
+impl Deref for AttributeValue {
+ type Target = str;
+
+ fn deref(&self) -> &Self::Target {
+ self.0.as_ref()
+ }
+}
+
+/// Additional attributes of an object
+///
+/// Attributes can be specified in [PutOptions](crate::PutOptions) and retrieved
+/// from APIs returning [GetResult](crate::GetResult).
+///
+/// Unlike [`ObjectMeta`](crate::ObjectMeta), [`Attributes`] are not returned by
+/// listing APIs
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
+pub struct Attributes(HashMap<Attribute, AttributeValue>);
+
+impl Attributes {
+ /// Create a new empty [`Attributes`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Create a new [`Attributes`] with space for `capacity` [`Attribute`]
+ pub fn with_capacity(capacity: usize) -> Self {
+ Self(HashMap::with_capacity(capacity))
+ }
+
+ /// Insert a new [`Attribute`], [`AttributeValue`] pair
+ ///
+ /// Returns the previous value for `key` if any
+ pub fn insert(&mut self, key: Attribute, value: AttributeValue) -> Option<AttributeValue> {
+ self.0.insert(key, value)
+ }
+
+ /// Returns the [`AttributeValue`] for `key` if any
+ pub fn get(&self, key: &Attribute) -> Option<&AttributeValue> {
+ self.0.get(key)
+ }
+
+ /// Removes the [`AttributeValue`] for `key` if any
+ pub fn remove(&mut self, key: &Attribute) -> Option<AttributeValue> {
+ self.0.remove(key)
+ }
+
+ /// Returns an [`AttributesIter`] over this
+ pub fn iter(&self) -> AttributesIter<'_> {
+ self.into_iter()
+ }
+
+ /// Returns the number of [`Attribute`] in this collection
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.0.len()
+ }
+
+ /// Returns true if this contains no [`Attribute`]
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+}
+
+impl<K, V> FromIterator<(K, V)> for Attributes
+where
+ K: Into<Attribute>,
+ V: Into<AttributeValue>,
+{
+ fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+ Self(
+ iter.into_iter()
+ .map(|(k, v)| (k.into(), v.into()))
+ .collect(),
+ )
+ }
+}
+
+impl<'a> IntoIterator for &'a Attributes {
+ type Item = (&'a Attribute, &'a AttributeValue);
+ type IntoIter = AttributesIter<'a>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ AttributesIter(self.0.iter())
+ }
+}
+
+/// Iterator over [`Attributes`]
+#[derive(Debug)]
+pub struct AttributesIter<'a>(std::collections::hash_map::Iter<'a, Attribute, AttributeValue>);
+
+impl<'a> Iterator for AttributesIter<'a> {
+ type Item = (&'a Attribute, &'a AttributeValue);
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.next()
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.0.size_hint()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_attributes_basic() {
+ let mut attributes = Attributes::from_iter([
+ (Attribute::ContentType, "test"),
+ (Attribute::CacheControl, "control"),
+ ]);
+
+ assert!(!attributes.is_empty());
+ assert_eq!(attributes.len(), 2);
+
+ assert_eq!(
+ attributes.get(&Attribute::ContentType),
+ Some(&"test".into())
+ );
+
+ let metav = "control".into();
+ assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+ assert_eq!(
+ attributes.insert(Attribute::CacheControl, "v1".into()),
+ Some(metav)
+ );
+ assert_eq!(attributes.len(), 2);
+
+ assert_eq!(
+ attributes.remove(&Attribute::CacheControl).unwrap(),
+ "v1".into()
+ );
+ assert_eq!(attributes.len(), 1);
+
+ let metav: AttributeValue = "v2".into();
+ attributes.insert(Attribute::CacheControl, metav.clone());
+ assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+ assert_eq!(attributes.len(), 2);
+ }
+}
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index c1789ed..e81ef6a 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -35,23 +35,21 @@
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
- ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload, PutResult, Result,
- RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload,
+ PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
use hyper::http;
use hyper::http::HeaderName;
use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
-use reqwest::{
- header::{CONTENT_LENGTH, CONTENT_TYPE},
- Client as ReqwestClient, Method, RequestBuilder, Response,
-};
+use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, RequestBuilder, Response};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
@@ -344,6 +342,7 @@
&'a self,
path: &'a Path,
payload: PutPayload,
+ attributes: Attributes,
with_encryption_headers: bool,
) -> Request<'a> {
let url = self.config.path_url(path);
@@ -363,8 +362,21 @@
)
}
- if let Some(value) = self.config.client_options.get_content_type(path) {
- builder = builder.header(CONTENT_TYPE, value);
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) = self.config.client_options.get_content_type(path) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
Request {
@@ -556,7 +568,7 @@
let part = (part_idx + 1).to_string();
let response = self
- .put_request(path, data, false)
+ .put_request(path, data, Attributes::default(), false)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true)
.send()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 9e741c9..43bd38a 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -156,7 +156,8 @@
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let mut request = self.client.put_request(location, payload, true);
+ let attrs = opts.attributes;
+ let mut request = self.client.put_request(location, payload, attrs, true);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config.disable_tagging {
request = request.header(&TAGS_HEADER, tags);
@@ -403,7 +404,7 @@
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -412,6 +413,7 @@
multipart(&integration, &integration).await;
signing(&integration).await;
s3_encryption(&integration).await;
+ put_get_attributes(&integration).await;
// Object tagging is not supported by S3 Express One Zone
if config.session_provider.is_none() {
@@ -432,12 +434,12 @@
// run integration test with unsigned payload enabled
let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
let integration = builder.build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
// run integration test with checksum set to sha256
let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
match &integration.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index d5972d0..134609e 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -27,14 +27,15 @@
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
- ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutPayload,
- PutResult, Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode,
+ PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
+use hyper::header::CACHE_CONTROL;
use hyper::http::HeaderName;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
@@ -187,9 +188,8 @@
Self { builder, ..self }
}
- fn set_idempotent(mut self, idempotent: bool) -> Self {
- self.idempotent = idempotent;
- self
+ fn set_idempotent(self, idempotent: bool) -> Self {
+ Self { idempotent, ..self }
}
async fn send(self) -> Result<Response> {
@@ -199,7 +199,7 @@
.header(CONTENT_LENGTH, self.payload.content_length())
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
- .idempotent(true)
+ .idempotent(self.idempotent)
.payload(Some(self.payload))
.send()
.await
@@ -233,13 +233,31 @@
self.config.get_credential().await
}
- fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> {
+ fn put_request<'a>(
+ &'a self,
+ path: &'a Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> PutRequest<'a> {
let url = self.config.path_url(path);
let mut builder = self.client.request(Method::PUT, url);
- if let Some(value) = self.config().client_options.get_content_type(path) {
- builder = builder.header(CONTENT_TYPE, value);
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) = self.config.client_options.get_content_type(path) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
PutRequest {
@@ -258,7 +276,7 @@
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload);
+ let builder = self.put_request(path, payload, opts.attributes);
let builder = match &opts.mode {
PutMode::Overwrite => builder.set_idempotent(true),
@@ -288,7 +306,7 @@
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);
- self.put_request(path, payload)
+ self.put_request(path, payload, Attributes::default())
.query(&[("comp", "block"), ("blockid", &block_id)])
.set_idempotent(true)
.send()
@@ -304,8 +322,9 @@
.map(|part| BlockId::from(part.content_id))
.collect();
+ let payload = BlockList { blocks }.to_xml().into();
let response = self
- .put_request(path, BlockList { blocks }.to_xml().into())
+ .put_request(path, payload, Attributes::default())
.query(&[("comp", "blocklist")])
.set_idempotent(true)
.send()
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 8dc5242..3bb57c4 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -276,7 +276,7 @@
crate::test_util::maybe_skip_integration!();
let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -292,7 +292,12 @@
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
- .await
+ .await;
+
+ // Azurite doesn't support attributes properly
+ if !integration.client.config().is_emulator {
+ put_get_attributes(&integration).await;
+ }
}
#[ignore = "Used for manual testing against a real storage account."]
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 2e399e5..f700457 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -19,10 +19,10 @@
use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
-use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
+use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
-use hyper::header::CONTENT_RANGE;
+use hyper::header::{CACHE_CONTROL, CONTENT_RANGE, CONTENT_TYPE};
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
@@ -117,6 +117,12 @@
#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },
+ #[snafu(display("Cache-Control header contained non UTF-8 characters"))]
+ InvalidCacheControl { source: ToStrError },
+
+ #[snafu(display("Content-Type header contained non UTF-8 characters"))]
+ InvalidContentType { source: ToStrError },
+
#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
@@ -161,6 +167,16 @@
0..meta.size
};
+ let mut attributes = Attributes::new();
+ if let Some(x) = response.headers().get(CACHE_CONTROL) {
+ let x = x.to_str().context(InvalidCacheControlSnafu)?;
+ attributes.insert(Attribute::CacheControl, x.to_string().into());
+ }
+ if let Some(x) = response.headers().get(CONTENT_TYPE) {
+ let x = x.to_str().context(InvalidContentTypeSnafu)?;
+ attributes.insert(Attribute::ContentType, x.to_string().into());
+ }
+
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
@@ -172,6 +188,7 @@
Ok(GetResult {
range,
meta,
+ attributes,
payload: GetResultPayload::Stream(stream),
})
}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 7728f38..3fefbb5 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -485,7 +485,7 @@
/// mime type if it was defined initially through
/// `ClientOptions::with_content_type_for_suffix`
///
- /// Otherwise returns the default mime type if it was defined
+ /// Otherwise, returns the default mime type if it was defined
/// earlier through `ClientOptions::with_default_content_type`
pub fn get_content_type(&self, path: &Path) -> Option<&str> {
match path.extension() {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index f91217f..4ee03ea 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,14 +29,14 @@
use crate::path::{Path, DELIMITER};
use crate::util::hex_encode;
use crate::{
- ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutPayload, PutResult,
- Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions,
+ PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Buf;
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::header::HeaderName;
use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
@@ -45,6 +45,7 @@
use std::sync::Arc;
const VERSION_HEADER: &str = "x-goog-generation";
+const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
static VERSION_MATCH: HeaderName = HeaderName::from_static("x-goog-if-generation-match");
@@ -323,19 +324,31 @@
/// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
///
/// Returns the new ETag
- pub fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) -> PutRequest<'a> {
+ pub fn put_request<'a>(
+ &'a self,
+ path: &'a Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> PutRequest<'a> {
let url = self.object_url(path);
+ let mut builder = self.client.request(Method::PUT, url);
- let content_type = self
- .config
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
- let builder = self
- .client
- .request(Method::PUT, url)
- .header(header::CONTENT_TYPE, content_type);
+ if !has_content_type {
+ let opts = &self.config.client_options;
+ let value = opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE);
+ builder = builder.header(CONTENT_TYPE, value)
+ }
PutRequest {
path,
@@ -352,7 +365,7 @@
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload);
+ let builder = self.put_request(path, payload, opts.attributes);
let builder = match &opts.mode {
PutMode::Overwrite => builder.set_idempotent(true),
@@ -386,7 +399,7 @@
("uploadId", upload_id),
];
let result = self
- .put_request(path, data)
+ .put_request(path, data, Attributes::new())
.query(query)
.set_idempotent(true)
.send()
@@ -459,7 +472,7 @@
if completed_parts.is_empty() {
// GCS doesn't allow empty multipart uploads
let result = self
- .put_request(path, Default::default())
+ .put_request(path, Default::default(), Attributes::new())
.set_idempotent(true)
.send()
.await?;
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 149da76..af6e671 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -292,6 +292,8 @@
// Fake GCS server doesn't currently honor preconditions
get_opts(&integration).await;
put_opts(&integration, true).await;
+ // Fake GCS server doesn't currently support attributes
+ put_get_attributes(&integration).await;
}
}
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 39f68ec..cf25919 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -21,11 +21,11 @@
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
-use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
+use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
use async_trait::async_trait;
use bytes::Buf;
use chrono::{DateTime, Utc};
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
use percent_encoding::percent_decode_str;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, Response, StatusCode};
@@ -157,13 +157,32 @@
Ok(())
}
- pub async fn put(&self, location: &Path, payload: PutPayload) -> Result<Response> {
+ pub async fn put(
+ &self,
+ location: &Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> Result<Response> {
let mut retry = false;
loop {
let url = self.path_url(location);
let mut builder = self.client.put(url);
- if let Some(value) = self.client_options.get_content_type(location) {
- builder = builder.header(CONTENT_TYPE, value);
+
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL, v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) = self.client_options.get_content_type(location) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
let resp = builder
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index a838a0f..d6ba4f4 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -105,7 +105,7 @@
return Err(crate::Error::NotImplemented);
}
- let response = self.client.put(location, payload).await?;
+ let response = self.client.put(location, payload, opts.attributes).await?;
let e_tag = match get_etag(response.headers()) {
Ok(e_tag) => Some(e_tag),
Err(crate::client::header::Error::MissingEtag) => None,
@@ -260,7 +260,7 @@
.build()
.unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 157852f..b492d93 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -543,6 +543,10 @@
mod upload;
mod util;
+mod attributes;
+
+pub use attributes::*;
+
pub use parse::{parse_url, parse_url_opts};
pub use payload::*;
pub use upload::*;
@@ -989,6 +993,8 @@
pub meta: ObjectMeta,
/// The range of bytes returned by this request
pub range: Range<usize>,
+ /// Additional object attributes
+ pub attributes: Attributes,
}
/// The kind of a [`GetResult`]
@@ -1114,6 +1120,10 @@
///
/// Implementations that don't support object tagging should ignore this
pub tags: TagSet,
+ /// Provide a set of [`Attributes`]
+ ///
+ /// Implementations that don't support an attribute should return an error
+ pub attributes: Attributes,
}
impl From<PutMode> for PutOptions {
@@ -1251,10 +1261,6 @@
use rand::{thread_rng, Rng};
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
- put_get_delete_list_opts(storage).await
- }
-
- pub(crate) async fn put_get_delete_list_opts(storage: &DynObjectStore) {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await.unwrap();
@@ -1674,6 +1680,28 @@
storage.delete(&path).await.unwrap();
}
+ pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) {
+ // Test handling of attributes
+ let attributes = Attributes::from_iter([
+ (Attribute::ContentType, "text/html; charset=utf-8"),
+ (Attribute::CacheControl, "max-age=604800"),
+ ]);
+
+ let path = Path::from("attributes");
+ let opts = PutOptions {
+ attributes: attributes.clone(),
+ ..Default::default()
+ };
+ match integration.put_opts(&path, "foo".into(), opts).await {
+ Ok(_) => {
+ let r = integration.get(&path).await.unwrap();
+ assert_eq!(r.attributes, attributes);
+ }
+ Err(Error::NotImplemented) => {}
+ Err(e) => panic!("{e}"),
+ }
+ }
+
pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
let path = Path::from("test");
storage.put(&path, "foo".into()).await.unwrap();
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index d5581cdc..a3695ad 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -38,8 +38,8 @@
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
- GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
- PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
+ Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
+ ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
/// A specialized `Error` for filesystem object store-related errors
@@ -346,6 +346,10 @@
return Err(crate::Error::NotImplemented);
}
+ if !opts.attributes.is_empty() {
+ return Err(crate::Error::NotImplemented);
+ }
+
let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, staging_path) = new_staged_upload(&path)?;
@@ -421,6 +425,7 @@
Ok(GetResult {
payload: GetResultPayload::File(file, path),
+ attributes: Attributes::default(),
range,
meta,
})
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index d42e6f2..e34b28f 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -30,8 +30,9 @@
use crate::multipart::{MultipartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
- path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
- ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart,
+ path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId,
+ MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result,
+ UpdateVersion, UploadPart,
};
use crate::{GetOptions, PutPayload};
@@ -88,15 +89,22 @@
struct Entry {
data: Bytes,
last_modified: DateTime<Utc>,
+ attributes: Attributes,
e_tag: usize,
}
impl Entry {
- fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self {
+ fn new(
+ data: Bytes,
+ last_modified: DateTime<Utc>,
+ e_tag: usize,
+ attributes: Attributes,
+ ) -> Self {
Self {
data,
last_modified,
e_tag,
+ attributes,
}
}
}
@@ -116,10 +124,10 @@
type SharedStorage = Arc<RwLock<Storage>>;
impl Storage {
- fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
+ fn insert(&mut self, location: &Path, bytes: Bytes, attributes: Attributes) -> usize {
let etag = self.next_etag;
self.next_etag += 1;
- let entry = Entry::new(bytes, Utc::now(), etag);
+ let entry = Entry::new(bytes, Utc::now(), etag, attributes);
self.overwrite(location, entry);
etag
}
@@ -200,7 +208,7 @@
) -> Result<PutResult> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
- let entry = Entry::new(payload.into(), Utc::now(), etag);
+ let entry = Entry::new(payload.into(), Utc::now(), etag, opts.attributes);
match opts.mode {
PutMode::Overwrite => storage.overwrite(location, entry),
@@ -247,6 +255,7 @@
Ok(GetResult {
payload: GetResultPayload::Stream(stream.boxed()),
+ attributes: entry.attributes,
meta,
range,
})
@@ -363,7 +372,9 @@
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let entry = self.entry(from).await?;
- self.storage.write().insert(to, entry.data);
+ self.storage
+ .write()
+ .insert(to, entry.data, entry.attributes);
Ok(())
}
@@ -376,7 +387,7 @@
}
.into());
}
- storage.insert(to, entry.data);
+ storage.insert(to, entry.data, entry.attributes);
Ok(())
}
}
@@ -426,7 +437,7 @@
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
- let etag = storage.insert(path, buf.into());
+ let etag = storage.insert(path, buf.into(), Default::default());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
@@ -492,7 +503,11 @@
let mut buf = Vec::with_capacity(cap);
let parts = self.parts.iter().flatten();
parts.for_each(|x| buf.extend_from_slice(x));
- let etag = self.storage.write().insert(&self.location, buf.into());
+ let etag = self
+ .storage
+ .write()
+ .insert(&self.location, buf.into(), Attributes::new());
+
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
@@ -523,6 +538,7 @@
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+ put_get_attributes(&integration).await;
}
#[tokio::test]