#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
//! # object_store
//! This crate provides a uniform API for interacting with object
//! storage services and local files via the [`ObjectStore`]
//! trait.
//! Using this crate, the same binary and code can run in multiple
//! clouds and local test environments, via a simple runtime
//! configuration change.
//! # Highlights
//! 1. A high-performance async API focused on providing a consistent interface
//! mirroring that of object stores such as [S3]
//! 2. Production quality, leading this crate to be used in large
//! scale production systems, such as crates.io and InfluxDB IOx
//! 3. Support for advanced functionality, including atomic, conditional reads
//! and writes, vectored IO, bulk deletion, and more...
//! 4. Stable and predictable governance via the [Apache Arrow] project
//! 5. Small dependency footprint, depending on only a small number of common crates
//! Originally developed by [InfluxData] and subsequently donated
//! to [Apache Arrow].
//! [Apache Arrow]:
//! [InfluxData]:
//! []:
//! [ACID]:
//! [S3]:
//! # Available [`ObjectStore`] Implementations
//! By default, this crate provides the following implementations:
//! * Memory: [`InMemory`](memory::InMemory)
//! * Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)
//! Feature flags are used to enable support for other implementations:
// Feature-gated entries for the implementation list above: each bullet is only
// added to the crate docs when the corresponding store feature is enabled.
#![cfg_attr(
    feature = "gcp",
    doc = "* [`gcp`]: Google Cloud Storage support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
)]
#![cfg_attr(
    feature = "aws",
    doc = "* [`aws`]: Amazon S3. See [`AmazonS3Builder`](aws::AmazonS3Builder)"
)]
#![cfg_attr(
    feature = "azure",
    doc = "* [`azure`]: Azure Blob Storage. See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
)]
#![cfg_attr(
    feature = "http",
    doc = "* [`http`]: HTTP/WebDAV Storage. See [`HttpBuilder`](http::HttpBuilder)"
)]
//! # Why not a Filesystem Interface?
//! The [`ObjectStore`] interface is designed to mirror the APIs
//! of object stores and *not* filesystems, and thus has stateless APIs instead
//! of cursor based interfaces such as [`Read`] or [`Seek`] available in filesystems.
//! This design provides the following advantages:
//! * All operations are atomic, and readers cannot observe partial and/or failed writes
//! * Methods map directly to object store APIs, providing both efficiency and predictability
//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
//! * Allows for functionality not native to filesystems, such as operation preconditions
//! and atomic multipart uploads
//! This crate does provide [`BufReader`] and [`BufWriter`] adapters
//! which provide a more filesystem-like API for working with the
//! [`ObjectStore`] trait; however, they should be used with care.
//! [`BufReader`]: buffered::BufReader
//! [`BufWriter`]: buffered::BufWriter
//! # Adapters
//! [`ObjectStore`] instances can be composed with various adapters
//! which add additional functionality:
//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
//! # Configuration System
//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
//! from a URL and an optional list of key value pairs. This provides a flexible interface
//! to support a wide variety of user-defined store configurations, with minimal additional
//! application complexity.
//! ```no_run
//! # #[cfg(feature = "aws")] {
//! # use url::Url;
//! # use object_store::{parse_url, parse_url_opts};
//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
//! #
//! #
//! // Can manually create a specific store variant using the appropriate builder
//! let store: AmazonS3 = AmazonS3Builder::from_env()
//! .with_bucket_name("my-bucket").build().unwrap();
//! // Alternatively can create an ObjectStore from an S3 URL
//! let url = Url::parse("s3://bucket/path").unwrap();
//! let (store, path) = parse_url(&url).unwrap();
//! assert_eq!(path.as_ref(), "path");
//! // Potentially with additional options
//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
//! // Or with URLs that encode the bucket name in the URL path
//! let url = Url::parse("").unwrap();
//! let (store, path) = parse_url(&url).unwrap();
//! assert_eq!(path.as_ref(), "path");
//! # }
//! ```
//! [PyArrow FileSystem]:
//! [fsspec]:
//! [Hadoop FileSystem]:
//! # List objects
//! Use the [`ObjectStore::list`] method to iterate over objects in
//! remote storage or files in the local filesystem:
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use object_store::{path::Path, ObjectStore};
//! # use futures::stream::StreamExt;
//! # // use LocalFileSystem for example
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
//! // create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! // Recursively list all files below the 'data' path.
//! // 1. On AWS S3 this would be the 'data/' prefix
//! // 2. On a local filesystem, this would be the 'data' directory
//! let prefix = Path::from("data");
//! // Get an `async` stream of Metadata objects:
//! let mut list_stream = object_store.list(Some(&prefix));
//! // Print a line about each object
//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
//! println!("Name: {}, size: {}", meta.location, meta.size);
//! }
//! # }
//! ```
//! Which will print out something like the following:
//! ```text
//! Name: data/file01.parquet, size: 112832
//! Name: data/file02.parquet, size: 143119
//! Name: data/child/file03.parquet, size: 100
//! ...
//! ```
//! # Fetch objects
//! Use the [`ObjectStore::get`] method to fetch the data bytes
//! from remote storage or files in the local filesystem as a stream.
//! ```
//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use object_store::{path::Path, ObjectStore, GetResult};
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! // Retrieve a specific file
//! let path = Path::from("data/file01.parquet");
//! // Fetch just the file metadata
//! let meta = object_store.head(&path).await.unwrap();
//! println!("{meta:?}");
//! // Fetch the object including metadata
//! let result: GetResult = object_store.get(&path).await.unwrap();
//! assert_eq!(result.meta, meta);
//! // Buffer the entire object in memory
//! let object: Bytes = result.bytes().await.unwrap();
//! assert_eq!(object.len(), meta.size);
//! // Alternatively stream the bytes from object storage
//! let stream = object_store.get(&path).await.unwrap().into_stream();
//! // Count the '0's using `try_fold` from `TryStreamExt` trait
//! let num_zeros = stream
//! .try_fold(0, |acc, bytes| async move {
//! Ok(acc + bytes.iter().filter(|b| **b == 0).count())
//! }).await.unwrap();
//! println!("Num zeros in {} is {}", path, num_zeros);
//! # }
//! ```
//! # Put Object
//! Use the [`ObjectStore::put`] method to atomically write data.
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayload};
//! # use std::sync::Arc;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn put() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/file1");
//! let payload = PutPayload::from_static(b"hello");
//! object_store.put(&path, payload).await.unwrap();
//! # }
//! ```
//! # Multipart Upload
//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, WriteMultipart};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let upload = object_store.put_multipart(&path).await.unwrap();
//! let mut write = WriteMultipart::new(upload);
//! write.write(b"hello");
//! write.finish().await.unwrap();
//! # }
//! ```
//! # Vectored Read
//! A common pattern, especially when reading structured datasets, is to need to fetch
//! multiple, potentially non-contiguous, ranges of a particular object.
//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::ObjectStore;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
//! assert_eq!(ranges.len(), 3);
//! assert_eq!(ranges[0].len(), 10);
//! # }
//! ```
//! # Vectored Write
//! When writing data it is often the case that the size of the output is not known ahead of time.
//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
//! this is suboptimal as reallocating memory will often involve copying it to a new location.
//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
//! encapsulates this approach
//! ```
//! # use object_store::local::LocalFileSystem;
//! # use object_store::{ObjectStore, PutPayloadMut};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! # async fn multi_upload() {
//! #
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//! let path = Path::from("data/large_file");
//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
//! for _ in 0..22 {
//! buffer.extend_from_slice(&[0; 1024]);
//! }
//! let payload = buffer.freeze();
//! // Payload consists of 3 separate 8KB allocations
//! assert_eq!(payload.as_ref().len(), 3);
//! assert_eq!(payload.as_ref()[0].len(), 8192);
//! assert_eq!(payload.as_ref()[1].len(), 8192);
//! assert_eq!(payload.as_ref()[2].len(), 6144);
//! object_store.put(&path, payload).await.unwrap();
//! # }
//! ```
//! # Conditional Fetch
//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
//! For example, efficiently refreshing a cache without re-fetching the entire object
//! data if the object hasn't been modified.
//! ```
//! # use std::collections::btree_map::Entry;
//! # use std::collections::HashMap;
//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
//! # use std::sync::Arc;
//! # use std::time::{Duration, Instant};
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::path::Path;
//! struct CacheEntry {
//! /// Data returned by last request
//! data: Bytes,
//! /// ETag identifying the object returned by the server
//! e_tag: String,
//! /// Instant of last refresh
//! refreshed_at: Instant,
//! }
//! /// Example cache that checks entries after 10 seconds for a new version
//! struct Cache {
//! entries: HashMap<Path, CacheEntry>,
//! store: Arc<dyn ObjectStore>,
//! }
//! impl Cache {
//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
//! Ok(match self.entries.get_mut(path) {
//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
//! true => e.data.clone(), // Return cached data
//! false => { // Check if remote version has changed
//! let opts = GetOptions {
//! if_none_match: Some(e.e_tag.clone()),
//! ..GetOptions::default()
//! };
//! match self.store.get_opts(path, opts).await {
//! Ok(d) => e.data = d.bytes().await?,
//! Err(Error::NotModified { .. }) => {} // Data has not changed
//! Err(e) => return Err(e),
//! };
//! e.refreshed_at = Instant::now();
//! e.data.clone()
//! }
//! },
//! None => { // Not cached, fetch data
//! let get = self.store.get(path).await?;
//! let e_tag = get.meta.e_tag.clone();
//! let data = get.bytes().await?;
//! if let Some(e_tag) = e_tag {
//! let entry = CacheEntry {
//! e_tag,
//! data: data.clone(),
//! refreshed_at: Instant::now(),
//! };
//! self.entries.insert(path.clone(), entry);
//! }
//! data
//! }
//! })
//! }
//! }
//! ```
//! # Conditional Put
//! The default behaviour when writing data is to upsert any existing object at the given path,
//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
//! storage, without relying on a separate DBMS.
//! ```
//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
//! # use std::sync::Arc;
//! # use bytes::Bytes;
//! # use tokio::io::AsyncWriteExt;
//! # use object_store::memory::InMemory;
//! # use object_store::path::Path;
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(InMemory::new())
//! # }
//! # fn do_update(b: Bytes) -> Bytes {b}
//! # async fn conditional_put() {
//! let store = get_object_store();
//! let path = Path::from("test");
//! // Perform a conditional update on path
//! loop {
//! // Perform get request
//! let r = store.get(&path).await.unwrap();
//! // Save version information fetched
//! let version = UpdateVersion {
//! e_tag: r.meta.e_tag.clone(),
//! version: r.meta.version.clone(),
//! };
//! // Compute new version of object contents
//! let new = do_update(r.bytes().await.unwrap());
//! // Attempt to commit transaction
//! match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
//! Ok(_) => break, // Successfully committed
//! Err(Error::Precondition { .. }) => continue, // Object has changed, try again
//! Err(e) => panic!("{e}")
//! }
//! }
//! # }
//! ```
//! [Optimistic Concurrency Control]:
//! [Apache Iceberg]:
//! [Delta Lake]:
//! # TLS Certificates
//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA]
//! certificates. By default the system-bundled certificates are used (see
//! [`rustls-native-certs`]). The `tls-webpki-roots` feature switch can be used to also bundle Mozilla's
//! root certificates with the library/application (see [`webpki-roots`]).
//! [CA]:
//! [`rustls-native-certs`]:
//! [`webpki-roots`]:
// The cloud/HTTP store implementations are not supported on wasm32: reject
// that feature combination at compile time with an explicit message.
#[cfg(all(
    target_arch = "wasm32",
    any(feature = "gcp", feature = "aws", feature = "azure", feature = "http")
))]
compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm.");
#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
pub mod buffered;
#[cfg(not(target_arch = "wasm32"))]
pub mod chunked;
pub mod delimited;
#[cfg(feature = "gcp")]
pub mod gcp;
#[cfg(feature = "http")]
pub mod http;
pub mod limit;
#[cfg(not(target_arch = "wasm32"))]
pub mod local;
pub mod memory;
pub mod path;
pub mod prefix;
#[cfg(feature = "cloud")]
pub mod signer;
pub mod throttle;
#[cfg(feature = "cloud")]
mod client;
// Re-export the shared HTTP client configuration and credential types used by
// the cloud store builders.
#[cfg(feature = "cloud")]
pub use client::{
    backoff::BackoffConfig, retry::RetryConfig, ClientConfigKey, ClientOptions, CredentialProvider,
    StaticCredentialProvider,
};
#[cfg(feature = "cloud")]
mod config;
mod tags;
pub use tags::TagSet;
pub mod multipart;
mod parse;
mod payload;
mod upload;
mod util;
pub use parse::{parse_url, parse_url_opts};
pub use payload::*;
pub use upload::*;
pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT};
use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
use crate::util::maybe_spawn_blocking;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
#[cfg(not(target_arch = "wasm32"))]
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use std::sync::Arc;
/// Trait-object form of [`ObjectStore`], for use behind `Arc`/`Box` when the
/// concrete store is only known at runtime.
pub type DynObjectStore = dyn ObjectStore + 'static;
/// Identifier type used for multipart uploads.
pub type MultipartId = std::string::String;
/// Universal API to multiple object store services.
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `payload` to `location`, or fail. No clients
/// should be able to observe a partially written object
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
/// Save the provided `payload` to `location` with the given options
async fn put_opts(
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult>;
/// Perform a multipart upload
/// Client should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`] for more information
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>>;
/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
/// Perform a get request with options
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
/// Return the bytes that are stored at the specified location
/// in the given byte range.
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.into()),
self.get_opts(location, options).await?.bytes().await
/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
|range| self.get_range(location, range),
/// Return the metadata for the specified location
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions {
head: true,
Ok(self.get_opts(location, options).await?.meta)
/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;
/// Delete all the objects at the specified locations
/// When supported, this method will use bulk operations that delete more
/// than one object per a request. The default implementation will call
/// the single object delete method for each location, but with up to 10
/// concurrent requests.
/// The returned stream yields the results of the delete operations in the
/// same order as the input locations. However, some errors will be from
/// an overall call to a bulk delete operation, and not from a specific
/// location.
/// If the object did not exist, the result may be an error or a success,
/// depending on the behavior of the underlying store. For example, local
/// filesystems, GCP, and Azure return an error, while S3 and in-memory will
/// return Ok. If it is an error, it will be [`Error::NotFound`].
/// ```
/// # use futures::{StreamExt, TryStreamExt};
/// # use object_store::local::LocalFileSystem;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
/// # use object_store::{ObjectStore, ObjectMeta};
/// # use object_store::path::Path;
/// # use futures::{StreamExt, TryStreamExt};
/// #
/// // Create two objects
/// store.put(&Path::from("foo"), "foo".into()).await?;
/// store.put(&Path::from("bar"), "bar".into()).await?;
/// // List object
/// let locations = store.list(None).map_ok(|m| m.location).boxed();
/// // Delete them
/// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
/// # Ok(())
/// # }
/// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
/// # rt.block_on(example()).unwrap();
/// ```
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
.map(|location| async {
let location = location?;
/// List all the objects with the given prefix.
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
/// List all the objects with the given prefix and a location greater than `offset`
/// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
/// the number of network requests required
/// Note: the order of returned [`ObjectMeta`] is not guaranteed
fn list_with_offset(
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
let offset = offset.clone();
.try_filter(move |f| futures::future::ready(f.location > offset))
/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
/// Copy an object from one path to another in the same object store.
/// If there exists an object at the destination, it will be overwritten.
async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
/// Move an object from one path to another in the same object store.
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
/// If there exists an object at the destination, it will be overwritten.
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.copy(from, to).await?;
/// Copy an object from one path to another, only if destination is empty.
/// Will return an error if the destination already has an object.
/// Performs an atomic operation if the underlying object storage supports it.
/// If atomic operations are not supported by the underlying object storage (like S3)
/// it will return an error.
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
/// Move an object from one path to another in the same object store.
/// Will return an error if the destination already has an object.
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.copy_if_not_exists(from, to).await?;
macro_rules! as_ref_impl {
($type:ty) => {
impl ObjectStore for $type {
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.as_ref().put(location, payload).await
async fn put_opts(
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
self.as_ref().put_opts(location, payload, opts).await
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
async fn get(&self, location: &Path) -> Result<GetResult> {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.as_ref().get_opts(location, options).await
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
self.as_ref().get_range(location, range).await
async fn get_ranges(
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
async fn delete(&self, location: &Path) -> Result<()> {
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list_with_offset(
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
self.as_ref().list_with_offset(prefix, offset)
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy(from, to).await
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename(from, to).await
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().copy_if_not_exists(from, to).await
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.as_ref().rename_if_not_exists(from, to).await
as_ref_impl!(Arc<dyn ObjectStore>);
as_ref_impl!(Box<dyn ObjectStore>);
/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
pub struct ListResult {
/// Prefixes that are common (like directories)
pub common_prefixes: Vec<Path>,
/// Object metadata for the listing
pub objects: Vec<ObjectMeta>,
/// The metadata that describes an object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
/// The full path to the object
pub location: Path,
/// The last modified time
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object
pub size: usize,
/// The unique identifier for the object
/// <>
pub e_tag: Option<String>,
/// A version indicator for this object
pub version: Option<String>,
/// Options for a get request, such as range
#[derive(Debug, Default)]
pub struct GetOptions {
/// Request will succeed if the `ObjectMeta::e_tag` matches
/// otherwise returning [`Error::Precondition`]
/// See <>
/// Examples:
/// ```text
/// If-Match: "xyzzy"
/// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
/// If-Match: *
/// ```
pub if_match: Option<String>,
/// Request will succeed if the `ObjectMeta::e_tag` does not match
/// otherwise returning [`Error::NotModified`]
/// See <>
/// Examples:
/// ```text
/// If-None-Match: "xyzzy"
/// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
/// If-None-Match: *
/// ```
pub if_none_match: Option<String>,
/// Request will succeed if the object has been modified since
/// <>
pub if_modified_since: Option<DateTime<Utc>>,
/// Request will succeed if the object has not been modified since
/// otherwise returning [`Error::Precondition`]
/// Some stores, such as S3, will only return `NotModified` for exact
/// timestamp matches, instead of for any timestamp greater than or equal.
/// <>
pub if_unmodified_since: Option<DateTime<Utc>>,
/// Request transfer of only the specified range of bytes
/// otherwise returning [`Error::NotModified`]
/// <>
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
/// <>
pub head: bool,
impl GetOptions {
/// Returns an error if the modification conditions on this request are not satisfied
/// <>
fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
// The use of the invalid etag "*" means no ETag is equivalent to never matching
let etag = meta.e_tag.as_deref().unwrap_or("*");
let last_modified = meta.last_modified;
if let Some(m) = &self.if_match {
if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
return Err(Error::Precondition {
path: meta.location.to_string(),
source: format!("{etag} does not match {m}").into(),
} else if let Some(date) = self.if_unmodified_since {
if last_modified > date {
return Err(Error::Precondition {
path: meta.location.to_string(),
source: format!("{date} < {last_modified}").into(),
if let Some(m) = &self.if_none_match {
if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
return Err(Error::NotModified {
path: meta.location.to_string(),
source: format!("{etag} matches {m}").into(),
} else if let Some(date) = self.if_modified_since {
if last_modified <= date {
return Err(Error::NotModified {
path: meta.location.to_string(),
source: format!("{date} >= {last_modified}").into(),
/// Result for a get request
pub struct GetResult {
/// The [`GetResultPayload`]
pub payload: GetResultPayload,
/// The [`ObjectMeta`] for this object
pub meta: ObjectMeta,
/// The range of bytes returned by this request
pub range: Range<usize>,
/// The kind of a [`GetResult`]
/// This special cases the case of a local file, as some systems may
/// be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
/// The file, path
File(std::fs::File, std::path::PathBuf),
/// An opaque stream of bytes
Stream(BoxStream<'static, Result<Bytes>>),
impl Debug for GetResultPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::File(_, _) => write!(f, "GetResultPayload(File)"),
Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
impl GetResult {
/// Collects the data into a [`Bytes`]
pub async fn bytes(self) -> Result<Bytes> {
let len = self.range.end - self.range.start;
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, path) => {
maybe_spawn_blocking(move || { as _))
.map_err(|source| local::Error::Seek {
path: path.clone(),
let mut buffer = Vec::with_capacity(len);
file.take(len as _)
.read_to_end(&mut buffer)
.map_err(|source| local::Error::UnableToReadBytes { source, path })?;
GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
/// Converts this into a byte stream
/// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
/// otherwise will return the [`GetResultPayload::Stream`].
/// # Tokio Compatibility
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
GetResultPayload::Stream(s) => s,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
/// Configure preconditions for the put operation
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
/// Perform an atomic write operation, overwriting any object present at the provided path
/// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
/// object already exists at the provided path
/// Perform an atomic write operation if the current version of the object matches the
/// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
/// Uniquely identifies a version of an object to update
///
/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
/// updates, and it is therefore recommended applications preserve both
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
    /// The unique identifier of the object version to update
    ///
    /// See RFC 9110, section 8.8.3 (`ETag`)
    pub e_tag: Option<String>,
    /// A version indicator for the object version to update
    pub version: Option<String>,
}
impl From<PutResult> for UpdateVersion {
fn from(value: PutResult) -> Self {
Self {
e_tag: value.e_tag,
version: value.version,
/// Options for a put request
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PutOptions {
/// Configure the [`PutMode`] for this operation
pub mode: PutMode,
/// Provide a [`TagSet`] for this object
/// Implementations that don't support object tagging should ignore this
pub tags: TagSet,
/// Builds [`PutOptions`] using the given [`PutMode`].
// NOTE(review): the field initializers of the returned struct are not visible in this view.
// Presumably `mode` is set and the remaining fields come from `Default`; confirm.
impl From<PutMode> for PutOptions {
fn from(mode: PutMode) -> Self {
Self {
/// Builds [`PutOptions`] carrying the given [`TagSet`].
// NOTE(review): the field initializers of the returned struct are not visible in this view.
// Presumably `tags` is set and the remaining fields come from `Default`; confirm.
impl From<TagSet> for PutOptions {
fn from(tags: TagSet) -> Self {
Self {
/// Result for a put request
///
/// Can be converted into an [`UpdateVersion`] to perform a conditional update of the
/// version that was just written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutResult {
    /// The unique identifier for the newly created object
    ///
    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
}
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
pub enum Error {
/// A store-specific failure, tagged with the name of the store that produced it
#[snafu(display("Generic {} error: {}", store, source))]
Generic {
store: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
/// No object exists at `path`
#[snafu(display("Object at location {} not found: {}", path, source))]
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
// An object path failed validation; wraps the underlying `path::Error`.
// NOTE(review): the opening `#[snafu(` of this attribute is not visible in this view.
display("Encountered object with invalid path: {}", source),
InvalidPath { source: path::Error },
/// A spawned tokio task could not be joined.
///
/// `context(false)` asks snafu to generate a `From<tokio::task::JoinError>` conversion
/// instead of a context selector.
#[snafu(display("Error joining spawned task: {}", source), context(false))]
JoinError { source: tokio::task::JoinError },
/// The requested operation is not supported by this store
#[snafu(display("Operation not supported: {}", source))]
NotSupported {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
/// An object already exists at `path` (e.g. a `PutMode::Create` write to an existing path)
#[snafu(display("Object at location {} already exists: {}", path, source))]
AlreadyExists {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
/// A request precondition (e.g. `if_match` or `PutMode::Update`) failed for `path`
#[snafu(display("Request precondition failure for path {}: {}", path, source))]
Precondition {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
/// A conditional read (e.g. `if_none_match` / `if_modified_since`) found the object unchanged
#[snafu(display("Object at location {} not modified: {}", path, source))]
NotModified {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
// Display text for an operation a store has not implemented.
// NOTE(review): the variant this attribute belongs to is not visible in this view.
#[snafu(display("Operation not yet implemented."))]
/// A configuration key is not recognised by the named store
#[snafu(display("Configuration key: '{}' is not valid for store '{}'.", key, store))]
UnknownConfigurationKey { store: &'static str, key: String },
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
let kind = match &e {
Error::NotFound { .. } => std::io::ErrorKind::NotFound,
_ => std::io::ErrorKind::Other,
Self::new(kind, e)
// Test helpers shared with the test suites in this crate (`mod tests` imports
// `crate::test_util::flatten_list_stream`).
// NOTE(review): no `#[cfg(test)]` attribute is visible on this module here. Confirm it is
// only compiled for tests.
mod test_util {
use super::*;
use futures::TryStreamExt;
/// Prints a skip message when the `TEST_INTEGRATION` environment variable is not set.
// NOTE(review): the early exit from the calling test is not visible in this view.
// Presumably the expansion `return`s after printing; confirm.
macro_rules! maybe_skip_integration {
() => {
if std::env::var("TEST_INTEGRATION").is_err() {
eprintln!("Skipping integration test - set TEST_INTEGRATION");
pub(crate) use maybe_skip_integration;
/// Collects the locations of the objects listed under `prefix` into a `Vec`.
// NOTE(review): the listing call preceding `.map_ok` is not visible here. Presumably it is
// `storage.list(prefix)` followed by a `try_collect()`; confirm.
pub async fn flatten_list_stream(
storage: &DynObjectStore,
prefix: Option<&Path>,
) -> Result<Vec<Path>> {
.map_ok(|meta| meta.location)
mod tests {
use super::*;
use crate::multipart::MultipartStore;
use crate::test_util::flatten_list_stream;
use chrono::TimeZone;
use futures::stream::FuturesUnordered;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
/// Runs the put/get/delete/list conformance checks against `storage`.
// NOTE(review): the body is not visible in this view. Presumably it delegates to
// `put_get_delete_list_opts`; confirm.
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
/// Conformance test for the core [`ObjectStore`] API against `storage`.
///
/// Covers put, get (full, ranged via `get_range`/`get_ranges`, and `GetRange` variants via
/// `get_opts`), head, listing with and without prefixes/delimiters/offsets, copy, rename,
/// bulk delete, paths with encoded/unicode/special characters, and zero-length objects.
///
/// The store is expected to be empty on entry.
pub(crate) async fn put_get_delete_list_opts(storage: &DynObjectStore) {
let content_list = flatten_list_stream(storage, None).await.unwrap();
"Expected list to be empty; found: {content_list:?}"
let location = Path::from("test_dir/test_file.json");
let data = Bytes::from("arbitrary data");
storage.put(&location, data.clone().into()).await.unwrap();
let root = Path::from("/");
// List everything
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// Should behave the same as no prefix
let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List with delimiter
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// Should behave the same as no prefix
let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// Should return not found
let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Should return not found
let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// List everything starting with a prefix that should return results
let prefix = Path::from("test_dir");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that shouldn't return results
let prefix = Path::from("something");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
// Full read round-trips the written bytes
let read_data = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data);
// Test range request
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;
let bytes = range_result.unwrap();
assert_eq!(bytes, data.slice(range.clone()));
// Bounded range via get_opts: `result.range` is the served range, `meta.size` the full size
let opts = GetOptions {
range: Some(GetRange::Bounded(2..5)),
let result = storage.get_opts(&location, opts).await.unwrap();
// Data is `"arbitrary data"`, length 14 bytes
assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
assert_eq!(result.range, 2..5);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bit".as_ref());
let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;
// Should be a non-fatal error
// A bounded range overrunning the end of the object is clamped to the object size
let opts = GetOptions {
range: Some(GetRange::Bounded(2..100)),
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 2..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bitrary data".as_ref());
// Suffix ranges: stores may reject these with NotSupported
let opts = GetOptions {
range: Some(GetRange::Suffix(2)),
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 12..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"ta".as_ref());
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
// A suffix longer than the object returns the whole object
let opts = GetOptions {
range: Some(GetRange::Suffix(100)),
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 0..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"arbitrary data".as_ref());
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
// Offset ranges read from the offset to the end of the object
let opts = GetOptions {
range: Some(GetRange::Offset(3)),
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 3..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"itrary data".as_ref());
// An offset past the end of the object is an error
let opts = GetOptions {
range: Some(GetRange::Offset(100)),
storage.get_opts(&location, opts).await.unwrap_err();
// Vectored reads, including overlapping ranges
let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
assert_eq!(bytes, data.slice(range.clone()))
let head = storage.head(&location).await.unwrap();
assert_eq!(head.size, data.len());
let content_list = flatten_list_stream(storage, None).await.unwrap();
// After removal, both get and head report NotFound
let err = storage.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Test handling of paths containing an encoded delimiter
let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
.put(&file_with_delimiter, "arbitrary".into())
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![file_with_delimiter.clone()]);
let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
let files = storage
let files = storage
assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
let files = storage
.list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
assert_eq!(files.objects.len(), 1);
assert_eq!(files.objects[0].location, file_with_delimiter);
// Test handling of paths containing non-ASCII characters, e.g. emoji
let emoji_prefix = Path::from("🙀");
let emoji_file = Path::from("🙀/😀.parquet");
storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
assert_eq!(files, vec![emoji_file.clone()]);
// copy/rename keep the source path readable/listable as expected
let dst = Path::from("foo.parquet");
storage.copy(&emoji_file, &dst).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
let dst2 = Path::from("new/nested/foo.parquet");
storage.copy(&emoji_file, &dst2).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
let dst3 = Path::from("new/nested2/bar.parquet");
storage.rename(&dst, &dst3).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);
let err = storage.head(&dst).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));
let files = flatten_list_stream(storage, Some(&emoji_prefix))
// Test handling of paths containing percent-encoded sequences
// "HELLO" percent encoded
let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
let path = hello_prefix.child("foo.parquet");
storage.put(&path, vec![0, 1].into()).await.unwrap();
let files = flatten_list_stream(storage, Some(&hello_prefix))
assert_eq!(files, vec![path.clone()]);
// Cannot list by decoded representation
let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
// Cannot access by decoded representation
let err = storage
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Test handling of unicode paths
let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
storage.put(&path, "test".into()).await.unwrap();
let r = storage.get(&path).await.unwrap();
assert_eq!(r.bytes().await.unwrap(), "test");
let dir = Path::parse("🇦🇺").unwrap();
let r = storage.list_with_delimiter(None).await.unwrap();
let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location, path);
// Can also write non-percent encoded sequences
let path = Path::parse("%Q.parquet").unwrap();
storage.put(&path, vec![0, 1].into()).await.unwrap();
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![path.clone()]);
// Paths containing spaces
let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
storage.put(&path, vec![0, 1].into()).await.unwrap();
let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
assert_eq!(files, vec![path.clone()]);
let files = flatten_list_stream(storage, None).await.unwrap();
assert!(files.is_empty(), "{files:?}");
// Test list order
let files = vec![
Path::from("a a/b.file"),
Path::from("a/a file"),
for file in &files {
storage.put(file, "foo".into()).await.unwrap();
// (prefix, offset) pairs: `list_with_offset` must return exactly the prefix-matching
// paths strictly greater than `offset`, compared against a filter over `files`
let cases = [
(None, Path::from("a")),
(None, Path::from("a/a file")),
(None, Path::from("a/a/b.file")),
(None, Path::from("ab/a.file")),
(None, Path::from("a%2Fa.file")),
(None, Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("")),
(Some(Path::from("a")), Path::from("a")),
(Some(Path::from("a")), Path::from("a/😀")),
(Some(Path::from("a")), Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("a/b")),
(Some(Path::from("a")), Path::from("a/a/b.file")),
for (prefix, offset) in cases {
let s = storage.list_with_offset(prefix.as_ref(), &offset);
let mut actual: Vec<_> = s.map_ok(|x| x.location).try_collect().await.unwrap();
let expected: Vec<_> = files
.filter(|x| {
let prefix_match = prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
prefix_match && *x > &offset
assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
// Test bulk delete
let paths = vec![
Path::from("I'm a < & weird path"),
storage.put(&paths[4], "foo".into()).await.unwrap();
let out_paths = storage
assert_eq!(out_paths.len(), paths.len());
// Index of the input path expected to be reported as missing (or silently succeed)
let expect_errors = [3];
for (i, input_path) in paths.iter().enumerate() {
let err = storage.head(input_path).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
if expect_errors.contains(&i) {
// Some object stores will report NotFound, but others (such as S3) will
// report success regardless.
match &out_paths[i] {
Err(Error::NotFound { path: out_path, .. }) => {
Ok(out_path) => {
assert_eq!(out_path, input_path);
_ => panic!("unexpected error"),
} else {
assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
// Zero-length objects round-trip with size 0
let path = Path::from("empty");
storage.put(&path, PutPayload::default()).await.unwrap();
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, 0);
let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(data.len(), 0);
/// Conformance test for conditional reads via `get_opts`.
///
/// Time-based preconditions (`if_unmodified_since`, `if_modified_since`) may be rejected
/// with `Error::NotSupported`. E-tag preconditions (`if_match`, `if_none_match`) must be
/// honoured, and `head` must report an e-tag. If `head` reports a `version`, reading
/// that earlier version after an overwrite is also checked.
pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
let path = Path::from("test");
storage.put(&path, "foo".into()).await.unwrap();
let meta = storage.head(&path).await.unwrap();
// Unmodified since exactly `last_modified`: succeeds
let options = GetOptions {
if_unmodified_since: Some(meta.last_modified),
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
let options = GetOptions {
if_unmodified_since: Some(
meta.last_modified + chrono::Duration::try_hours(10).unwrap(),
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
// Unmodified since a time before `last_modified`: precondition fails
let options = GetOptions {
if_unmodified_since: Some(
meta.last_modified - chrono::Duration::try_hours(10).unwrap(),
match storage.get_opts(&path, options).await {
Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
d => panic!("{d:?}"),
// Modified since exactly `last_modified`: not modified
let options = GetOptions {
if_modified_since: Some(meta.last_modified),
match storage.get_opts(&path, options).await {
Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
d => panic!("{d:?}"),
let options = GetOptions {
if_modified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
// E-tag preconditions must be supported
let tag = meta.e_tag.unwrap();
let options = GetOptions {
if_match: Some(tag.clone()),
storage.get_opts(&path, options).await.unwrap();
let options = GetOptions {
if_match: Some("invalid".to_string()),
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
let options = GetOptions {
if_none_match: Some(tag.clone()),
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::NotModified { .. }), "{err}");
let options = GetOptions {
if_none_match: Some("invalid".to_string()),
storage.get_opts(&path, options).await.unwrap();
// Overwriting changes the e-tag; the old tag no longer matches
let result = storage.put(&path, "test".into()).await.unwrap();
let new_tag = result.e_tag.unwrap();
assert_ne!(tag, new_tag);
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.e_tag.unwrap(), new_tag);
let options = GetOptions {
if_match: Some(new_tag),
storage.get_opts(&path, options).await.unwrap();
let options = GetOptions {
if_match: Some(tag),
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
// Versioned stores only: an earlier version stays readable after an overwrite
if let Some(version) = meta.version {
storage.put(&path, "bar".into()).await.unwrap();
let options = GetOptions {
version: Some(version),
// Can retrieve previous version
let get_opts = storage.get_opts(&path, options).await.unwrap();
let old = get_opts.bytes().await.unwrap();
assert_eq!(old, b"test".as_slice());
// Current version contains the updated data
let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(&current, b"bar".as_slice());
/// Conformance test for conditional writes via `put_opts`.
///
/// Checks that `PutMode::Create` rejects an existing object with `Error::AlreadyExists`.
/// The `PutMode::Update` checks and the concurrent increment race (`NUM_WORKERS` tasks,
/// each performing `NUM_INCREMENTS` read-modify-write updates) are guarded by
/// `supports_update`.
// NOTE(review): the early exit under `if !supports_update` is not visible in this view.
pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
// When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB
// As a result each conditional operation will need to wait for the lease to timeout before proceeding
// One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code
// so we instead just generate a random suffix for the filenames
let rng = thread_rng();
let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
let path = Path::from(format!("put_opts_{suffix}"));
// Create succeeds once, then fails with AlreadyExists without changing the contents
let v1 = storage
.put_opts(&path, "a".into(), PutMode::Create.into())
let err = storage
.put_opts(&path, "b".into(), PutMode::Create.into())
assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"a");
if !supports_update {
// Update succeeds against the current version and fails against a stale one
let v2 = storage
.put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into())
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"c");
let err = storage
.put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
assert!(matches!(err, Error::Precondition { .. }), "{err}");
.put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into())
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"e");
// Update not exists
let path = Path::from("I don't exist");
let err = storage
.put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
assert!(matches!(err, Error::Precondition { .. }), "{err}");
const NUM_WORKERS: usize = 5;
const NUM_INCREMENTS: usize = 10;
let path = Path::from(format!("RACE-{suffix}"));
// Each worker increments a counter stored in the object. On Precondition (lost update
// race) or AlreadyExists (lost create race) it re-reads and retries. Any other error
// aborts the worker.
let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
.map(|_| async {
for _ in 0..NUM_INCREMENTS {
loop {
match storage.get(&path).await {
Ok(r) => {
let mode = PutMode::Update(UpdateVersion {
e_tag: r.meta.e_tag.clone(),
version: r.meta.version.clone(),
let b = r.bytes().await.unwrap();
let v: usize = std::str::from_utf8(&b).unwrap().parse().unwrap();
let new = (v + 1).to_string();
match storage.put_opts(&path, new.into(), mode.into()).await {
Ok(_) => break,
Err(Error::Precondition { .. }) => continue,
Err(e) => return Err(e),
Err(Error::NotFound { .. }) => {
let mode = PutMode::Create;
match storage.put_opts(&path, "1".into(), mode.into()).await {
Ok(_) => break,
Err(Error::AlreadyExists { .. }) => continue,
Err(e) => return Err(e),
Err(e) => return Err(e),
// NOTE(review): the loop condition that drives `futures` to completion appears
// truncated in this view; confirm against the full file.
while {}
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
/// Returns a chunk of length `chunk_length`
///
/// The buffer starts zero-filled, and 1000 randomly chosen positions (possibly repeating)
/// are overwritten with random bytes. This gives cheap, non-uniform test data.
///
/// `rng.gen_range(0..chunk_length)` panics on an empty range, so `chunk_length` must be
/// non-zero.
// NOTE(review): the conversion of `data` into the returned `Bytes` is not visible in this view.
fn get_chunk(chunk_length: usize) -> Bytes {
let mut data = vec![0_u8; chunk_length];
let mut rng = thread_rng();
// Set a random selection of bytes
for _ in 0..1000 {
data[rng.gen_range(0..chunk_length)] = rng.gen();
/// Returns `num_chunks` of length `chunks`
fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
(0..num_chunks).map(|_| get_chunk(chunk_length)).collect()
/// Conformance test for multipart uploads via `put_multipart`.
///
/// Checks that an in-progress upload is not visible (via `head` or listing) until it
/// completes. It also checks that a completed upload round-trips its bytes, that an
/// existing object can be overwritten through `WriteMultipart`, and that both empty and
/// in-progress uploads can be aborted without leaving an object behind.
pub(crate) async fn stream_get(storage: &DynObjectStore) {
let location = Path::from("test_dir/test_upload_file.txt");
// Can write to storage
let data = get_chunks(5 * 1024 * 1024, 3);
let bytes_expected = data.concat();
let mut upload = storage.put_multipart(&location).await.unwrap();
let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
// Object should not yet exist in store
let meta_res = storage.head(&location).await;
crate::Error::NotFound { .. }
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(&files, &[]);
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// Can overwrite some storage
// Sizes chosen to ensure we write three parts
let data = get_chunks(3_200_000, 7);
let bytes_expected = data.concat();
let upload = storage.put_multipart(&location).await.unwrap();
let mut writer = WriteMultipart::new(upload);
for chunk in &data {
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
let mut upload = storage.put_multipart(&location).await.unwrap();
let get_res = storage.get(&location).await;
crate::Error::NotFound { .. }
// We can abort an in-progress write
let mut upload = storage.put_multipart(&location).await.unwrap();
let get_res = storage.get(&location).await;
crate::Error::NotFound { .. }
/// Checks that listing treats prefixes as whole path segments ("directories").
///
/// Listing under `foo` returns `foo/x.json`. Using the object path itself, or a partial
/// segment such as `foo/x`, as the prefix returns nothing.
pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) {
let content_list = flatten_list_stream(storage, None).await.unwrap();
"Expected list to be empty; found: {content_list:?}"
let location1 = Path::from("foo/x.json");
let location2 = Path::from("");
let data = PutPayload::from("arbitrary data");
storage.put(&location1, data.clone()).await.unwrap();
storage.put(&location2, data).await.unwrap();
let prefix = Path::from("foo");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location1.clone()]);
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.objects.len(), 1);
assert_eq!(result.objects[0].location, location1);
assert_eq!(result.common_prefixes, &[]);
// Listing an existing path (file) should return an empty list:
let content_list = flatten_list_stream(storage, Some(&location1))
assert_eq!(content_list, &[]);
let list = storage.list_with_delimiter(Some(&location1)).await.unwrap();
assert_eq!(list.objects, &[]);
assert_eq!(list.common_prefixes, &[]);
// A partial path segment is not treated as a prefix
let prefix = Path::from("foo/x");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[]);
let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(list.objects, &[]);
assert_eq!(list.common_prefixes, &[]);
/// Conformance test for `list_with_delimiter`.
///
/// Listing `mydb/wb` must return its sub-directories as common prefixes and its direct
/// child object (with the correct size) in `objects`. A partial-filename prefix and a
/// non-existent prefix return no objects. The store is checked for emptiness at the
/// start and end of the test.
pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await.unwrap();
// ==================== do: create files ====================
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
.map(|&s| Path::from(s))
for f in &files {
storage.put(f, data.clone().into()).await.unwrap();
// ==================== check: prefix-list `mydb/wb` (directory) ====================
let prefix = Path::from("mydb/wb");
let expected_000 = Path::from("mydb/wb/000");
let expected_001 = Path::from("mydb/wb/001");
let expected_location = Path::from("mydb/wb/foo.json");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
assert_eq!(result.objects.len(), 1);
let object = &result.objects[0];
assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
let prefix = Path::from("mydb/wb/000/000/001");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.objects.len(), 0);
// ==================== check: prefix-list `not_there` (non-existing prefix) ====================
let prefix = Path::from("not_there");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
// ==================== do: remove all files ====================
for f in &files {
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await.unwrap();
/// Asserts that `head` reports `NotFound` for `location`, which defaults to
/// `this_file_should_not_exist`.
// NOTE(review): the expression producing the returned `Bytes` result is not visible in this
// view. Presumably it is a `get` of the same location, so callers can inspect the error.
pub(crate) async fn get_nonexistent_object(
storage: &DynObjectStore,
location: Option<Path>,
) -> crate::Result<Bytes> {
let location = location.unwrap_or_else(|| Path::from("this_file_should_not_exist"));
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }));
/// Checks that `copy` overwrites an existing destination and that `rename` overwrites the
/// destination and removes the source.
pub(crate) async fn rename_and_copy(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy() make both objects identical
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
storage.copy(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
// rename() copies contents and deletes original
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
storage.rename(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
/// Checks that `copy_if_not_exists` fails with `AlreadyExists` when the destination exists,
/// and copies successfully into a free destination. The destination path is nested under a
/// directory that does not otherwise exist.
// NOTE(review): the step that frees the destination between the two copies is not visible
// in this view.
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("not_exists_nested/test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy_if_not_exists() errors if destination already exists
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
let result = storage.copy_if_not_exists(&path1, &path2).await;
crate::Error::AlreadyExists { .. }
// copy_if_not_exists() copies contents and allows deleting original
storage.copy_if_not_exists(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
/// Checks that `copy`, `rename` and `copy_if_not_exists` all fail with `NotFound` when the
/// source object does not exist, even if the destination does.
pub(crate) async fn copy_rename_nonexistent_object(storage: &DynObjectStore) {
// Source path that is never written, so the object does not exist
let path1 = Path::from("test1");
// Create destination object
let path2 = Path::from("test2");
storage.put(&path2, "hello".into()).await.unwrap();
// copy() errors if source does not exist
let result = storage.copy(&path1, &path2).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// rename() errors if source does not exist
let result = storage.rename(&path1, &path2).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// copy_if_not_exists() errors if source does not exist
let result = storage.copy_if_not_exists(&path1, &path2).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
/// Conformance test for the low-level [`MultipartStore`] API.
///
/// Uploads two 5 MiB parts, completes the upload and checks the resulting object size via
/// `storage.head`. It then completes an upload with no parts and checks that it yields a
/// zero-length object.
pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) {
let path = Path::from("test_multipart");
let chunk_size = 5 * 1024 * 1024;
let chunks = get_chunks(chunk_size, 2);
let id = multipart.create_multipart(&path).await.unwrap();
let parts: Vec<_> = futures::stream::iter(chunks)
.map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
.complete_multipart(&path, &id, parts)
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, chunk_size * 2);
// Empty case
let path = Path::from("test_empty_multipart");
let id = multipart.create_multipart(&path).await.unwrap();
let parts = vec![];
.complete_multipart(&path, &id, parts)
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, 0);
/// Checks presigned URLs: uploads an object, signs a `GET` URL valid for 60 seconds,
/// fetches it with `reqwest`, and compares the body with the uploaded data.
#[cfg(any(feature = "azure", feature = "aws"))]
pub(crate) async fn signing<T>(integration: &T)
T: ObjectStore + crate::signer::Signer,
use reqwest::Method;
use std::time::Duration;
let data = Bytes::from("hello world");
let path = Path::from("file.txt");
integration.put(&path, data.clone().into()).await.unwrap();
let signed = integration
.signed_url(Method::GET, &path, Duration::from_secs(60))
let resp = reqwest::get(signed).await.unwrap();
let loaded = resp.bytes().await.unwrap();
assert_eq!(data, loaded);
/// Conformance test for object tagging on put.
///
/// Writes an object with a [`TagSet`] through `put_opts`. The write must always succeed.
/// When `validate` is true, the tags are fetched with `get_tags`, parsed from the XML
/// response body, sorted by key, and compared with the tags that were written.
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
F: Fn(Path) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
use bytes::Buf;
use serde::Deserialize;
// XML shape of the tagging response: `<TagSet><Tag><Key/><Value/></Tag>...</TagSet>`
// NOTE(review): a `#[derive(Deserialize)]` on `Tagging` is not visible in this view.
struct Tagging {
#[serde(rename = "TagSet")]
list: TagList,
#[derive(Debug, Deserialize)]
struct TagList {
#[serde(rename = "Tag")]
tags: Vec<Tag>,
#[derive(Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "PascalCase")]
struct Tag {
key: String,
value: String,
// Keys/values include `/` and spaces to exercise encoding
let tags = vec![
Tag {
key: "".to_string(),
value: "bananas/".to_string(),
Tag {
key: "namespace/".to_string(),
value: "value with a space".to_string(),
let mut tag_set = TagSet::default();
for t in &tags {
tag_set.push(&t.key, &t.value)
let path = Path::from("tag_test");
.put_opts(&path, "test".into(), tag_set.into())
// Write should always succeed, but certain configurations may simply ignore tags
if !validate {
let resp = get_tags(path.clone()).await.unwrap();
let body = resp.bytes().await.unwrap();
let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
assert_eq!(resp.list.tags, tags);
/// Removes every object found by listing the whole store.
// NOTE(review): only the listing of all paths is visible here. The deletion call that
// consumes `paths` is not; confirm.
async fn delete_fixtures(storage: &DynObjectStore) {
let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
/// Test that the returned stream does not borrow the lifetime of Path
///
/// The stream's lifetime `'a` is tied only to `store`. `path` is a local created inside
/// this function, so this compiles only if `list` does not borrow its prefix argument.
// NOTE(review): the `list` call that produces the returned stream is not visible in this view.
fn list_store<'a>(
store: &'a dyn ObjectStore,
path_str: &str,
) -> BoxStream<'a, Result<ObjectMeta>> {
let path = Path::from(path_str);
/// Exercises `list_store` against an in-memory store to check that the listing stream can
/// outlive the temporary `Path` it was created from.
// NOTE(review): the test attribute and the stream-consuming assertion are not visible in
// this view.
async fn test_list_lifetimes() {
let store = memory::InMemory::new();
let mut stream = list_store(&store, "path");
fn test_preconditions() {
let mut meta = ObjectMeta {
location: Path::from("test"),
last_modified: Utc.timestamp_nanos(100),
size: 100,
e_tag: Some("123".to_string()),
version: None,
let mut options = GetOptions::default();
options.if_modified_since = Some(Utc.timestamp_nanos(50));
options.if_modified_since = Some(Utc.timestamp_nanos(100));
options.if_modified_since = Some(Utc.timestamp_nanos(101));
options = GetOptions::default();
options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
options = GetOptions::default();
options.if_match = Some("123".to_string());
options.if_match = Some("123,354".to_string());
options.if_match = Some("354, 123,".to_string());
options.if_match = Some("354".to_string());
options.if_match = Some("*".to_string());
// If-Match takes precedence
options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
options = GetOptions::default();
options.if_none_match = Some("123".to_string());
options.if_none_match = Some("*".to_string());
options.if_none_match = Some("1232".to_string());
options.if_none_match = Some("23, 123".to_string());
// If-None-Match takes precedence
options.if_modified_since = Some(Utc.timestamp_nanos(10));
// Check missing ETag
meta.e_tag = None;
options = GetOptions::default();
options.if_none_match = Some("*".to_string()); // Fails if any file exists
options = GetOptions::default();
options.if_match = Some("*".to_string()); // Passes if file exists