blob: 4a56b03bfb726ad95d15fc468778e5e426cb6cd1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
//! # object_store
//! This crate provides APIs for interacting with object storage services.
//! It currently supports PUT, GET, DELETE, HEAD and list for:
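//! * [Google Cloud Storage](https://cloud.google.com/storage/)
//! * [Amazon S3](https://aws.amazon.com/s3/)
//! * [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/)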
//! * In-memory
//! * Local file storage
#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
#[cfg(feature = "gcp")]
pub mod gcp;
pub mod local;
pub mod memory;
pub mod path;
pub mod throttle;
#[cfg(feature = "gcp")]
mod oauth;
#[cfg(feature = "gcp")]
mod token;
mod util;
use crate::path::Path;
use crate::util::{collect_bytes, maybe_spawn_blocking};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
/// An alias for a dynamically dispatched object store implementation.
///
/// Expands to `dyn ObjectStore`; being an unsized trait-object type it is used
/// behind a pointer, e.g. the test helpers in this file take `&DynObjectStore`.
pub type DynObjectStore = dyn ObjectStore;
/// Universal API to multiple object store services.
///
/// Implementors must also be `Display`, `Debug`, `Send`, `Sync` and `'static`
/// (the supertrait bounds below). Every method is an `async fn` that returns
/// this crate's `Result`.
// NOTE(review): this listing appears to have lost lines — the trait declares
// `async fn`s and `async_trait` is imported, yet no `#[async_trait]` attribute
// is visible; `list` has no `&self` receiver; and the default method bodies and
// the trait itself are never closed. Confirm against the upstream file.
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
/// Return the bytes that are stored at the specified location.
///
/// The data comes back as a `GetResult`, which is either a local file or an
/// asynchronous byte stream.
async fn get(&self, location: &Path) -> Result<GetResult>;
/// Return the bytes that are stored at the specified location
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
/// Return the metadata for the specified location
async fn head(&self, location: &Path) -> Result<ObjectMeta>;
/// Delete the object at the specified location.
async fn delete(&self, location: &Path) -> Result<()>;
/// List all the objects with the given prefix.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
///
/// On success the result is a boxed stream whose items are `Result<ObjectMeta>`,
/// so individual entries can fail independently of the call itself.
async fn list(
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
/// List objects with the given prefix and an implementation specific
/// delimiter. Returns common prefixes (directories) in addition to object
/// metadata.
///
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
// A failed copy is returned to the caller immediately via `?`.
self.copy(from, to).await?;
/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
/// Move an object from one path to another in the same object store.
///
/// Will return an error if the destination already has an object.
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
// A failed conditional copy is returned to the caller immediately via `?`.
self.copy_if_not_exists(from, to).await?;
/// Result of a list call that includes objects, prefixes (directories) and a
/// token for the next set of results. Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
// NOTE(review): no continuation-token field is visible on this struct in this
// listing, despite the description above — confirm whether one exists upstream.
pub struct ListResult {
/// Prefixes that are common (like directories)
pub common_prefixes: Vec<Path>,
/// Object metadata for the listing
pub objects: Vec<ObjectMeta>,
/// The metadata that describes an object.
///
/// Derives `Debug`, `Clone` and `PartialEq`, so values can be printed, copied
/// and compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct ObjectMeta {
/// The full path to the object
pub location: Path,
/// The last modified time, as a UTC timestamp
pub last_modified: DateTime<Utc>,
/// The size in bytes of the object
pub size: usize,
/// Result for a get request
///
/// This special cases the case of a local file, as some systems may
/// be able to optimise the case of a file already present on local disk
pub enum GetResult {
/// A file and its path on the local filesystem
File(std::fs::File, std::path::PathBuf),
/// An asynchronous stream of byte chunks; each item may itself be an error
Stream(BoxStream<'static, Result<Bytes>>),
// Hand-written `Debug`: prints only which variant is held and never the file
// handle, path or stream contents.
impl Debug for GetResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::File(_, _) => write!(f, "GetResult(File)"),
Self::Stream(_) => write!(f, "GetResult(Stream)"),
impl GetResult {
/// Collects the data into a [`Bytes`]
///
/// For `Self::File` the work is handed to `maybe_spawn_blocking`: the file is
/// read to its end into a buffer pre-sized from `len`, and failures are mapped to
/// `local::Error::Seek` / `local::Error::UnableToReadBytes`. For `Self::Stream`
/// the stream is gathered with `collect_bytes`.
// NOTE(review): the statements that compute `len` are garbled in this listing
// (two fused `|source| { local::Error::Seek { .. } }` closures with no call
// expression in front of them) — confirm against the upstream file.
pub async fn bytes(self) -> Result<Bytes> {
match self {
Self::File(mut file, path) => {
maybe_spawn_blocking(move || {
let len =|source| {
local::Error::Seek {
path: path.clone(),
})?;|source| {
local::Error::Seek {
path: path.clone(),
// Pre-size the buffer so reading the whole file does not need to regrow it.
let mut buffer = Vec::with_capacity(len as usize);
file.read_to_end(&mut buffer).map_err(|source| {
local::Error::UnableToReadBytes { source, path }
Self::Stream(s) => collect_bytes(s, None).await,
/// Converts this into a byte stream
///
/// If the result is [`Self::File`] will perform chunked reads of the file, otherwise
/// will return the [`Self::Stream`].
///
/// File reads are done in steps of at most `CHUNK_SIZE` bytes. A step that reads
/// fewer than `CHUNK_SIZE` bytes marks the state as finished, and the following
/// step yields `Ok(None)`.
///
/// # Tokio Compatibility
///
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
// NOTE(review): the call that consumes the `(file, path, false)` seed and the
// closure below is not visible in this listing — presumably an unfold-style
// stream constructor; confirm against the upstream file.
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self {
Self::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
// State threaded through each step: the open file, its path (for error
// reporting) and whether a short read has already been seen.
(file, path, false),
|(mut file, path, finished)| {
maybe_spawn_blocking(move || {
if finished {
return Ok(None);
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
// `take` caps this step at CHUNK_SIZE bytes; `read_to_end` then reads up to
// that cap or to end-of-file, whichever comes first.
let read = file
.take(CHUNK_SIZE as u64)
.read_to_end(&mut buffer)
.map_err(|e| local::Error::UnableToReadBytes {
source: e,
path: path.clone(),
// A read shorter than CHUNK_SIZE sets the `finished` flag for the next step.
Ok(Some((buffer.into(), (file, path, read != CHUNK_SIZE))))
Self::Stream(s) => s,
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to this crate's `Error` and can be overridden through `E`.
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A specialized `Error` for object store-related errors
///
/// `Display` text for each variant comes from its `#[snafu(display(...))]` attribute.
// NOTE(review): this listing is visibly damaged — variant bodies are never
// closed, the `InvalidPath` attribute has lost its `#[snafu(` opener, and the
// "Operation not yet implemented." attribute has no variant following it.
// Confirm against the upstream file.
#[derive(Debug, Snafu)]
pub enum Error {
// Catch-all failure, tagged with the name of the store that raised it.
#[snafu(display("Generic {} error: {}", store, source))]
Generic {
store: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
// No object exists at `path`.
#[snafu(display("Object at location {} not found: {}", path, source))]
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
// An object was encountered whose path failed validation in the `path` module.
display("Encountered object with invalid path: {}", source),
InvalidPath { source: path::Error },
// A spawned tokio task could not be joined.
#[snafu(display("Error joining spawned task: {}", source), context(false))]
JoinError { source: tokio::task::JoinError },
// The requested operation is not supported.
#[snafu(display("Operation not supported: {}", source))]
NotSupported {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
// An object already exists at `path`.
#[snafu(display("Object at location {} already exists: {}", path, source))]
AlreadyExists {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
#[snafu(display("Operation not yet implemented."))]
// Only compiled when the `gcp` feature is enabled, matching the `oauth` module.
#[cfg(feature = "gcp")]
#[snafu(display("OAuth error: {}", source), context(false))]
OAuth { source: oauth::Error },
// Helper shared with the `tests` module below, which imports
// `flatten_list_stream` from here.
// NOTE(review): no `#[cfg(test)]` attribute is visible on this module in this
// listing — confirm against the upstream file.
mod test_util {
use super::*;
use futures::TryStreamExt;
/// Returns the `location` of every listed entry as a `Vec<Path>`.
// NOTE(review): the expression that produces the stream is missing from this
// listing — presumably `storage.list(prefix)` followed by a collect; only the
// `.map_ok` projection survives. Confirm against the upstream file.
pub async fn flatten_list_stream(
storage: &DynObjectStore,
prefix: Option<&Path>,
) -> Result<Vec<Path>> {
// Keep only each entry's path; the rest of the metadata is discarded.
.map_ok(|meta| meta.location)
// Test routines written against `&DynObjectStore`, so the same checks can be
// run on any store implementation.
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
// These aliases take precedence over the crate-level `Error` / `Result` brought
// in by the glob import above, so `?` in the routines below can propagate any
// boxed error.
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
/// End-to-end exercise of a store: put, list (with and without a prefix or
/// delimiter), get, ranged get and head, followed by checks that `get` / `head`
/// report `NotFound`, and finally paths containing an encoded delimiter or
/// non-ASCII characters.
// NOTE(review): several statements in this listing are incomplete (dangling
// method chains, assertions without their macro, no visible delete call before
// the `NotFound` checks) — confirm against the upstream file.
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> {
// The store's `Display` output is used further down to special-case the Azure emulator.
let store_str = storage.to_string();
let content_list = flatten_list_stream(storage, None).await?;
"Expected list to be empty; found: {:?}",
let location = Path::from("test_dir/test_file.json");
let data = Bytes::from("arbitrary data");
let expected_data = data.clone();
storage.put(&location, data).await?;
let root = Path::from("/");
// List everything
let content_list = flatten_list_stream(storage, None).await?;
assert_eq!(content_list, &[location.clone()]);
// Should behave the same as no prefix
let content_list = flatten_list_stream(storage, Some(&root)).await?;
assert_eq!(content_list, &[location.clone()]);
// List with delimiter
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// Should behave the same as no prefix
let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// List everything starting with a prefix that should return results
let prefix = Path::from("test_dir");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that shouldn't return results
let prefix = Path::from("something");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
// A full read must return exactly what was written.
let read_data = storage.get(&location).await?.bytes().await?;
assert_eq!(&*read_data, expected_data);
// Test range request
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;
// The stored payload is far shorter than 200 bytes, so this range lies wholly past its end.
let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;
if store_str.starts_with("MicrosoftAzureEmulator") {
// Azurite doesn't support x-ms-range-get-content-crc64 set by Azure SDK
let err = range_result.unwrap_err().to_string();
assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
let err = out_of_range_result.unwrap_err().to_string();
assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
} else {
let bytes = range_result.unwrap();
assert_eq!(bytes, expected_data.slice(range));
// Should be a non-fatal error
let head = storage.head(&location).await?;
assert_eq!(head.size, expected_data.len());
let content_list = flatten_list_stream(storage, None).await?;
// Both `get` and `head` are expected to fail with `NotFound` at this point.
let err = storage.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Test handling of paths containing an encoded delimiter
let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
.put(&file_with_delimiter, Bytes::from("arbitrary"))
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![file_with_delimiter.clone()]);
let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
let files = storage
let files = storage
assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
let files = storage
.list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
assert_eq!(files.objects.len(), 1);
assert_eq!(files.objects[0].location, file_with_delimiter);
// Test handling of paths containing non-ASCII characters, e.g. emoji
let emoji_prefix = Path::from("🙀");
let emoji_file = Path::from("🙀/😀.parquet");
.put(&emoji_file, Bytes::from("arbitrary"))
let files = flatten_list_stream(storage, Some(&emoji_prefix))
assert_eq!(files, vec![emoji_file.clone()]);
let files = flatten_list_stream(storage, Some(&emoji_prefix))
/// Checks that prefix listing matches whole path segments: after putting two
/// objects, listing `foo` must return only `foo/x.json`, and listing `foo/x`
/// (a partial file name) must return nothing.
// NOTE(review): `location2` is the empty path in this listing, which looks like
// extraction damage (the test needs a sibling path that `foo` must not match),
// and the initial emptiness assertion has lost its macro call. Confirm against
// the upstream file.
pub(crate) async fn list_uses_directories_correctly(
storage: &DynObjectStore,
) -> Result<()> {
let content_list = flatten_list_stream(storage, None).await?;
"Expected list to be empty; found: {:?}",
let location1 = Path::from("foo/x.json");
let location2 = Path::from("");
let data = Bytes::from("arbitrary data");
storage.put(&location1, data.clone()).await?;
storage.put(&location2, data).await?;
// `foo` is a complete path segment of `location1`, so that object must be listed.
let prefix = Path::from("foo");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[location1.clone()]);
// `foo/x` ends part-way through the segment `x.json`, so nothing may match.
let prefix = Path::from("foo/x");
let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
assert_eq!(content_list, &[]);
/// Exercises `ObjectStore::list_with_delimiter`: creates a set of files, then
/// checks the common prefixes and objects reported for a directory prefix, a
/// partial-file-name prefix and a non-existing prefix.
// NOTE(review): the list of file names, the emptiness assertions and the body
// of the clean-up loop are missing from this listing — confirm against the
// upstream file.
pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> {
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
// ==================== do: create files ====================
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
.map(|&s| Path::from(s))
for f in &files {
let data = data.clone();
storage.put(f, data).await.unwrap();
// ==================== check: prefix-list `mydb/wb` (directory) ====================
let prefix = Path::from("mydb/wb");
let expected_000 = Path::from("mydb/wb/000");
let expected_001 = Path::from("mydb/wb/001");
let expected_location = Path::from("mydb/wb/foo.json");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
// Sub-directories are reported as common prefixes; only the direct child file is an object.
assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
assert_eq!(result.objects.len(), 1);
let object = &result.objects[0];
assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
// ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
let prefix = Path::from("mydb/wb/000/000/001");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.objects.len(), 0);
// ==================== check: prefix-list `not_there` (non-existing prefix) ====================
let prefix = Path::from("not_there");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
// ==================== do: remove all files ====================
for f in &files {
// ==================== check: store is empty ====================
let content_list = flatten_list_stream(storage, None).await?;
/// Asserts that `head` on a missing object fails with `Error::NotFound`.
///
/// `location` defaults to `this_file_should_not_exist` when `None` is passed.
/// Unlike the other routines in this module, the return type is the crate's own
/// `Result`, not the boxed-error alias.
// NOTE(review): the tail of the body, which produces the returned `Bytes`
// result, is not visible in this listing — presumably a `get` on the same
// location; confirm against the upstream file.
pub(crate) async fn get_nonexistent_object(
storage: &DynObjectStore,
location: Option<Path>,
) -> crate::Result<Bytes> {
let location =
location.unwrap_or_else(|| Path::from("this_file_should_not_exist"));
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }));
/// Checks that `copy` overwrites the destination with the source's contents,
/// and that `rename` does the same and also leaves the source reporting
/// `NotFound`.
// NOTE(review): the clean-up statements after the final comment are missing
// from this listing — confirm against the upstream file.
pub(crate) async fn rename_and_copy(storage: &DynObjectStore) -> Result<()> {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy() makes both objects identical
storage.put(&path1, contents1.clone()).await?;
storage.put(&path2, contents2.clone()).await?;
storage.copy(&path1, &path2).await?;
let new_contents = storage.get(&path2).await?.bytes().await?;
assert_eq!(&new_contents, &contents1);
// rename() copies contents and deletes original
storage.put(&path1, contents1.clone()).await?;
storage.put(&path2, contents2.clone()).await?;
storage.rename(&path1, &path2).await?;
let new_contents = storage.get(&path2).await?.bytes().await?;
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
/// Exercises `ObjectStore::copy_if_not_exists`: first with an occupied
/// destination, then checking that a successful copy carries the source's
/// contents over.
// NOTE(review): this listing has lost lines — the assertion wrapping the
// `AlreadyExists` pattern, whatever removes the objects between the steps, and
// the clean-up are not visible. Confirm against the upstream file.
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) -> Result<()> {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy_if_not_exists() errors if destination already exists
storage.put(&path1, contents1.clone()).await?;
storage.put(&path2, contents2.clone()).await?;
let result = storage.copy_if_not_exists(&path1, &path2).await;
crate::Error::AlreadyExists { .. }
// copy_if_not_exists() copies contents and allows deleting original
storage.copy_if_not_exists(&path1, &path2).await?;
let new_contents = storage.get(&path2).await?.bytes().await?;
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
/// Best-effort removal of every object currently listed in `storage`.
///
/// Panics if the listing itself fails (`unwrap`); failures of individual deletes
/// are deliberately ignored.
async fn delete_fixtures(storage: &DynObjectStore) {
let paths = flatten_list_stream(storage, None).await.unwrap();
for f in &paths {
// Ignore the result: the fixture may already be gone.
let _ = storage.delete(f).await;
/// Test that the returned stream does not borrow the lifetime of Path
///
/// The returned stream is tied only to the store's lifetime `'a`, while the
/// `Path` is a local built from `path_str` (lifetime `'b`), so this only
/// type-checks if listing does not keep a borrow of the path.
// NOTE(review): the statement that calls `store.list(..)` and returns its
// result is not visible in this listing — confirm against the upstream file.
async fn list_store<'a, 'b>(
store: &'a dyn ObjectStore,
path_str: &'b str,
) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
let path = Path::from(path_str);
/// Lists a fresh in-memory store through `list_store` and checks that the
/// stream yields no entries.
// NOTE(review): this is an `async fn` with no test attribute visible in this
// listing — presumably `#[tokio::test]` upstream; confirm.
async fn test_list_lifetimes() {
let store = memory::InMemory::new();
let stream = list_store(&store, "path").await.unwrap();
assert_eq!(stream.count().await, 0);
// Tests TODO:
// GET nonexisting location (in_memory/file)
// DELETE nonexisting location
// PUT overwriting