Donate `object_store` code from object_store_rs to arrow-rs (#2081)
* Import https://github.com/influxdata/object_store_rs/commit/3c51870ac41a90942c2e45bb499a893d514ed1da
* Add object_store to workspace, update notes and readme
* Remove old github items
* Remove old gitignore
* Remove kodiak config
* Remove redundant license files
* Remove influx specific security policy
* Remove redundant rust-toolchain and rustfmt
* Add Apache License (RAT)
* Ignore bubble_up_io_errors test
* Fix list_store with explicit lifetime, only run `test_list_root` on Linux
* Only run object_store throttle tests on macOS
diff --git a/Cargo.toml b/Cargo.toml
index 2837f02..9bf55c0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@
"parquet_derive_test",
"arrow-flight",
"integration-testing",
+ "object_store",
]
# Enable the version 2 feature resolver, which avoids unifying features for targets that are not being built
#
diff --git a/object_store/.circleci/config.yml b/object_store/.circleci/config.yml
new file mode 100644
index 0000000..b4dff6d
--- /dev/null
+++ b/object_store/.circleci/config.yml
@@ -0,0 +1,262 @@
+---
+# CI Overview
+# -----------
+#
+# Each night:
+#
+# A build image is created (ci_image) from `docker/Dockerfile.ci` and is
+# pushed to `quay.io/influxdb/rust:ci`. This build image is then used to run
+# the CI tasks for the day.
+#
+# Every commit:
+#
+# The CI for every PR and merge to main runs tests, fmt, and lints, and compiles debug binaries
+#
+# On main, if all these checks pass, it will additionally compile in "release" mode and
+# publish a Docker image to quay.io/influxdb/iox:$COMMIT_SHA
+#
+# Manual CI Image:
+#
+# It is possible to manually trigger a rebuild of the image used in CI. To do this, navigate to
+# https://app.circleci.com/pipelines/github/influxdata/influxdb_iox?branch=main (overriding the
+# branch name if desired). Then:
+# - Click "Run Pipeline" in the top-right
+# - Expand "Add Parameters"
+# - Add a "boolean" parameter called "ci_image" with the value true
+# - Click "Run Pipeline"
+#
+# If you refresh the page, you should see a newly running ci_image workflow
+#
+
+version: 2.1
+
+orbs:
+ win: circleci/windows@4.1
+
+commands:
+ rust_components:
+ description: Verify installed components
+ steps:
+ - run:
+ name: Verify installed components
+ command: |
+ rustup --version
+ rustup show
+ cargo fmt --version
+ cargo clippy --version
+
+ cache_restore:
+ description: Restore Cargo Cache
+ steps:
+ - restore_cache:
+ name: Restoring Cargo Cache
+ keys:
+ - cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
+ - cargo-cache-{{ arch }}-{{ .Branch }}
+ - cargo-cache
+ cache_save:
+ description: Save Cargo Cache
+ steps:
+ - save_cache:
+ name: Save Cargo Cache
+ paths:
+ - /usr/local/cargo/registry
+ key: cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
+
+jobs:
+ fmt:
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Rust fmt
+ command: cargo fmt --all -- --check
+ - cache_save
+ lint:
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Clippy
+ command: cargo clippy --all-targets --all-features --workspace -- -D warnings
+ - cache_save
+ cargo_audit:
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Install cargo-deny
+ command: cargo install --force cargo-deny
+ - run:
+ name: cargo-deny Checks
+ command: cargo deny check -s
+ - cache_save
+ check:
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Install cargo-hack
+ command: cargo install cargo-hack
+ - run:
+ name: Check all features
+ command: cargo hack check --feature-powerset --no-dev-deps --workspace
+ - cache_save
+ doc:
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Cargo doc
+ # excluding datafusion because it's effectively a dependency masquerading as a workspace crate.
+ command: cargo doc --document-private-items --no-deps --workspace --exclude datafusion
+ - cache_save
+ - run:
+ name: Compress Docs
+ command: tar -cvzf rustdoc.tar.gz target/doc/
+ - store_artifacts:
+ path: rustdoc.tar.gz
+ test:
+ # setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
+ docker:
+ - image: quay.io/influxdb/rust:ci
+ - image: localstack/localstack:0.14.4
+ - image: mcr.microsoft.com/azure-storage/azurite
+ - image: fsouza/fake-gcs-server
+ command:
+ - "-scheme"
+ - "http"
+ resource_class: 2xlarge # a smaller executor tends to crash during linking
+ environment:
+ # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+ CARGO_INCREMENTAL: "0"
+ # Disable full debug symbol generation to speed up CI build
+ # "1" means line tables only, which is useful for panic tracebacks.
+ RUSTFLAGS: "-C debuginfo=1"
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ RUST_BACKTRACE: "1"
+ # Run integration tests
+ TEST_INTEGRATION: 1
+ AWS_DEFAULT_REGION: "us-east-1"
+ AWS_ACCESS_KEY_ID: test
+ AWS_SECRET_ACCESS_KEY: test
+ AWS_ENDPOINT: http://127.0.0.1:4566
+ AZURE_USE_EMULATOR: "1"
+ GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
+ OBJECT_STORE_BUCKET: test-bucket
+ steps:
+ - run:
+ name: Setup localstack (AWS emulation)
+ command: |
+ cd /tmp
+ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
+ unzip awscliv2.zip
+ sudo ./aws/install
+ aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+ - run:
+ name: Setup Azurite (Azure emulation)
+ # the magical connection string is from https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
+ command: |
+ curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
+ az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
+ - run:
+ name: Setup fake GCS server
+ command: |
+ curl -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
+ echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
+ - checkout
+ - rust_components
+ - cache_restore
+ - run:
+ name: Cargo test
+ command: cargo test --workspace --features=aws,azure,azure_test,gcp
+ - cache_save
+
+ test_windows:
+ executor:
+ name: win/default
+ size: medium
+ environment:
+ # https://github.com/rust-lang/cargo/issues/10280
+ CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+ steps:
+ - checkout
+ - run:
+ name: Download rustup
+ command: wget https://win.rustup.rs/x86_64 -O rustup-init.exe
+ - run:
+ name: Install rustup
+ command: .\rustup-init.exe -y --default-host=x86_64-pc-windows-msvc
+ - run:
+ name: Cargo test
+ command: cargo test --workspace
+
+workflows:
+ version: 2
+
+ # CI for all pull requests.
+ ci:
+ jobs:
+ - check
+ - fmt
+ - lint
+ - cargo_audit
+ - test
+ - test_windows
+ - doc
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
new file mode 100644
index 0000000..2e216dd
--- /dev/null
+++ b/object_store/CONTRIBUTING.md
@@ -0,0 +1,94 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Development instructions
+
+## Running Tests
+
+Tests can be run using `cargo`
+
+```shell
+cargo test
+```
+
+## Running Integration Tests
+
+By default, integration tests are not run. To run them, you will need to set `TEST_INTEGRATION=1` and
+provide the necessary configuration for the object store under test.
+
+### AWS
+
+The S3 integration can be tested against [localstack](https://localstack.cloud/).
+
+First, start up a container running localstack:
+
+```
+$ podman run --rm -it -p 4566:4566 -p 4510-4559:4510-4559 localstack/localstack
+```
+
+Set up the environment:
+
+```
+export TEST_INTEGRATION=1
+export AWS_DEFAULT_REGION=us-east-1
+export AWS_ACCESS_KEY_ID=test
+export AWS_SECRET_ACCESS_KEY=test
+export AWS_ENDPOINT=http://127.0.0.1:4566
+export OBJECT_STORE_BUCKET=test-bucket
+```
+
+Create a bucket using the AWS CLI:
+
+```
+podman run --net=host --env-host amazon/aws-cli --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+```
+
+Run the tests:
+
+```
+$ cargo test --features aws
+```
+
+### Azure
+
+The Azure integration can be tested
+against [azurite](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio).
+
+Start up azurite:
+
+```
+$ podman run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite
+```
+
+Create a bucket:
+
+```
+$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
+```
+
+Run the tests:
+
+```
+$ cargo test --features azure
+```
+
+### GCP
+
+We don't have a good story yet for testing the GCP integration locally. You will need to create a GCS
+bucket and a service account with access to it, then use those credentials to run the tests, as sketched below.
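+
+A sketch of the environment; the service-account key path and bucket name are
+placeholders, and `GOOGLE_SERVICE_ACCOUNT` points at the service account's JSON key:
+
+```
+export TEST_INTEGRATION=1
+export GOOGLE_SERVICE_ACCOUNT=/path/to/service-account.json
+export OBJECT_STORE_BUCKET=test-bucket
+```
+
+Run the tests:
+
+```
+$ cargo test --features gcp
+```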
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
new file mode 100644
index 0000000..613b6ab
--- /dev/null
+++ b/object_store/Cargo.toml
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "object_store"
+version = "0.3.0"
+edition = "2021"
+license = "MIT/Apache-2.0"
+readme = "README.md"
+description = "A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage and Azure Blob Storage"
+keywords = [
+ "object",
+ "storage",
+ "cloud",
+]
+repository = "https://github.com/apache/arrow-rs"
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies] # In alphabetical order
+async-trait = "0.1.53"
+# Microsoft Azure Blob storage integration
+azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] }
+azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+bytes = "1.0"
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+# Google Cloud Storage integration
+futures = "0.3"
+serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
+serde_json = { version = "1.0", default-features = false, optional = true }
+rustls-pemfile = { version = "1.0", default-features = false, optional = true }
+ring = { version = "0.16", default-features = false, features = ["std"] }
+base64 = { version = "0.13", default-features = false, optional = true }
+# for rusoto
+hyper = { version = "0.14", optional = true, default-features = false }
+# for rusoto
+hyper-rustls = { version = "0.23.0", optional = true, default-features = false, features = ["webpki-tokio", "http1", "http2", "tls12"] }
+itertools = "0.10.1"
+percent-encoding = "2.1"
+# rusoto crates are for Amazon S3 integration
+rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
+rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
+rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
+rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
+snafu = "0.7"
+tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time"] }
+tracing = { version = "0.1" }
+reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
+parking_lot = { version = "0.12" }
+# Filesystem integration
+url = "2.2"
+walkdir = "2"
+
+[features]
+azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
+azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
+gcp = ["serde", "serde_json", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64"]
+aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]
+
+[dev-dependencies] # In alphabetical order
+dotenv = "0.15.0"
+tempfile = "3.1.0"
+futures-test = "0.3"
diff --git a/object_store/README.md b/object_store/README.md
new file mode 100644
index 0000000..313588b
--- /dev/null
+++ b/object_store/README.md
@@ -0,0 +1,26 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Rust Object Store
+
+A crate providing a generic interface to object stores, such as S3, Azure Blob Storage and Google Cloud Storage.
+
+Originally developed for [InfluxDB IOx](https://github.com/influxdata/influxdb_iox/) and later split out and donated to Apache Arrow.
+
+See [docs.rs](https://docs.rs/object_store) for usage instructions.
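+
+As an illustrative sketch, a round-trip against any backend implementing the
+`ObjectStore` trait (the helper name and path are made up for the example):
+
+```rust
+use bytes::Bytes;
+use object_store::{path::Path, ObjectStore};
+
+// Hypothetical helper: works with any `ObjectStore` implementation.
+async fn roundtrip(store: &dyn ObjectStore) -> object_store::Result<()> {
+    let location = Path::parse("data/file.txt")?;
+    store.put(&location, Bytes::from_static(b"hello world")).await?;
+    let meta = store.head(&location).await?;
+    assert_eq!(meta.size, 11);
+    store.delete(&location).await?;
+    Ok(())
+}
+```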
diff --git a/object_store/deny.toml b/object_store/deny.toml
new file mode 100644
index 0000000..bfd060a
--- /dev/null
+++ b/object_store/deny.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Configuration documentation:
+# https://embarkstudios.github.io/cargo-deny/index.html
+
+[advisories]
+vulnerability = "deny"
+yanked = "deny"
+unmaintained = "warn"
+notice = "warn"
+ignore = [
+]
+git-fetch-with-cli = true
+
+[licenses]
+default = "allow"
+unlicensed = "allow"
+copyleft = "allow"
+
+[bans]
+multiple-versions = "warn"
+deny = [
+ # We are using rustls as the TLS implementation, so we shouldn't be linking
+ # in OpenSSL too.
+ #
+ # If you're hitting this, you might want to take a look at what new
+ # dependencies you have introduced and check if there's a way to depend on
+ # rustls instead of OpenSSL (tip: check the crate's feature flags).
+ { name = "openssl-sys" }
+]
diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
new file mode 100644
index 0000000..7ebcc2a
--- /dev/null
+++ b/object_store/src/aws.rs
@@ -0,0 +1,1041 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for S3
+use crate::util::format_http_range;
+use crate::{
+ collect_bytes,
+ path::{Path, DELIMITER},
+ util::format_prefix,
+ GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{
+ stream::{self, BoxStream},
+ Future, Stream, StreamExt, TryStreamExt,
+};
+use hyper::client::Builder as HyperBuilder;
+use rusoto_core::ByteStream;
+use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
+use rusoto_s3::S3;
+use rusoto_sts::WebIdentityProvider;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::ops::Range;
+use std::{
+ convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration,
+};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use tracing::{debug, warn};
+
+/// The maximum number of times a request will be retried in the case of an AWS server error
+pub const MAX_NUM_RETRIES: u32 = 3;
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+ #[snafu(display(
+ "Expected streamed data to have length {}, got {}",
+ expected,
+ actual
+ ))]
+ DataDoesNotMatchLength { expected: usize, actual: usize },
+
+ #[snafu(display(
+ "Did not receive any data. Bucket: {}, Location: {}",
+ bucket,
+ path
+ ))]
+ NoData { bucket: String, path: String },
+
+ #[snafu(display(
+ "Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ bucket,
+ path,
+ source,
+ source,
+ ))]
+ UnableToDeleteData {
+ source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
+ bucket: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ bucket,
+ path,
+ source,
+ source,
+ ))]
+ UnableToGetData {
+ source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
+ bucket: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ bucket,
+ path,
+ source,
+ source,
+ ))]
+ UnableToHeadData {
+ source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>,
+ bucket: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ bucket,
+ path,
+ source,
+ source,
+ ))]
+ UnableToGetPieceOfData {
+ source: std::io::Error,
+ bucket: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ bucket,
+ path,
+ source,
+ source,
+ ))]
+ UnableToPutData {
+ source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
+ bucket: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to list data. Bucket: {}, Error: {} ({:?})",
+ bucket,
+ source,
+ source,
+ ))]
+ UnableToListData {
+ source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
+ bucket: String,
+ },
+
+ #[snafu(display(
+ "Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}",
+ bucket,
+ from,
+ to,
+ source,
+ ))]
+ UnableToCopyObject {
+ source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>,
+ bucket: String,
+ from: String,
+ to: String,
+ },
+
+ #[snafu(display(
+ "Unable to parse last modified date. Bucket: {}, Error: {} ({:?})",
+ bucket,
+ source,
+ source,
+ ))]
+ UnableToParseLastModified {
+ source: chrono::ParseError,
+ bucket: String,
+ },
+
+ #[snafu(display(
+ "Unable to buffer data into temporary file, Error: {} ({:?})",
+ source,
+ source,
+ ))]
+ UnableToBufferStream { source: std::io::Error },
+
+ #[snafu(display(
+ "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})",
+ region,
+ source,
+ source,
+ ))]
+ InvalidRegion {
+ region: String,
+ source: rusoto_core::region::ParseRegionError,
+ },
+
+ #[snafu(display("Missing aws-access-key"))]
+ MissingAccessKey,
+
+ #[snafu(display("Missing aws-secret-access-key"))]
+ MissingSecretAccessKey,
+
+ NotFound {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+}
+
+impl From<Error> for super::Error {
+ fn from(source: Error) -> Self {
+ match source {
+ Error::NotFound { path, source } => Self::NotFound { path, source },
+ _ => Self::Generic {
+ store: "S3",
+ source: Box::new(source),
+ },
+ }
+ }
+}
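+
+// Backend-specific failures funnel into the crate-level `Error`, so callers can
+// match on e.g. `Error::NotFound { path, .. }` uniformly across stores (the
+// tests at the bottom of this file rely on exactly that).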
+
+/// Configuration for connecting to [Amazon S3](https://aws.amazon.com/s3/).
+pub struct AmazonS3 {
+ /// S3 client w/o any connection limit.
+ ///
+ /// You should normally use [`Self::client`] instead.
+ client_unrestricted: rusoto_s3::S3Client,
+
+ /// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
+ connection_semaphore: Arc<Semaphore>,
+
+ /// Bucket name used by this object store client.
+ bucket_name: String,
+}
+
+impl fmt::Debug for AmazonS3 {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AmazonS3")
+ .field("client", &"rusoto_s3::S3Client")
+ .field("bucket_name", &self.bucket_name)
+ .finish()
+ }
+}
+
+impl fmt::Display for AmazonS3 {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "AmazonS3({})", self.bucket_name)
+ }
+}
+
+#[async_trait]
+impl ObjectStore for AmazonS3 {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ let bucket_name = self.bucket_name.clone();
+ let request_factory = move || {
+ let bytes = bytes.clone();
+
+ let length = bytes.len();
+ let stream_data = Ok(bytes);
+ let stream = futures::stream::once(async move { stream_data });
+ let byte_stream = ByteStream::new_with_size(stream, length);
+
+ rusoto_s3::PutObjectRequest {
+ bucket: bucket_name.clone(),
+ key: location.to_string(),
+ body: Some(byte_stream),
+ ..Default::default()
+ }
+ };
+
+ let s3 = self.client().await;
+
+ s3_request(move || {
+ let (s3, request_factory) = (s3.clone(), request_factory.clone());
+
+ async move { s3.put_object(request_factory()).await }
+ })
+ .await
+ .context(UnableToPutDataSnafu {
+ bucket: &self.bucket_name,
+ path: location.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ Ok(GetResult::Stream(
+ self.get_object(location, None).await?.boxed(),
+ ))
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ let size_hint = range.end - range.start;
+ let stream = self.get_object(location, Some(range)).await?;
+ collect_bytes(stream, Some(size_hint)).await
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let key = location.to_string();
+ let head_request = rusoto_s3::HeadObjectRequest {
+ bucket: self.bucket_name.clone(),
+ key: key.clone(),
+ ..Default::default()
+ };
+ let s = self
+ .client()
+ .await
+ .head_object(head_request)
+ .await
+ .map_err(|e| match e {
+ rusoto_core::RusotoError::Service(
+ rusoto_s3::HeadObjectError::NoSuchKey(_),
+ ) => Error::NotFound {
+ path: key.clone(),
+ source: e.into(),
+ },
+ rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => {
+ Error::NotFound {
+ path: key.clone(),
+ source: "resource not found".into(),
+ }
+ }
+ _ => Error::UnableToHeadData {
+ bucket: self.bucket_name.to_owned(),
+ path: key.clone(),
+ source: e,
+ },
+ })?;
+
+ // Note: GetObject and HeadObject return a different date format from ListObjects
+ //
+ // S3 List returns timestamps in the form
+ // <LastModified>2013-09-17T18:07:53.000Z</LastModified>
+ // S3 GetObject returns timestamps in the form
+ // Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT
+ let last_modified = match s.last_modified {
+ Some(lm) => DateTime::parse_from_rfc2822(&lm)
+ .context(UnableToParseLastModifiedSnafu {
+ bucket: &self.bucket_name,
+ })?
+ .with_timezone(&Utc),
+ None => Utc::now(),
+ };
+
+ Ok(ObjectMeta {
+ last_modified,
+ location: location.clone(),
+ size: usize::try_from(s.content_length.unwrap_or(0))
+ .expect("unsupported size on this platform"),
+ })
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ let bucket_name = self.bucket_name.clone();
+
+ let request_factory = move || rusoto_s3::DeleteObjectRequest {
+ bucket: bucket_name.clone(),
+ key: location.to_string(),
+ ..Default::default()
+ };
+
+ let s3 = self.client().await;
+
+ s3_request(move || {
+ let (s3, request_factory) = (s3.clone(), request_factory.clone());
+
+ async move { s3.delete_object(request_factory()).await }
+ })
+ .await
+ .context(UnableToDeleteDataSnafu {
+ bucket: &self.bucket_name,
+ path: location.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ Ok(self
+ .list_objects_v2(prefix, None)
+ .await?
+ .map_ok(move |list_objects_v2_result| {
+ let contents = list_objects_v2_result.contents.unwrap_or_default();
+ let iter = contents
+ .into_iter()
+ .map(|object| convert_object_meta(object, &self.bucket_name));
+
+ futures::stream::iter(iter)
+ })
+ .try_flatten()
+ .boxed())
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ Ok(self
+ .list_objects_v2(prefix, Some(DELIMITER.to_string()))
+ .await?
+ .try_fold(
+ ListResult {
+ common_prefixes: vec![],
+ objects: vec![],
+ },
+ |acc, list_objects_v2_result| async move {
+ let mut res = acc;
+ let contents = list_objects_v2_result.contents.unwrap_or_default();
+ let mut objects = contents
+ .into_iter()
+ .map(|object| convert_object_meta(object, &self.bucket_name))
+ .collect::<Result<Vec<_>>>()?;
+
+ res.objects.append(&mut objects);
+
+ let prefixes =
+ list_objects_v2_result.common_prefixes.unwrap_or_default();
+ res.common_prefixes.reserve(prefixes.len());
+
+ for p in prefixes {
+ let prefix =
+ p.prefix.expect("can't have a prefix without a value");
+ res.common_prefixes.push(Path::parse(prefix)?);
+ }
+
+ Ok(res)
+ },
+ )
+ .await?)
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ let from = from.as_ref();
+ let to = to.as_ref();
+ let bucket_name = self.bucket_name.clone();
+
+ let request_factory = move || rusoto_s3::CopyObjectRequest {
+ bucket: bucket_name.clone(),
+ copy_source: format!("{}/{}", &bucket_name, from),
+ key: to.to_string(),
+ ..Default::default()
+ };
+
+ let s3 = self.client().await;
+
+ s3_request(move || {
+ let (s3, request_factory) = (s3.clone(), request_factory.clone());
+
+ async move { s3.copy_object(request_factory()).await }
+ })
+ .await
+ .context(UnableToCopyObjectSnafu {
+ bucket: &self.bucket_name,
+ from,
+ to,
+ })?;
+
+ Ok(())
+ }
+
+ async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> {
+ // Will need dynamodb_lock
+ Err(crate::Error::NotImplemented)
+ }
+}
+
+fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result<ObjectMeta> {
+ let key = object.key.expect("object doesn't exist without a key");
+ let location = Path::parse(key)?;
+ let last_modified = match object.last_modified {
+ Some(lm) => DateTime::parse_from_rfc3339(&lm)
+ .context(UnableToParseLastModifiedSnafu { bucket })?
+ .with_timezone(&Utc),
+ None => Utc::now(),
+ };
+ let size = usize::try_from(object.size.unwrap_or(0))
+ .expect("unsupported size on this platform");
+
+ Ok(ObjectMeta {
+ location,
+ last_modified,
+ size,
+ })
+}
+
+/// Configure a connection to Amazon S3 using the specified credentials in
+/// the specified Amazon region and bucket.
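+///
+/// An illustrative sketch (the region, bucket, and credentials are
+/// placeholders, not a working configuration):
+///
+/// ```ignore
+/// use std::num::NonZeroUsize;
+///
+/// let s3 = object_store::aws::new_s3(
+///     Some("access-key-id"),
+///     Some("secret-access-key"),
+///     "us-east-1",
+///     "my-bucket",
+///     None as Option<&str>, // custom endpoint, e.g. an S3-compatible store
+///     None as Option<&str>, // session token
+///     NonZeroUsize::new(16).unwrap(),
+///     false, // allow_http
+/// )?;
+/// ```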
+#[allow(clippy::too_many_arguments)]
+pub fn new_s3(
+ access_key_id: Option<impl Into<String>>,
+ secret_access_key: Option<impl Into<String>>,
+ region: impl Into<String>,
+ bucket_name: impl Into<String>,
+ endpoint: Option<impl Into<String>>,
+ session_token: Option<impl Into<String>>,
+ max_connections: NonZeroUsize,
+ allow_http: bool,
+) -> Result<AmazonS3> {
+ let region = region.into();
+ let region: rusoto_core::Region = match endpoint {
+ None => region.parse().context(InvalidRegionSnafu { region })?,
+ Some(endpoint) => rusoto_core::Region::Custom {
+ name: region,
+ endpoint: endpoint.into(),
+ },
+ };
+
+ let mut builder = HyperBuilder::default();
+ builder.pool_max_idle_per_host(max_connections.get());
+
+ let connector = if allow_http {
+ hyper_rustls::HttpsConnectorBuilder::new()
+ .with_webpki_roots()
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build()
+ } else {
+ hyper_rustls::HttpsConnectorBuilder::new()
+ .with_webpki_roots()
+ .https_only()
+ .enable_http1()
+ .enable_http2()
+ .build()
+ };
+
+ let http_client = rusoto_core::request::HttpClient::from_builder(builder, connector);
+
+ let client = match (access_key_id, secret_access_key, session_token) {
+ (Some(access_key_id), Some(secret_access_key), Some(session_token)) => {
+ let credentials_provider = StaticProvider::new(
+ access_key_id.into(),
+ secret_access_key.into(),
+ Some(session_token.into()),
+ None,
+ );
+ rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
+ }
+ (Some(access_key_id), Some(secret_access_key), None) => {
+ let credentials_provider = StaticProvider::new_minimal(
+ access_key_id.into(),
+ secret_access_key.into(),
+ );
+ rusoto_s3::S3Client::new_with(http_client, credentials_provider, region)
+ }
+ (None, Some(_), _) => return Err(Error::MissingAccessKey.into()),
+ (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
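+ // No explicit credentials: use the Kubernetes web identity token when one
+ // is configured in the environment, otherwise fall back to the instance
+ // metadata provider below.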
+ _ if std::env::var_os("AWS_WEB_IDENTITY_TOKEN_FILE").is_some() => {
+ rusoto_s3::S3Client::new_with(
+ http_client,
+ WebIdentityProvider::from_k8s_env(),
+ region,
+ )
+ }
+ _ => rusoto_s3::S3Client::new_with(
+ http_client,
+ InstanceMetadataProvider::new(),
+ region,
+ ),
+ };
+
+ Ok(AmazonS3 {
+ client_unrestricted: client,
+ connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
+ bucket_name: bucket_name.into(),
+ })
+}
+
+/// Create a new [`AmazonS3`] that always errors
+pub fn new_failing_s3() -> Result<AmazonS3> {
+ new_s3(
+ Some("foo"),
+ Some("bar"),
+ "us-east-1",
+ "bucket",
+ None as Option<&str>,
+ None as Option<&str>,
+ NonZeroUsize::new(16).unwrap(),
+ true,
+ )
+}
+
+/// S3 client bundled w/ a semaphore permit.
+#[derive(Clone)]
+struct SemaphoreClient {
+ /// Permit for this specific use of the client.
+ ///
+ /// Note that this field is never read and therefore considered "dead code" by rustc.
+ #[allow(dead_code)]
+ permit: Arc<OwnedSemaphorePermit>,
+
+ inner: rusoto_s3::S3Client,
+}
+
+impl Deref for SemaphoreClient {
+ type Target = rusoto_s3::S3Client;
+
+ fn deref(&self) -> &Self::Target {
+ &self.inner
+ }
+}
+
+impl AmazonS3 {
+ /// Get a client according to the current connection limit.
+ async fn client(&self) -> SemaphoreClient {
+ let permit = Arc::clone(&self.connection_semaphore)
+ .acquire_owned()
+ .await
+ .expect("semaphore shouldn't be closed yet");
+ SemaphoreClient {
+ permit: Arc::new(permit),
+ inner: self.client_unrestricted.clone(),
+ }
+ }
+
+ async fn get_object(
+ &self,
+ location: &Path,
+ range: Option<Range<usize>>,
+ ) -> Result<impl Stream<Item = Result<Bytes>>> {
+ let key = location.to_string();
+ let get_request = rusoto_s3::GetObjectRequest {
+ bucket: self.bucket_name.clone(),
+ key: key.clone(),
+ range: range.map(format_http_range),
+ ..Default::default()
+ };
+ let bucket_name = self.bucket_name.clone();
+ let stream = self
+ .client()
+ .await
+ .get_object(get_request)
+ .await
+ .map_err(|e| match e {
+ rusoto_core::RusotoError::Service(
+ rusoto_s3::GetObjectError::NoSuchKey(_),
+ ) => Error::NotFound {
+ path: key.clone(),
+ source: e.into(),
+ },
+ _ => Error::UnableToGetData {
+ bucket: self.bucket_name.to_owned(),
+ path: key.clone(),
+ source: e,
+ },
+ })?
+ .body
+ .context(NoDataSnafu {
+ bucket: self.bucket_name.to_owned(),
+ path: key.clone(),
+ })?
+ .map_err(move |source| Error::UnableToGetPieceOfData {
+ source,
+ bucket: bucket_name.clone(),
+ path: key.clone(),
+ })
+ .err_into();
+
+ Ok(stream)
+ }
+
+ async fn list_objects_v2(
+ &self,
+ prefix: Option<&Path>,
+ delimiter: Option<String>,
+ ) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> {
+ enum ListState {
+ Start,
+ HasMore(String),
+ Done,
+ }
+
+ let prefix = format_prefix(prefix);
+ let bucket = self.bucket_name.clone();
+
+ let request_factory = move || rusoto_s3::ListObjectsV2Request {
+ bucket,
+ prefix,
+ delimiter,
+ ..Default::default()
+ };
+ let s3 = self.client().await;
+
+ Ok(stream::unfold(ListState::Start, move |state| {
+ let request_factory = request_factory.clone();
+ let s3 = s3.clone();
+
+ async move {
+ let continuation_token = match &state {
+ ListState::HasMore(continuation_token) => Some(continuation_token),
+ ListState::Done => {
+ return None;
+ }
+ // If this is the first request we've made, we don't need to make any
+ // modifications to the request
+ ListState::Start => None,
+ };
+
+ let resp = s3_request(move || {
+ let (s3, request_factory, continuation_token) = (
+ s3.clone(),
+ request_factory.clone(),
+ continuation_token.cloned(),
+ );
+
+ async move {
+ s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
+ continuation_token,
+ ..request_factory()
+ })
+ .await
+ }
+ })
+ .await;
+
+ let resp = match resp {
+ Ok(resp) => resp,
+ Err(e) => return Some((Err(e), state)),
+ };
+
+ // The AWS response contains a field named `is_truncated` as well as
+ // `next_continuation_token`, and we're assuming that `next_continuation_token`
+ // is only set when `is_truncated` is true (and therefore not
+ // checking `is_truncated`).
+ let next_state = if let Some(next_continuation_token) =
+ &resp.next_continuation_token
+ {
+ ListState::HasMore(next_continuation_token.to_string())
+ } else {
+ ListState::Done
+ };
+
+ Some((Ok(resp), next_state))
+ }
+ })
+ .map_err(move |e| {
+ Error::UnableToListData {
+ source: e,
+ bucket: self.bucket_name.clone(),
+ }
+ .into()
+ })
+ .boxed())
+ }
+}
+
+/// Handles retrying a request to S3 up to `MAX_NUM_RETRIES` times if S3 returns 5xx server errors.
+///
+/// The `future_factory` argument is a function `F` that takes no arguments and, when called, will
+/// return a `Future` (type `G`) that, when `await`ed, will perform a request to S3 through
+/// `rusoto` and return a `Result` that returns some type `R` on success and some
+/// `rusoto_core::RusotoError<E>` on error.
+///
+/// If the executed `Future` returns success, this function will return that success.
+/// If the executed `Future` returns a 5xx server error, this function will wait an amount of
+/// time that increases exponentially with the number of times it has retried, get a new `Future` by
+/// calling `future_factory` again, and retry the request by `await`ing the `Future` again.
+/// The retries will continue until the maximum number of retries has been attempted. In that case,
+/// this function will return the last encountered error.
+///
+/// Client errors (4xx) will never be retried by this function.
+async fn s3_request<E, F, G, R>(
+ future_factory: F,
+) -> Result<R, rusoto_core::RusotoError<E>>
+where
+ E: std::error::Error + Send,
+ F: Fn() -> G + Send,
+ G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send,
+ R: Send,
+{
+ let mut attempts = 0;
+
+ loop {
+ let request = future_factory();
+
+ let result = request.await;
+
+ match result {
+ Ok(r) => return Ok(r),
+ Err(error) => {
+ attempts += 1;
+
+ let should_retry = matches!(
+ error,
+ rusoto_core::RusotoError::Unknown(ref response)
+ if response.status.is_server_error()
+ );
+
+ if attempts > MAX_NUM_RETRIES {
+ warn!(
+ ?error,
+ attempts, "maximum number of retries exceeded for AWS S3 request"
+ );
+ return Err(error);
+ } else if !should_retry {
+ return Err(error);
+ } else {
+ debug!(?error, attempts, "retrying AWS S3 request");
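+ // Exponential backoff: 100 ms, 200 ms, then 400 ms across the allowed retries.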
+ let wait_time = Duration::from_millis(2u64.pow(attempts) * 50);
+ tokio::time::sleep(wait_time).await;
+ }
+ }
+ }
+ }
+}
+
+impl Error {
+ #[cfg(test)]
+ fn s3_error_due_to_credentials(&self) -> bool {
+ use rusoto_core::RusotoError;
+ use Error::*;
+
+ matches!(
+ self,
+ UnableToPutData {
+ source: RusotoError::Credentials(_),
+ bucket: _,
+ path: _,
+ } | UnableToGetData {
+ source: RusotoError::Credentials(_),
+ bucket: _,
+ path: _,
+ } | UnableToDeleteData {
+ source: RusotoError::Credentials(_),
+ bucket: _,
+ path: _,
+ } | UnableToListData {
+ source: RusotoError::Credentials(_),
+ bucket: _,
+ }
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{
+ tests::{
+ get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
+ put_get_delete_list, rename_and_copy,
+ },
+ Error as ObjectStoreError, ObjectStore,
+ };
+ use bytes::Bytes;
+ use std::env;
+
+ type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
+ type Result<T, E = TestError> = std::result::Result<T, E>;
+
+ const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+ #[derive(Debug)]
+ struct AwsConfig {
+ access_key_id: String,
+ secret_access_key: String,
+ region: String,
+ bucket: String,
+ endpoint: Option<String>,
+ token: Option<String>,
+ }
+
+ // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set.
+ macro_rules! maybe_skip_integration {
+ () => {{
+ dotenv::dotenv().ok();
+
+ let required_vars = [
+ "AWS_DEFAULT_REGION",
+ "OBJECT_STORE_BUCKET",
+ "AWS_ACCESS_KEY_ID",
+ "AWS_SECRET_ACCESS_KEY",
+ ];
+ let unset_vars: Vec<_> = required_vars
+ .iter()
+ .filter_map(|&name| match env::var(name) {
+ Ok(_) => None,
+ Err(_) => Some(name),
+ })
+ .collect();
+ let unset_var_names = unset_vars.join(", ");
+
+ let force = env::var("TEST_INTEGRATION");
+
+ if force.is_ok() && !unset_var_names.is_empty() {
+ panic!(
+ "TEST_INTEGRATION is set, \
+ but variable(s) {} need to be set",
+ unset_var_names
+ );
+ } else if force.is_err() {
+ eprintln!(
+ "skipping AWS integration test - set {}TEST_INTEGRATION to run",
+ if unset_var_names.is_empty() {
+ String::new()
+ } else {
+ format!("{} and ", unset_var_names)
+ }
+ );
+ return;
+ } else {
+ AwsConfig {
+ access_key_id: env::var("AWS_ACCESS_KEY_ID")
+ .expect("already checked AWS_ACCESS_KEY_ID"),
+ secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
+ .expect("already checked AWS_SECRET_ACCESS_KEY"),
+ region: env::var("AWS_DEFAULT_REGION")
+ .expect("already checked AWS_DEFAULT_REGION"),
+ bucket: env::var("OBJECT_STORE_BUCKET")
+ .expect("already checked OBJECT_STORE_BUCKET"),
+ endpoint: env::var("AWS_ENDPOINT").ok(),
+ token: env::var("AWS_SESSION_TOKEN").ok(),
+ }
+ }
+ }};
+ }
+
+ fn check_credentials<T>(r: Result<T>) -> Result<T> {
+ if let Err(e) = &r {
+ let e = &**e;
+ if let Some(e) = e.downcast_ref::<Error>() {
+ if e.s3_error_due_to_credentials() {
+ eprintln!(
+ "Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \
+ environment variables"
+ );
+ }
+ }
+ }
+
+ r
+ }
+
+ fn make_integration(config: AwsConfig) -> AmazonS3 {
+ new_s3(
+ Some(config.access_key_id),
+ Some(config.secret_access_key),
+ config.region,
+ config.bucket,
+ config.endpoint,
+ config.token,
+ NonZeroUsize::new(16).unwrap(),
+ true,
+ )
+ .expect("Valid S3 config")
+ }
+
+ #[tokio::test]
+ async fn s3_test() {
+ let config = maybe_skip_integration!();
+ let integration = make_integration(config);
+
+ check_credentials(put_get_delete_list(&integration).await).unwrap();
+ check_credentials(list_uses_directories_correctly(&integration).await).unwrap();
+ check_credentials(list_with_delimiter(&integration).await).unwrap();
+ check_credentials(rename_and_copy(&integration).await).unwrap();
+ }
+
+ #[tokio::test]
+ async fn s3_test_get_nonexistent_location() {
+ let config = maybe_skip_integration!();
+ let integration = make_integration(config);
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = get_nonexistent_object(&integration, Some(location))
+ .await
+ .unwrap_err();
+ if let ObjectStoreError::NotFound { path, source } = err {
+ let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
+ assert!(
+ matches!(
+ source_variant,
+ Some(rusoto_core::RusotoError::Service(
+ rusoto_s3::GetObjectError::NoSuchKey(_)
+ )),
+ ),
+ "got: {:?}",
+ source_variant
+ );
+ assert_eq!(path, NON_EXISTENT_NAME);
+ } else {
+ panic!("unexpected error type: {:?}", err);
+ }
+ }
+
+ #[tokio::test]
+ async fn s3_test_get_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = make_integration(config);
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = integration.get(&location).await.unwrap_err().to_string();
+ assert!(
+ err.contains("The specified bucket does not exist"),
+ "{}",
+ err
+ )
+ }
+
+ #[tokio::test]
+ async fn s3_test_put_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = make_integration(config);
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+ let data = Bytes::from("arbitrary data");
+
+ let err = integration
+ .put(&location, data)
+ .await
+ .unwrap_err()
+ .to_string();
+
+ assert!(
+ err.contains("The specified bucket does not exist")
+ && err.contains("Unable to PUT data"),
+ "{}",
+ err
+ )
+ }
+
+ #[tokio::test]
+ async fn s3_test_delete_nonexistent_location() {
+ let config = maybe_skip_integration!();
+ let integration = make_integration(config);
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ integration.delete(&location).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn s3_test_delete_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = make_integration(config);
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = integration.delete(&location).await.unwrap_err().to_string();
+ assert!(
+ err.contains("The specified bucket does not exist")
+ && err.contains("Unable to DELETE data"),
+ "{}",
+ err
+ )
+ }
+}
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
new file mode 100644
index 0000000..5f43279
--- /dev/null
+++ b/object_store/src/azure.rs
@@ -0,0 +1,646 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for Azure blob storage
+use crate::{
+ path::{Path, DELIMITER},
+ util::format_prefix,
+ GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use azure_core::{prelude::*, HttpClient};
+use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient};
+use azure_storage_blobs::blob::responses::ListBlobsResponse;
+use azure_storage_blobs::blob::Blob;
+use azure_storage_blobs::{
+ prelude::{AsBlobClient, AsContainerClient, ContainerClient},
+ DeleteSnapshotsMethod,
+};
+use bytes::Bytes;
+use futures::{
+ stream::{self, BoxStream},
+ StreamExt, TryStreamExt,
+};
+use snafu::{ResultExt, Snafu};
+use std::collections::BTreeSet;
+use std::{convert::TryInto, sync::Arc};
+
+/// A specialized `Error` for Azure object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+ #[snafu(display(
+ "Unable to DELETE data. Container: {}, Location: {}, Error: {} ({:?})",
+ container,
+ path,
+ source,
+ source,
+ ))]
+ UnableToDeleteData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to GET data. Container: {}, Location: {}, Error: {} ({:?})",
+ container,
+ path,
+ source,
+ source,
+ ))]
+ UnableToGetData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to HEAD data. Container: {}, Location: {}, Error: {} ({:?})",
+ container,
+ path,
+ source,
+ source,
+ ))]
+ UnableToHeadData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to GET part of the data. Container: {}, Location: {}, Error: {} ({:?})",
+ container,
+ path,
+ source,
+ source,
+ ))]
+ UnableToGetPieceOfData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
+ container,
+ path,
+ source,
+ source,
+ ))]
+ UnableToPutData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ path: String,
+ },
+
+ #[snafu(display(
+ "Unable to list data. Bucket: {}, Error: {} ({:?})",
+ container,
+ source,
+ source,
+ ))]
+ UnableToListData {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ },
+
+ #[snafu(display(
+ "Unable to copy object. Container: {}, From: {}, To: {}, Error: {}",
+ container,
+ from,
+ to,
+ source
+ ))]
+ UnableToCopyFile {
+ source: Box<dyn std::error::Error + Send + Sync>,
+ container: String,
+ from: String,
+ to: String,
+ },
+
+ #[snafu(display(
+ "Unable parse source url. Container: {}, Error: {}",
+ container,
+ source
+ ))]
+ UnableToParseUrl {
+ source: url::ParseError,
+ container: String,
+ },
+
+ NotFound {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ AlreadyExists {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ #[cfg(not(feature = "azure_test"))]
+ #[snafu(display(
+ "Azurite (azure emulator) support not compiled in, please add `azure_test` feature"
+ ))]
+ NoEmulatorFeature,
+}
+
+impl From<Error> for super::Error {
+ fn from(source: Error) -> Self {
+ match source {
+ Error::NotFound { path, source } => Self::NotFound { path, source },
+ Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source },
+ _ => Self::Generic {
+ store: "Azure Blob Storage",
+ source: Box::new(source),
+ },
+ }
+ }
+}
+
+/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
+#[derive(Debug)]
+pub struct MicrosoftAzure {
+ container_client: Arc<ContainerClient>,
+ container_name: String,
+ blob_base_url: String,
+ is_emulator: bool,
+}
+
+impl std::fmt::Display for MicrosoftAzure {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self.is_emulator {
+ true => write!(f, "MicrosoftAzureEmulator({})", self.container_name),
+ false => write!(f, "MicrosoftAzure({})", self.container_name),
+ }
+ }
+}
+
+#[allow(clippy::borrowed_box)]
+fn check_err_not_found(err: &Box<dyn std::error::Error + Send + Sync>) -> bool {
+ if let Some(azure_core::HttpError::StatusCode { status, .. }) =
+ err.downcast_ref::<azure_core::HttpError>()
+ {
+ return status.as_u16() == 404;
+ };
+ false
+}
+
+#[async_trait]
+impl ObjectStore for MicrosoftAzure {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ let bytes = bytes::BytesMut::from(&*bytes);
+
+ self.container_client
+ .as_blob_client(location.as_ref())
+ .put_block_blob(bytes)
+ .execute()
+ .await
+ .context(UnableToPutDataSnafu {
+ container: &self.container_name,
+ path: location.to_owned(),
+ })?;
+
+ Ok(())
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let blob = self
+ .container_client
+ .as_blob_client(location.as_ref())
+ .get()
+ .execute()
+ .await
+ .map_err(|err| {
+ if check_err_not_found(&err) {
+ return Error::NotFound {
+ source: err,
+ path: location.to_string(),
+ };
+ };
+ Error::UnableToGetData {
+ source: err,
+ container: self.container_name.clone(),
+ path: location.to_string(),
+ }
+ })?;
+
+ Ok(GetResult::Stream(
+ futures::stream::once(async move { Ok(blob.data) }).boxed(),
+ ))
+ }
+
+ async fn get_range(
+ &self,
+ location: &Path,
+ range: std::ops::Range<usize>,
+ ) -> Result<Bytes> {
+ let blob = self
+ .container_client
+ .as_blob_client(location.as_ref())
+ .get()
+ .range(range)
+ .execute()
+ .await
+ .map_err(|err| {
+ if check_err_not_found(&err) {
+ return Error::NotFound {
+ source: err,
+ path: location.to_string(),
+ };
+ };
+ Error::UnableToGetPieceOfData {
+ source: err,
+ container: self.container_name.clone(),
+ path: location.to_string(),
+ }
+ })?;
+
+ Ok(blob.data)
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let res = self
+ .container_client
+ .as_blob_client(location.as_ref())
+ .get_properties()
+ .execute()
+ .await
+ .map_err(|err| {
+ if check_err_not_found(&err) {
+ return Error::NotFound {
+ source: err,
+ path: location.to_string(),
+ };
+ };
+ Error::UnableToHeadData {
+ source: err,
+ container: self.container_name.clone(),
+ path: location.to_string(),
+ }
+ })?;
+
+ convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound {
+ path: location.to_string(),
+ source: "is directory".to_string().into(),
+ })
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ self.container_client
+ .as_blob_client(location.as_ref())
+ .delete()
+ .delete_snapshots_method(DeleteSnapshotsMethod::Include)
+ .execute()
+ .await
+ .context(UnableToDeleteDataSnafu {
+ container: &self.container_name,
+ path: location.to_string(),
+ })?;
+
+ Ok(())
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let stream = self
+ .list_impl(prefix, false)
+ .await?
+ .map_ok(|resp| {
+ let names = resp
+ .blobs
+ .blobs
+ .into_iter()
+ .filter_map(|blob| convert_object_meta(blob).transpose());
+ futures::stream::iter(names)
+ })
+ .try_flatten()
+ .boxed();
+
+ Ok(stream)
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let mut stream = self.list_impl(prefix, true).await?;
+
+ let mut common_prefixes = BTreeSet::new();
+ let mut objects = Vec::new();
+
+ while let Some(res) = stream.next().await {
+ let response = res?;
+
+ let prefixes = response.blobs.blob_prefix.unwrap_or_default();
+ for p in prefixes {
+ common_prefixes.insert(Path::parse(&p.name)?);
+ }
+
+ let blobs = response.blobs.blobs;
+ objects.reserve(blobs.len());
+ for blob in blobs {
+ if let Some(meta) = convert_object_meta(blob)? {
+ objects.push(meta);
+ }
+ }
+ }
+
+ Ok(ListResult {
+ common_prefixes: common_prefixes.into_iter().collect(),
+ objects,
+ })
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ let from_url = self.get_copy_from_url(from)?;
+ self.container_client
+ .as_blob_client(to.as_ref())
+ .copy(&from_url)
+ .execute()
+ .await
+ .context(UnableToCopyFileSnafu {
+ container: &self.container_name,
+ from: from.as_ref(),
+ to: to.as_ref(),
+ })?;
+ Ok(())
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ let from_url = self.get_copy_from_url(from)?;
+ self.container_client
+ .as_blob_client(to.as_ref())
+ .copy(&from_url)
+ .if_match_condition(IfMatchCondition::NotMatch("*".to_string()))
+ .execute()
+ .await
+ .map_err(|err| {
+ if let Some(azure_core::HttpError::StatusCode { status, .. }) =
+ err.downcast_ref::<azure_core::HttpError>()
+ {
+ if status.as_u16() == 409 {
+ return Error::AlreadyExists {
+ source: err,
+ path: to.to_string(),
+ };
+ };
+ };
+ Error::UnableToCopyFile {
+ source: err,
+ container: self.container_name.clone(),
+ from: from.to_string(),
+ to: to.to_string(),
+ }
+ })?;
+ Ok(())
+ }
+}
+
+impl MicrosoftAzure {
+ /// Helper function to create a source URL for the copy functions
+ fn get_copy_from_url(&self, from: &Path) -> Result<reqwest::Url> {
+ Ok(reqwest::Url::parse(&format!(
+ "{}/{}/{}",
+ &self.blob_base_url, self.container_name, from
+ ))
+ .context(UnableToParseUrlSnafu {
+ container: &self.container_name,
+ })?)
+ }
+
+ async fn list_impl(
+ &self,
+ prefix: Option<&Path>,
+ delimiter: bool,
+ ) -> Result<BoxStream<'_, Result<ListBlobsResponse>>> {
+ enum ListState {
+ Start,
+ HasMore(String),
+ Done,
+ }
+
+ let prefix_raw = format_prefix(prefix);
+
+ Ok(stream::unfold(ListState::Start, move |state| {
+ let mut request = self.container_client.list_blobs();
+
+ if let Some(p) = prefix_raw.as_deref() {
+ request = request.prefix(p);
+ }
+
+ if delimiter {
+ request = request.delimiter(Delimiter::new(DELIMITER));
+ }
+
+ async move {
+ match state {
+ ListState::HasMore(ref marker) => {
+ request = request.next_marker(marker as &str);
+ }
+ ListState::Done => {
+ return None;
+ }
+ ListState::Start => {}
+ }
+
+ let resp = match request.execute().await.context(UnableToListDataSnafu {
+ container: &self.container_name,
+ }) {
+ Ok(resp) => resp,
+ Err(err) => return Some((Err(crate::Error::from(err)), state)),
+ };
+
+ let next_state = if let Some(marker) = &resp.next_marker {
+ ListState::HasMore(marker.as_str().to_string())
+ } else {
+ ListState::Done
+ };
+
+ Some((Ok(resp), next_state))
+ }
+ })
+ .boxed())
+ }
+}
+
+/// Returns `None` if the blob is a directory
+fn convert_object_meta(blob: Blob) -> Result<Option<ObjectMeta>> {
+ let location = Path::parse(blob.name)?;
+ let last_modified = blob.properties.last_modified;
+ let size = blob
+ .properties
+ .content_length
+ .try_into()
+ .expect("unsupported size on this platform");
+
+ // This is needed to filter out gen2 directories
+ // https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-known-issues#blob-storage-apis
+ Ok((size > 0).then(|| ObjectMeta {
+ location,
+ last_modified,
+ size,
+ }))
+}
+
+#[cfg(feature = "azure_test")]
+fn check_if_emulator_works() -> Result<()> {
+ Ok(())
+}
+
+#[cfg(not(feature = "azure_test"))]
+fn check_if_emulator_works() -> Result<()> {
+ Err(Error::NoEmulatorFeature.into())
+}
+
+/// Configure a connection to container with given name on Microsoft Azure
+/// Blob store.
+///
+/// The credentials `account` and `access_key` must provide access to the
+/// store.
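+///
+/// # Example
+///
+/// A minimal sketch with placeholder credentials (requires the `azure`
+/// feature; all values are illustrative):
+///
+/// ```no_run
+/// # use object_store::azure::new_azure;
+/// let azure =
+///     new_azure("my-account", "my-access-key", "my-container", false).unwrap();
+/// ```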
+pub fn new_azure(
+ account: impl Into<String>,
+ access_key: impl Into<String>,
+ container_name: impl Into<String>,
+ use_emulator: bool,
+) -> Result<MicrosoftAzure> {
+ let account = account.into();
+ let access_key = access_key.into();
+ let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
+
+ let (is_emulator, storage_account_client) = if use_emulator {
+ check_if_emulator_works()?;
+ (true, StorageAccountClient::new_emulator_default())
+ } else {
+ (
+ false,
+ StorageAccountClient::new_access_key(
+ Arc::clone(&http_client),
+ &account,
+ &access_key,
+ ),
+ )
+ };
+
+ let storage_client = storage_account_client.as_storage_client();
+ let blob_base_url = storage_account_client
+ .blob_storage_url()
+ .as_ref()
+ // make url ending consistent between the emulator and remote storage account
+ .trim_end_matches('/')
+ .to_string();
+
+ let container_name = container_name.into();
+
+ let container_client = storage_client.as_container_client(&container_name);
+
+ Ok(MicrosoftAzure {
+ container_client,
+ container_name,
+ blob_base_url,
+ is_emulator,
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::azure::new_azure;
+ use crate::tests::{
+ copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
+ put_get_delete_list, rename_and_copy,
+ };
+ use std::env;
+
+ #[derive(Debug)]
+ struct AzureConfig {
+ storage_account: String,
+ access_key: String,
+ bucket: String,
+ use_emulator: bool,
+ }
+
+ // Helper macro to skip tests if TEST_INTEGRATION and the Azure environment
+ // variables are not set.
+ macro_rules! maybe_skip_integration {
+ () => {{
+ dotenv::dotenv().ok();
+
+ let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok();
+
+ let mut required_vars = vec!["OBJECT_STORE_BUCKET"];
+ if !use_emulator {
+ required_vars.push("AZURE_STORAGE_ACCOUNT");
+ required_vars.push("AZURE_STORAGE_ACCESS_KEY");
+ }
+ let unset_vars: Vec<_> = required_vars
+ .iter()
+ .filter_map(|&name| match env::var(name) {
+ Ok(_) => None,
+ Err(_) => Some(name),
+ })
+ .collect();
+ let unset_var_names = unset_vars.join(", ");
+
+ let force = std::env::var("TEST_INTEGRATION");
+
+ if force.is_ok() && !unset_var_names.is_empty() {
+ panic!(
+ "TEST_INTEGRATION is set, \
+ but variable(s) {} need to be set",
+ unset_var_names
+ )
+ } else if force.is_err() {
+ eprintln!(
+ "skipping Azure integration test - set {}TEST_INTEGRATION to run",
+ if unset_var_names.is_empty() {
+ String::new()
+ } else {
+ format!("{} and ", unset_var_names)
+ }
+ );
+ return;
+ } else {
+ AzureConfig {
+ storage_account: env::var("AZURE_STORAGE_ACCOUNT")
+ .unwrap_or_default(),
+ access_key: env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(),
+ bucket: env::var("OBJECT_STORE_BUCKET")
+ .expect("already checked OBJECT_STORE_BUCKET"),
+ use_emulator,
+ }
+ }
+ }};
+ }
+
+ #[tokio::test]
+ async fn azure_blob_test() {
+ let config = maybe_skip_integration!();
+ let integration = new_azure(
+ config.storage_account,
+ config.access_key,
+ config.bucket,
+ config.use_emulator,
+ )
+ .unwrap();
+
+ put_get_delete_list(&integration).await.unwrap();
+ list_uses_directories_correctly(&integration).await.unwrap();
+ list_with_delimiter(&integration).await.unwrap();
+ rename_and_copy(&integration).await.unwrap();
+ copy_if_not_exists(&integration).await.unwrap();
+ }
+}
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
new file mode 100644
index 0000000..84fb572
--- /dev/null
+++ b/object_store/src/gcp.rs
@@ -0,0 +1,721 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for Google Cloud Storage
+use std::collections::BTreeSet;
+use std::fs::File;
+use std::io::BufReader;
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
+use reqwest::header::RANGE;
+use reqwest::{header, Client, Method, Response, StatusCode};
+use snafu::{ResultExt, Snafu};
+
+use crate::util::format_http_range;
+use crate::{
+ oauth::OAuthProvider,
+ path::{Path, DELIMITER},
+ token::TokenCache,
+ util::format_prefix,
+ GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+
+#[derive(Debug, Snafu)]
+enum Error {
+ #[snafu(display("Unable to open service account file: {}", source))]
+ OpenCredentials { source: std::io::Error },
+
+ #[snafu(display("Unable to decode service account file: {}", source))]
+ DecodeCredentials { source: serde_json::Error },
+
+ #[snafu(display("Error performing list request: {}", source))]
+ ListRequest { source: reqwest::Error },
+
+ #[snafu(display("Error performing get request {}: {}", path, source))]
+ GetRequest {
+ source: reqwest::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error performing delete request {}: {}", path, source))]
+ DeleteRequest {
+ source: reqwest::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error performing copy request {}: {}", path, source))]
+ CopyRequest {
+ source: reqwest::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error performing put request: {}", source))]
+ PutRequest { source: reqwest::Error },
+
+ #[snafu(display("Error decoding object size: {}", source))]
+ InvalidSize { source: std::num::ParseIntError },
+}
+
+impl From<Error> for super::Error {
+ fn from(err: Error) -> Self {
+ match err {
+ Error::GetRequest { source, path }
+ | Error::DeleteRequest { source, path }
+ | Error::CopyRequest { source, path }
+ if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
+ {
+ Self::NotFound {
+ path,
+ source: Box::new(source),
+ }
+ }
+ _ => Self::Generic {
+ store: "GCS",
+ source: Box::new(err),
+ },
+ }
+ }
+}
+
+/// A deserialized `service-account-********.json` file.
+#[derive(serde::Deserialize, Debug)]
+struct ServiceAccountCredentials {
+ /// The private key in RSA format.
+ pub private_key: String,
+
+ /// The email address associated with the service account.
+ pub client_email: String,
+
+ /// Base URL for GCS
+ #[serde(default = "default_gcs_base_url")]
+ pub gcs_base_url: String,
+
+ /// Disable oauth and use empty tokens.
+ #[serde(default = "default_disable_oauth")]
+ pub disable_oauth: bool,
+}
+
+fn default_gcs_base_url() -> String {
+ "https://storage.googleapis.com".to_owned()
+}
+
+fn default_disable_oauth() -> bool {
+ false
+}
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+struct ListResponse {
+ next_page_token: Option<String>,
+ #[serde(default)]
+ prefixes: Vec<String>,
+ #[serde(default)]
+ items: Vec<Object>,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct Object {
+ name: String,
+ size: String,
+ updated: DateTime<Utc>,
+}
+
+/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
+#[derive(Debug)]
+pub struct GoogleCloudStorage {
+ client: Client,
+ base_url: String,
+
+ oauth_provider: Option<OAuthProvider>,
+ token_cache: TokenCache<String>,
+
+ bucket_name: String,
+ bucket_name_encoded: String,
+
+ // TODO: Hook this up in tests
+ max_list_results: Option<String>,
+}
+
+impl std::fmt::Display for GoogleCloudStorage {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GoogleCloudStorage({})", self.bucket_name)
+ }
+}
+
+impl GoogleCloudStorage {
+ async fn get_token(&self) -> Result<String> {
+ if let Some(oauth_provider) = &self.oauth_provider {
+ Ok(self
+ .token_cache
+ .get_or_insert_with(|| oauth_provider.fetch_token(&self.client))
+ .await?)
+ } else {
+ Ok("".to_owned())
+ }
+ }
+
+ fn object_url(&self, path: &Path) -> String {
+ let encoded =
+ percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
+ format!(
+ "{}/storage/v1/b/{}/o/{}",
+ self.base_url, self.bucket_name_encoded, encoded
+ )
+ }
+
+ /// Perform a get request <https://cloud.google.com/storage/docs/json_api/v1/objects/get>
+ async fn get_request(
+ &self,
+ path: &Path,
+ range: Option<Range<usize>>,
+ head: bool,
+ ) -> Result<Response> {
+ let token = self.get_token().await?;
+ let url = self.object_url(path);
+
+ let mut builder = self.client.request(Method::GET, url);
+
+ if let Some(range) = range {
+ builder = builder.header(RANGE, format_http_range(range));
+ }
+
+ let alt = match head {
+ true => "json",
+ false => "media",
+ };
+
+ let response = builder
+ .bearer_auth(token)
+ .query(&[("alt", alt)])
+ .send()
+ .await
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?
+ .error_for_status()
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(response)
+ }
+
+ /// Perform a put request <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>
+ async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
+ let token = self.get_token().await?;
+ let url = format!(
+ "{}/upload/storage/v1/b/{}/o",
+ self.base_url, self.bucket_name_encoded
+ );
+
+ self.client
+ .request(Method::POST, url)
+ .bearer_auth(token)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .header(header::CONTENT_LENGTH, payload.len())
+ .query(&[("uploadType", "media"), ("name", path.as_ref())])
+ .body(payload)
+ .send()
+ .await
+ .context(PutRequestSnafu)?
+ .error_for_status()
+ .context(PutRequestSnafu)?;
+
+ Ok(())
+ }
+
+ /// Perform a delete request <https://cloud.google.com/storage/docs/json_api/v1/objects/delete>
+ async fn delete_request(&self, path: &Path) -> Result<()> {
+ let token = self.get_token().await?;
+ let url = self.object_url(path);
+
+ let builder = self.client.request(Method::DELETE, url);
+ builder
+ .bearer_auth(token)
+ .send()
+ .await
+ .context(DeleteRequestSnafu {
+ path: path.as_ref(),
+ })?
+ .error_for_status()
+ .context(DeleteRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ /// Perform a copy request <https://cloud.google.com/storage/docs/json_api/v1/objects/copy>
+ async fn copy_request(
+ &self,
+ from: &Path,
+ to: &Path,
+ if_not_exists: bool,
+ ) -> Result<()> {
+ let token = self.get_token().await?;
+
+ let source =
+ percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
+ let destination =
+ percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC);
+ let url = format!(
+ "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
+ self.base_url,
+ self.bucket_name_encoded,
+ source,
+ self.bucket_name_encoded,
+ destination
+ );
+
+ let mut builder = self.client.request(Method::POST, url);
+
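+ // `ifGenerationMatch=0` asks GCS to fail the request unless the
+ // destination object does not yet exist, giving copy-if-not-exists
+ // semantics server-side.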
+ if if_not_exists {
+ builder = builder.query(&[("ifGenerationMatch", "0")]);
+ }
+
+ builder
+ .bearer_auth(token)
+ .send()
+ .await
+ .context(CopyRequestSnafu {
+ path: from.as_ref(),
+ })?
+ .error_for_status()
+ .context(CopyRequestSnafu {
+ path: from.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ /// Perform a list request <https://cloud.google.com/storage/docs/json_api/v1/objects/list>
+ async fn list_request(
+ &self,
+ prefix: Option<&str>,
+ delimiter: bool,
+ page_token: Option<&str>,
+ ) -> Result<ListResponse> {
+ let token = self.get_token().await?;
+
+ let url = format!(
+ "{}/storage/v1/b/{}/o",
+ self.base_url, self.bucket_name_encoded
+ );
+
+ let mut query = Vec::with_capacity(4);
+ if delimiter {
+ query.push(("delimiter", DELIMITER))
+ }
+
+ if let Some(prefix) = &prefix {
+ query.push(("prefix", prefix))
+ }
+
+ if let Some(page_token) = page_token {
+ query.push(("pageToken", page_token))
+ }
+
+ if let Some(max_results) = &self.max_list_results {
+ query.push(("maxResults", max_results))
+ }
+
+ let response: ListResponse = self
+ .client
+ .request(Method::GET, url)
+ .query(&query)
+ .bearer_auth(token)
+ .send()
+ .await
+ .context(ListRequestSnafu)?
+ .error_for_status()
+ .context(ListRequestSnafu)?
+ .json()
+ .await
+ .context(ListRequestSnafu)?;
+
+ Ok(response)
+ }
+
+ /// Perform a list operation automatically handling pagination
+ fn list_paginated(
+ &self,
+ prefix: Option<&Path>,
+ delimiter: bool,
+ ) -> Result<BoxStream<'_, Result<ListResponse>>> {
+ let prefix = format_prefix(prefix);
+
+ enum ListState {
+ Start,
+ HasMore(String),
+ Done,
+ }
+
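+ // Unfold-driven pagination: `Start` issues the first request,
+ // `HasMore(token)` continues from the page token returned by the
+ // previous response, and `Done` ends the stream.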
+ Ok(futures::stream::unfold(ListState::Start, move |state| {
+ let prefix = prefix.clone();
+
+ async move {
+ let page_token = match &state {
+ ListState::Start => None,
+ ListState::HasMore(page_token) => Some(page_token.as_str()),
+ ListState::Done => {
+ return None;
+ }
+ };
+
+ let resp = match self
+ .list_request(prefix.as_deref(), delimiter, page_token)
+ .await
+ {
+ Ok(resp) => resp,
+ Err(e) => return Some((Err(e), state)),
+ };
+
+ let next_state = match &resp.next_page_token {
+ Some(token) => ListState::HasMore(token.clone()),
+ None => ListState::Done,
+ };
+
+ Some((Ok(resp), next_state))
+ }
+ })
+ .boxed())
+ }
+}
+
+#[async_trait]
+impl ObjectStore for GoogleCloudStorage {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ self.put_request(location, bytes).await
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let response = self.get_request(location, None, false).await?;
+ let stream = response
+ .bytes_stream()
+ .map_err(|source| crate::Error::Generic {
+ store: "GCS",
+ source: Box::new(source),
+ })
+ .boxed();
+
+ Ok(GetResult::Stream(stream))
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ let response = self.get_request(location, Some(range), false).await?;
+ Ok(response.bytes().await.context(GetRequestSnafu {
+ path: location.as_ref(),
+ })?)
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let response = self.get_request(location, None, true).await?;
+ let object = response.json().await.context(GetRequestSnafu {
+ path: location.as_ref(),
+ })?;
+ convert_object_meta(&object)
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ self.delete_request(location).await
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let stream = self
+ .list_paginated(prefix, false)?
+ .map_ok(|r| {
+ futures::stream::iter(
+ r.items.into_iter().map(|x| convert_object_meta(&x)),
+ )
+ })
+ .try_flatten()
+ .boxed();
+
+ Ok(stream)
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let mut stream = self.list_paginated(prefix, true)?;
+
+ let mut common_prefixes = BTreeSet::new();
+ let mut objects = Vec::new();
+
+ while let Some(result) = stream.next().await {
+ let response = result?;
+
+ for p in response.prefixes {
+ common_prefixes.insert(Path::parse(p)?);
+ }
+
+ objects.reserve(response.items.len());
+ for object in &response.items {
+ objects.push(convert_object_meta(object)?);
+ }
+ }
+
+ Ok(ListResult {
+ common_prefixes: common_prefixes.into_iter().collect(),
+ objects,
+ })
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ self.copy_request(from, to, false).await
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.copy_request(from, to, true).await
+ }
+}
+
+fn reader_credentials_file(
+ service_account_path: impl AsRef<std::path::Path>,
+) -> Result<ServiceAccountCredentials> {
+ let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
+ let reader = BufReader::new(file);
+ Ok(serde_json::from_reader(reader).context(DecodeCredentialsSnafu)?)
+}
+
+/// Configure a connection to Google Cloud Storage.
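+///
+/// The credentials are read from the service account JSON file at
+/// `service_account_path`.
+///
+/// # Example
+///
+/// A minimal sketch with placeholder paths (requires the `gcp` feature;
+/// values are illustrative):
+///
+/// ```no_run
+/// # use object_store::gcp::new_gcs;
+/// let gcs = new_gcs("/path/to/service-account.json", "my-bucket").unwrap();
+/// ```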
+pub fn new_gcs(
+ service_account_path: impl AsRef<std::path::Path>,
+ bucket_name: impl Into<String>,
+) -> Result<GoogleCloudStorage> {
+ let credentials = reader_credentials_file(service_account_path)?;
+ let client = Client::new();
+
+ // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
+ let scope = "https://www.googleapis.com/auth/devstorage.full_control";
+ let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
+
+ let oauth_provider = (!credentials.disable_oauth)
+ .then(|| {
+ OAuthProvider::new(
+ credentials.client_email,
+ credentials.private_key,
+ scope.to_string(),
+ audience,
+ )
+ })
+ .transpose()?;
+
+ let bucket_name = bucket_name.into();
+ let encoded_bucket_name =
+ percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
+
+ // Credentials were loaded from the service account file above, so no
+ // environment variables are consulted when constructing the client.
+ Ok(GoogleCloudStorage {
+ client,
+ base_url: credentials.gcs_base_url,
+ oauth_provider,
+ token_cache: Default::default(),
+ bucket_name,
+ bucket_name_encoded: encoded_bucket_name,
+ max_list_results: None,
+ })
+}
+
+fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
+ let location = Path::parse(&object.name)?;
+ let last_modified = object.updated;
+ let size = object.size.parse().context(InvalidSizeSnafu)?;
+
+ Ok(ObjectMeta {
+ location,
+ last_modified,
+ size,
+ })
+}
+
+#[cfg(test)]
+mod test {
+ use std::env;
+
+ use bytes::Bytes;
+
+ use crate::{
+ tests::{
+ get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
+ put_get_delete_list, rename_and_copy,
+ },
+ Error as ObjectStoreError, ObjectStore,
+ };
+
+ use super::*;
+
+ const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+ #[derive(Debug)]
+ struct GoogleCloudConfig {
+ bucket: String,
+ service_account: String,
+ }
+
+ // Helper macro to skip tests if TEST_INTEGRATION and the GCP environment variables are not set.
+ macro_rules! maybe_skip_integration {
+ () => {{
+ dotenv::dotenv().ok();
+
+ let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
+ let unset_vars: Vec<_> = required_vars
+ .iter()
+ .filter_map(|&name| match env::var(name) {
+ Ok(_) => None,
+ Err(_) => Some(name),
+ })
+ .collect();
+ let unset_var_names = unset_vars.join(", ");
+
+ let force = std::env::var("TEST_INTEGRATION");
+
+ if force.is_ok() && !unset_var_names.is_empty() {
+ panic!(
+ "TEST_INTEGRATION is set, \
+ but variable(s) {} need to be set",
+ unset_var_names
+ )
+ } else if force.is_err() {
+ eprintln!(
+ "skipping Google Cloud integration test - set {}TEST_INTEGRATION to run",
+ if unset_var_names.is_empty() {
+ String::new()
+ } else {
+ format!("{} and ", unset_var_names)
+ }
+ );
+ return;
+ } else {
+ GoogleCloudConfig {
+ bucket: env::var("OBJECT_STORE_BUCKET")
+ .expect("already checked OBJECT_STORE_BUCKET"),
+ service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
+ .expect("already checked GOOGLE_SERVICE_ACCOUNT"),
+ }
+ }
+ }};
+ }
+
+ #[tokio::test]
+ async fn gcs_test() {
+ let config = maybe_skip_integration!();
+ let integration = new_gcs(config.service_account, config.bucket).unwrap();
+
+ put_get_delete_list(&integration).await.unwrap();
+ list_uses_directories_correctly(&integration).await.unwrap();
+ list_with_delimiter(&integration).await.unwrap();
+ rename_and_copy(&integration).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn gcs_test_get_nonexistent_location() {
+ let config = maybe_skip_integration!();
+ let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = integration.get(&location).await.unwrap_err();
+
+ assert!(
+ matches!(err, ObjectStoreError::NotFound { .. }),
+ "unexpected error type: {}",
+ err
+ );
+ }
+
+ #[tokio::test]
+ async fn gcs_test_get_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = get_nonexistent_object(&integration, Some(location))
+ .await
+ .unwrap_err();
+
+ assert!(
+ matches!(err, ObjectStoreError::NotFound { .. }),
+ "unexpected error type: {}",
+ err
+ );
+ }
+
+ #[tokio::test]
+ async fn gcs_test_delete_nonexistent_location() {
+ let config = maybe_skip_integration!();
+ let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = integration.delete(&location).await.unwrap_err();
+ assert!(
+ matches!(err, ObjectStoreError::NotFound { .. }),
+ "unexpected error type: {}",
+ err
+ );
+ }
+
+ #[tokio::test]
+ async fn gcs_test_delete_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+ let err = integration.delete(&location).await.unwrap_err();
+ assert!(
+ matches!(err, ObjectStoreError::NotFound { .. }),
+ "unexpected error type: {}",
+ err
+ );
+ }
+
+ #[tokio::test]
+ async fn gcs_test_put_nonexistent_bucket() {
+ let mut config = maybe_skip_integration!();
+ config.bucket = NON_EXISTENT_NAME.into();
+ let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+ let location = Path::from_iter([NON_EXISTENT_NAME]);
+ let data = Bytes::from("arbitrary data");
+
+ let err = integration
+ .put(&location, data)
+ .await
+ .unwrap_err()
+ .to_string();
+ assert!(
+ err.contains(
+ "Error performing put request: HTTP status client error (404 Not Found)"
+ ),
+ "{}",
+ err
+ )
+ }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
new file mode 100644
index 0000000..4a56b03
--- /dev/null
+++ b/object_store/src/lib.rs
@@ -0,0 +1,706 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
+#![warn(
+ missing_copy_implementations,
+ missing_debug_implementations,
+ missing_docs,
+ clippy::explicit_iter_loop,
+ clippy::future_not_send,
+ clippy::use_self,
+ clippy::clone_on_ref_ptr
+)]
+
+//! # object_store
+//!
+//! This crate provides APIs for interacting with object storage services.
+//!
+//! It currently supports PUT, GET, DELETE, HEAD and list for:
+//!
+//! * [Google Cloud Storage](https://cloud.google.com/storage/)
+//! * [Amazon S3](https://aws.amazon.com/s3/)
+//! * [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/#overview)
+//! * In-memory
+//! * Local file storage
+//!
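+//! # Example
+//!
+//! A minimal sketch of the core API, shown here with the bundled in-memory
+//! store (the path and payload are illustrative):
+//!
+//! ```
+//! use bytes::Bytes;
+//! use object_store::{memory::InMemory, path::Path, ObjectStore};
+//!
+//! # async fn example() -> object_store::Result<()> {
+//! let store = InMemory::new();
+//! let location = Path::from("data/file.bin");
+//!
+//! // PUT, then read the same bytes back with GET
+//! store.put(&location, Bytes::from("hello")).await?;
+//! let bytes = store.get(&location).await?.bytes().await?;
+//! assert_eq!(bytes, Bytes::from("hello"));
+//! # Ok(())
+//! # }
+//! ```
+//!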
+
+#[cfg(feature = "aws")]
+pub mod aws;
+#[cfg(feature = "azure")]
+pub mod azure;
+#[cfg(feature = "gcp")]
+pub mod gcp;
+pub mod local;
+pub mod memory;
+pub mod path;
+pub mod throttle;
+
+#[cfg(feature = "gcp")]
+mod oauth;
+
+#[cfg(feature = "gcp")]
+mod token;
+
+mod util;
+
+use crate::path::Path;
+use crate::util::{collect_bytes, maybe_spawn_blocking};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt};
+use snafu::Snafu;
+use std::fmt::{Debug, Formatter};
+use std::io::{Read, Seek, SeekFrom};
+use std::ops::Range;
+
+/// An alias for a dynamically dispatched object store implementation.
+pub type DynObjectStore = dyn ObjectStore;
+
+/// Universal API to multiple object store services.
+#[async_trait]
+pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
+ /// Save the provided bytes to the specified location.
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
+
+ /// Return the bytes that are stored at the specified location.
+ async fn get(&self, location: &Path) -> Result<GetResult>;
+
+ /// Return the bytes that are stored at the specified location
+ /// in the given byte range
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
+
+ /// Return the metadata for the specified location
+ async fn head(&self, location: &Path) -> Result<ObjectMeta>;
+
+ /// Delete the object at the specified location.
+ async fn delete(&self, location: &Path) -> Result<()>;
+
+ /// List all the objects with the given prefix.
+ ///
+ /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
+ /// `foo/bar_baz/x`.
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+
+ /// List objects with the given prefix and an implementation specific
+ /// delimiter. Returns common prefixes (directories) in addition to object
+ /// metadata.
+ ///
+ /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
+ /// `foo/bar_baz/x`.
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
+
+ /// Copy an object from one path to another in the same object store.
+ ///
+ /// If there exists an object at the destination, it will be overwritten.
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
+
+ /// Move an object from one path to another in the same object store.
+ ///
+ /// By default, this is implemented as a copy followed by a delete of the
+ /// source. When deleting the source, implementations may not verify that
+ /// it is still the same object that was originally copied.
+ ///
+ /// If there exists an object at the destination, it will be overwritten.
+ async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ self.copy(from, to).await?;
+ self.delete(from).await
+ }
+
+ /// Copy an object from one path to another, only if destination is empty.
+ ///
+ /// Will return an error if the destination already has an object.
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
+
+ /// Move an object from one path to another in the same object store.
+ ///
+ /// Will return an error if the destination already has an object.
+ async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.copy_if_not_exists(from, to).await?;
+ self.delete(from).await
+ }
+}
+
+/// Result of a list call, comprising objects and common prefixes
+/// (directories). Individual result sets may be limited to 1,000 objects
+/// by the underlying object storage's pagination limits.
+#[derive(Debug)]
+pub struct ListResult {
+ /// Prefixes that are common (like directories)
+ pub common_prefixes: Vec<Path>,
+ /// Object metadata for the listing
+ pub objects: Vec<ObjectMeta>,
+}
+
+/// The metadata that describes an object.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ObjectMeta {
+ /// The full path to the object
+ pub location: Path,
+ /// The last modified time
+ pub last_modified: DateTime<Utc>,
+ /// The size in bytes of the object
+ pub size: usize,
+}
+
+/// Result for a get request
+///
+/// This special-cases local files, as some systems may be able to
+/// optimise reading a file that is already present on local disk
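+///
+/// # Example
+///
+/// A sketch of buffering either variant into memory (illustrative only):
+///
+/// ```no_run
+/// # use object_store::GetResult;
+/// # async fn consume(result: GetResult) -> object_store::Result<()> {
+/// // `bytes()` collects the payload regardless of the variant
+/// let data = result.bytes().await?;
+/// println!("read {} bytes", data.len());
+/// # Ok(())
+/// # }
+/// ```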
+pub enum GetResult {
+ /// A file and its path on the local filesystem
+ File(std::fs::File, std::path::PathBuf),
+ /// An asynchronous stream
+ Stream(BoxStream<'static, Result<Bytes>>),
+}
+
+impl Debug for GetResult {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::File(_, _) => write!(f, "GetResult(File)"),
+ Self::Stream(_) => write!(f, "GetResult(Stream)"),
+ }
+ }
+}
+
+impl GetResult {
+ /// Collects the data into a [`Bytes`]
+ pub async fn bytes(self) -> Result<Bytes> {
+ match self {
+ Self::File(mut file, path) => {
+ maybe_spawn_blocking(move || {
+ let len = file.seek(SeekFrom::End(0)).map_err(|source| {
+ local::Error::Seek {
+ source,
+ path: path.clone(),
+ }
+ })?;
+
+ file.seek(SeekFrom::Start(0)).map_err(|source| {
+ local::Error::Seek {
+ source,
+ path: path.clone(),
+ }
+ })?;
+
+ let mut buffer = Vec::with_capacity(len as usize);
+ file.read_to_end(&mut buffer).map_err(|source| {
+ local::Error::UnableToReadBytes { source, path }
+ })?;
+
+ Ok(buffer.into())
+ })
+ .await
+ }
+ Self::Stream(s) => collect_bytes(s, None).await,
+ }
+ }
+
+ /// Converts this into a byte stream
+ ///
+ /// If the result is [`Self::File`] will perform chunked reads of the file, otherwise
+ /// will return the [`Self::Stream`].
+ ///
+ /// # Tokio Compatibility
+ ///
+ /// Tokio discourages performing blocking IO on a tokio worker thread, however,
+ /// no major operating systems have stable async file APIs. Therefore if called from
+ /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
+ /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
+ ///
+ /// If not called from a tokio context, this will perform IO on the current thread with
+ /// no additional complexity or overheads
+ pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
+ match self {
+ Self::File(file, path) => {
+ const CHUNK_SIZE: usize = 8 * 1024;
+
+ futures::stream::try_unfold(
+ (file, path, false),
+ |(mut file, path, finished)| {
+ maybe_spawn_blocking(move || {
+ if finished {
+ return Ok(None);
+ }
+
+ let mut buffer = Vec::with_capacity(CHUNK_SIZE);
+ let read = file
+ .by_ref()
+ .take(CHUNK_SIZE as u64)
+ .read_to_end(&mut buffer)
+ .map_err(|e| local::Error::UnableToReadBytes {
+ source: e,
+ path: path.clone(),
+ })?;
+
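+ // A short read (`read != CHUNK_SIZE`) signals EOF; the flag
+ // finishes the stream on the next iteration.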
+ Ok(Some((buffer.into(), (file, path, read != CHUNK_SIZE))))
+ })
+ },
+ )
+ .boxed()
+ }
+ Self::Stream(s) => s,
+ }
+ }
+}
+
+/// A specialized `Result` for object store-related errors
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum Error {
+ #[snafu(display("Generic {} error: {}", store, source))]
+ Generic {
+ store: &'static str,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ #[snafu(display("Object at location {} not found: {}", path, source))]
+ NotFound {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ #[snafu(
+ display("Encountered object with invalid path: {}", source),
+ context(false)
+ )]
+ InvalidPath { source: path::Error },
+
+ #[snafu(display("Error joining spawned task: {}", source), context(false))]
+ JoinError { source: tokio::task::JoinError },
+
+ #[snafu(display("Operation not supported: {}", source))]
+ NotSupported {
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ #[snafu(display("Object at location {} already exists: {}", path, source))]
+ AlreadyExists {
+ path: String,
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ },
+
+ #[snafu(display("Operation not yet implemented."))]
+ NotImplemented,
+
+ #[cfg(feature = "gcp")]
+ #[snafu(display("OAuth error: {}", source), context(false))]
+ OAuth { source: oauth::Error },
+}
+
+#[cfg(test)]
+mod test_util {
+ use super::*;
+ use futures::TryStreamExt;
+
+ pub async fn flatten_list_stream(
+ storage: &DynObjectStore,
+ prefix: Option<&Path>,
+ ) -> Result<Vec<Path>> {
+ storage
+ .list(prefix)
+ .await?
+ .map_ok(|meta| meta.location)
+ .try_collect::<Vec<Path>>()
+ .await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test_util::flatten_list_stream;
+
+ type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
+ type Result<T, E = Error> = std::result::Result<T, E>;
+
+ pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> {
+ let store_str = storage.to_string();
+
+ delete_fixtures(storage).await;
+
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert!(
+ content_list.is_empty(),
+ "Expected list to be empty; found: {:?}",
+ content_list
+ );
+
+ let location = Path::from("test_dir/test_file.json");
+
+ let data = Bytes::from("arbitrary data");
+ let expected_data = data.clone();
+ storage.put(&location, data).await?;
+
+ let root = Path::from("/");
+
+ // List everything
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert_eq!(content_list, &[location.clone()]);
+
+ // Should behave the same as no prefix
+ let content_list = flatten_list_stream(storage, Some(&root)).await?;
+ assert_eq!(content_list, &[location.clone()]);
+
+ // List with delimiter
+ let result = storage.list_with_delimiter(None).await.unwrap();
+ assert_eq!(&result.objects, &[]);
+ assert_eq!(result.common_prefixes.len(), 1);
+ assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+ // Should behave the same as no prefix
+ let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
+ assert!(result.objects.is_empty());
+ assert_eq!(result.common_prefixes.len(), 1);
+ assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+ // List everything starting with a prefix that should return results
+ let prefix = Path::from("test_dir");
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ assert_eq!(content_list, &[location.clone()]);
+
+ // List everything starting with a prefix that shouldn't return results
+ let prefix = Path::from("something");
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ assert!(content_list.is_empty());
+
+ let read_data = storage.get(&location).await?.bytes().await?;
+ assert_eq!(&*read_data, expected_data);
+
+ // Test range request
+ let range = 3..7;
+ let range_result = storage.get_range(&location, range.clone()).await;
+
+ let out_of_range = 200..300;
+ let out_of_range_result = storage.get_range(&location, out_of_range).await;
+
+ if store_str.starts_with("MicrosoftAzureEmulator") {
+ // Azurite doesn't support x-ms-range-get-content-crc64 set by Azure SDK
+ // https://github.com/Azure/Azurite/issues/444
+ let err = range_result.unwrap_err().to_string();
+ assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
+
+ let err = out_of_range_result.unwrap_err().to_string();
+ assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
+ } else {
+ let bytes = range_result.unwrap();
+ assert_eq!(bytes, expected_data.slice(range));
+
+ // Should be a non-fatal error
+ out_of_range_result.unwrap_err();
+ }
+
+ let head = storage.head(&location).await?;
+ assert_eq!(head.size, expected_data.len());
+
+ storage.delete(&location).await?;
+
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert!(content_list.is_empty());
+
+ let err = storage.get(&location).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ let err = storage.head(&location).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ // Test handling of paths containing an encoded delimiter
+
+ let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
+ storage
+ .put(&file_with_delimiter, Bytes::from("arbitrary"))
+ .await
+ .unwrap();
+
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(files, vec![file_with_delimiter.clone()]);
+
+ let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
+ .await
+ .unwrap();
+ assert!(files.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from("a/b")))
+ .await
+ .unwrap();
+ assert!(files.common_prefixes.is_empty());
+ assert!(files.objects.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from("a")))
+ .await
+ .unwrap();
+ assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
+ assert!(files.objects.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
+ .await
+ .unwrap();
+ assert!(files.common_prefixes.is_empty());
+ assert_eq!(files.objects.len(), 1);
+ assert_eq!(files.objects[0].location, file_with_delimiter);
+
+ storage.delete(&file_with_delimiter).await.unwrap();
+
+ // Test handling of paths containing non-ASCII characters, e.g. emoji
+
+ let emoji_prefix = Path::from("🙀");
+ let emoji_file = Path::from("🙀/😀.parquet");
+ storage
+ .put(&emoji_file, Bytes::from("arbitrary"))
+ .await
+ .unwrap();
+
+ storage.head(&emoji_file).await.unwrap();
+ storage
+ .get(&emoji_file)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+
+ let files = flatten_list_stream(storage, Some(&emoji_prefix))
+ .await
+ .unwrap();
+
+ assert_eq!(files, vec![emoji_file.clone()]);
+
+ storage.delete(&emoji_file).await.unwrap();
+ let files = flatten_list_stream(storage, Some(&emoji_prefix))
+ .await
+ .unwrap();
+ assert!(files.is_empty());
+
+ Ok(())
+ }
+
+ pub(crate) async fn list_uses_directories_correctly(
+ storage: &DynObjectStore,
+ ) -> Result<()> {
+ delete_fixtures(storage).await;
+
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert!(
+ content_list.is_empty(),
+ "Expected list to be empty; found: {:?}",
+ content_list
+ );
+
+ let location1 = Path::from("foo/x.json");
+ let location2 = Path::from("foo.bar/y.json");
+
+ let data = Bytes::from("arbitrary data");
+ storage.put(&location1, data.clone()).await?;
+ storage.put(&location2, data).await?;
+
+ let prefix = Path::from("foo");
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ assert_eq!(content_list, &[location1.clone()]);
+
+ let prefix = Path::from("foo/x");
+ let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+ assert_eq!(content_list, &[]);
+
+ Ok(())
+ }
+
+ pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> {
+ delete_fixtures(storage).await;
+
+ // ==================== check: store is empty ====================
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert!(content_list.is_empty());
+
+ // ==================== do: create files ====================
+ let data = Bytes::from("arbitrary data");
+
+ let files: Vec<_> = [
+ "test_file",
+ "mydb/wb/000/000/000.segment",
+ "mydb/wb/000/000/001.segment",
+ "mydb/wb/000/000/002.segment",
+ "mydb/wb/001/001/000.segment",
+ "mydb/wb/foo.json",
+ "mydb/wbwbwb/111/222/333.segment",
+ "mydb/data/whatevs",
+ ]
+ .iter()
+ .map(|&s| Path::from(s))
+ .collect();
+
+ for f in &files {
+ let data = data.clone();
+ storage.put(f, data).await.unwrap();
+ }
+
+ // ==================== check: prefix-list `mydb/wb` (directory) ====================
+ let prefix = Path::from("mydb/wb");
+
+ let expected_000 = Path::from("mydb/wb/000");
+ let expected_001 = Path::from("mydb/wb/001");
+ let expected_location = Path::from("mydb/wb/foo.json");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+
+ assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
+ assert_eq!(result.objects.len(), 1);
+
+ let object = &result.objects[0];
+
+ assert_eq!(object.location, expected_location);
+ assert_eq!(object.size, data.len());
+
+ // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
+ let prefix = Path::from("mydb/wb/000/000/001");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert!(result.common_prefixes.is_empty());
+ assert_eq!(result.objects.len(), 0);
+
+ // ==================== check: prefix-list `not_there` (non-existing prefix) ====================
+ let prefix = Path::from("not_there");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert!(result.common_prefixes.is_empty());
+ assert!(result.objects.is_empty());
+
+ // ==================== do: remove all files ====================
+ for f in &files {
+ storage.delete(f).await.unwrap();
+ }
+
+ // ==================== check: store is empty ====================
+ let content_list = flatten_list_stream(storage, None).await?;
+ assert!(content_list.is_empty());
+
+ Ok(())
+ }
+
+ pub(crate) async fn get_nonexistent_object(
+ storage: &DynObjectStore,
+ location: Option<Path>,
+ ) -> crate::Result<Bytes> {
+ let location =
+ location.unwrap_or_else(|| Path::from("this_file_should_not_exist"));
+
+ let err = storage.head(&location).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }));
+
+ storage.get(&location).await?.bytes().await
+ }
+
+ pub(crate) async fn rename_and_copy(storage: &DynObjectStore) -> Result<()> {
+ // Create two objects
+ let path1 = Path::from("test1");
+ let path2 = Path::from("test2");
+ let contents1 = Bytes::from("cats");
+ let contents2 = Bytes::from("dogs");
+
+ // copy() makes both objects identical
+ storage.put(&path1, contents1.clone()).await?;
+ storage.put(&path2, contents2.clone()).await?;
+ storage.copy(&path1, &path2).await?;
+ let new_contents = storage.get(&path2).await?.bytes().await?;
+ assert_eq!(&new_contents, &contents1);
+
+ // rename() copies contents and deletes original
+ storage.put(&path1, contents1.clone()).await?;
+ storage.put(&path2, contents2.clone()).await?;
+ storage.rename(&path1, &path2).await?;
+ let new_contents = storage.get(&path2).await?.bytes().await?;
+ assert_eq!(&new_contents, &contents1);
+ let result = storage.get(&path1).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // Clean up
+ storage.delete(&path2).await?;
+
+ Ok(())
+ }
+
+ pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) -> Result<()> {
+ // Create two objects
+ let path1 = Path::from("test1");
+ let path2 = Path::from("test2");
+ let contents1 = Bytes::from("cats");
+ let contents2 = Bytes::from("dogs");
+
+ // copy_if_not_exists() errors if destination already exists
+ storage.put(&path1, contents1.clone()).await?;
+ storage.put(&path2, contents2.clone()).await?;
+ let result = storage.copy_if_not_exists(&path1, &path2).await;
+ assert!(result.is_err());
+ assert!(matches!(
+ result.unwrap_err(),
+ crate::Error::AlreadyExists { .. }
+ ));
+
+ // copy_if_not_exists() copies contents and allows deleting original
+ storage.delete(&path2).await?;
+ storage.copy_if_not_exists(&path1, &path2).await?;
+ storage.delete(&path1).await?;
+ let new_contents = storage.get(&path2).await?.bytes().await?;
+ assert_eq!(&new_contents, &contents1);
+ let result = storage.get(&path1).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // Clean up
+ storage.delete(&path2).await?;
+
+ Ok(())
+ }
+
+ async fn delete_fixtures(storage: &DynObjectStore) {
+ let paths = flatten_list_stream(storage, None).await.unwrap();
+
+ for f in &paths {
+ let _ = storage.delete(f).await;
+ }
+ }
+
+ /// Test that the returned stream borrows the store's lifetime, not the `Path`'s
+ async fn list_store<'a, 'b>(
+ store: &'a dyn ObjectStore,
+ path_str: &'b str,
+ ) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
+ let path = Path::from(path_str);
+ store.list(Some(&path)).await
+ }
+
+ #[tokio::test]
+ async fn test_list_lifetimes() {
+ let store = memory::InMemory::new();
+ let stream = list_store(&store, "path").await.unwrap();
+ assert_eq!(stream.count().await, 0);
+ }
+
+ // Tests TODO:
+ // GET nonexisting location (in_memory/file)
+ // DELETE nonexisting location
+ // PUT overwriting
+}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
new file mode 100644
index 0000000..8a9462e
--- /dev/null
+++ b/object_store/src/local.rs
@@ -0,0 +1,773 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for a local filesystem
+use crate::{
+ maybe_spawn_blocking,
+ path::{filesystem_path_to_url, Path},
+ GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::{stream::BoxStream, StreamExt};
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
+use std::collections::VecDeque;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::ops::Range;
+use std::sync::Arc;
+use std::{collections::BTreeSet, convert::TryFrom, io};
+use url::Url;
+use walkdir::{DirEntry, WalkDir};
+
+/// A specialized `Error` for filesystem object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub(crate) enum Error {
+ #[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
+ FileSizeOverflowedUsize {
+ source: std::num::TryFromIntError,
+ path: String,
+ },
+
+ #[snafu(display("Unable to walk dir: {}", source))]
+ UnableToWalkDir {
+ source: walkdir::Error,
+ },
+
+ #[snafu(display("Unable to access metadata for {}: {}", path, source))]
+ UnableToAccessMetadata {
+ source: Box<dyn std::error::Error + Send + Sync + 'static>,
+ path: String,
+ },
+
+ #[snafu(display("Unable to copy data to file: {}", source))]
+ UnableToCopyDataToFile {
+ source: io::Error,
+ },
+
+ #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
+ UnableToCreateDir {
+ source: io::Error,
+ path: std::path::PathBuf,
+ },
+
+ #[snafu(display("Unable to create file {}: {}", path.display(), err))]
+ UnableToCreateFile {
+ path: std::path::PathBuf,
+ err: io::Error,
+ },
+
+ #[snafu(display("Unable to delete file {}: {}", path.display(), source))]
+ UnableToDeleteFile {
+ source: io::Error,
+ path: std::path::PathBuf,
+ },
+
+ #[snafu(display("Unable to open file {}: {}", path.display(), source))]
+ UnableToOpenFile {
+ source: io::Error,
+ path: std::path::PathBuf,
+ },
+
+ #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
+ UnableToReadBytes {
+ source: io::Error,
+ path: std::path::PathBuf,
+ },
+
+ #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
+ OutOfRange {
+ path: std::path::PathBuf,
+ expected: usize,
+ actual: usize,
+ },
+
+ #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
+ UnableToCopyFile {
+ from: std::path::PathBuf,
+ to: std::path::PathBuf,
+ source: io::Error,
+ },
+
+ NotFound {
+ path: std::path::PathBuf,
+ source: io::Error,
+ },
+
+ #[snafu(display("Error seeking file {}: {}", path.display(), source))]
+ Seek {
+ source: io::Error,
+ path: std::path::PathBuf,
+ },
+
+ #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
+ InvalidUrl {
+ url: Url,
+ },
+
+ AlreadyExists {
+ path: String,
+ source: io::Error,
+ },
+}
+
+impl From<Error> for super::Error {
+ fn from(source: Error) -> Self {
+ match source {
+ Error::NotFound { path, source } => Self::NotFound {
+ path: path.to_string_lossy().to_string(),
+ source: source.into(),
+ },
+ Error::AlreadyExists { path, source } => Self::AlreadyExists {
+ path,
+ source: source.into(),
+ },
+ _ => Self::Generic {
+ store: "LocalFileSystem",
+ source: Box::new(source),
+ },
+ }
+ }
+}
+
+/// Local filesystem storage providing an [`ObjectStore`] interface to files on
+/// local disk. Can optionally be created with a directory prefix
+///
+/// # Path Semantics
+///
+/// This implementation follows the [file URI] scheme outlined in [RFC 3986]. In
+/// particular paths are delimited by `/`
+///
+/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+/// [RFC 3986]: https://www.rfc-editor.org/rfc/rfc3986
+///
+/// # Tokio Compatibility
+///
+/// Tokio discourages performing blocking IO on a tokio worker thread, however,
+/// no major operating systems have stable async file APIs. Therefore if called from
+/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
+/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
+///
+/// If not called from a tokio context, this will perform IO on the current thread with
+/// no additional complexity or overheads
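+///
+/// # Example
+///
+/// A minimal sketch, assuming a scratch directory exists at
+/// `/tmp/object_store`:
+///
+/// ```no_run
+/// # use object_store::local::LocalFileSystem;
+/// let store = LocalFileSystem::new_with_prefix("/tmp/object_store").unwrap();
+/// ```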
+#[derive(Debug)]
+pub struct LocalFileSystem {
+ config: Arc<Config>,
+}
+
+#[derive(Debug)]
+struct Config {
+ root: Url,
+}
+
+impl std::fmt::Display for LocalFileSystem {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "LocalFileSystem({})", self.config.root)
+ }
+}
+
+impl Default for LocalFileSystem {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl LocalFileSystem {
+ /// Create new filesystem storage with no prefix
+ pub fn new() -> Self {
+ Self {
+ config: Arc::new(Config {
+ root: Url::parse("file:///").unwrap(),
+ }),
+ }
+ }
+
+ /// Create new filesystem storage with `prefix` applied to all paths
+ pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
+ Ok(Self {
+ config: Arc::new(Config {
+ root: filesystem_path_to_url(prefix)?,
+ }),
+ })
+ }
+}
+
+impl Config {
+ /// Return filesystem path of the given location
+ fn path_to_filesystem(&self, location: &Path) -> Result<std::path::PathBuf> {
+ let mut url = self.root.clone();
+ url.path_segments_mut()
+ .expect("url path")
+ .extend(location.parts());
+
+ url.to_file_path()
+ .map_err(|_| Error::InvalidUrl { url }.into())
+ }
+
+ fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
+ Ok(Path::from_filesystem_path_with_base(
+ location,
+ Some(&self.root),
+ )?)
+ }
+}
+
+#[async_trait]
+impl ObjectStore for LocalFileSystem {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ let path = self.config.path_to_filesystem(location)?;
+
+ maybe_spawn_blocking(move || {
+ let mut file = match File::create(&path) {
+ Ok(f) => f,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+ let parent = path
+ .parent()
+ .context(UnableToCreateFileSnafu { path: &path, err })?;
+ std::fs::create_dir_all(&parent)
+ .context(UnableToCreateDirSnafu { path: parent })?;
+
+ match File::create(&path) {
+ Ok(f) => f,
+ Err(err) => {
+ return Err(Error::UnableToCreateFile { path, err }.into())
+ }
+ }
+ }
+ Err(err) => return Err(Error::UnableToCreateFile { path, err }.into()),
+ };
+
+ file.write_all(&bytes)
+ .context(UnableToCopyDataToFileSnafu)?;
+
+ Ok(())
+ })
+ .await
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let path = self.config.path_to_filesystem(location)?;
+ maybe_spawn_blocking(move || {
+ let file = open_file(&path)?;
+ Ok(GetResult::File(file, path))
+ })
+ .await
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ let path = self.config.path_to_filesystem(location)?;
+ maybe_spawn_blocking(move || {
+ let mut file = open_file(&path)?;
+ let to_read = range.end - range.start;
+ file.seek(SeekFrom::Start(range.start as u64))
+ .context(SeekSnafu { path: &path })?;
+
+ let mut buf = Vec::with_capacity(to_read);
+ let read = file
+ .take(to_read as u64)
+ .read_to_end(&mut buf)
+ .context(UnableToReadBytesSnafu { path: &path })?;
+
+ ensure!(
+ read == to_read,
+ OutOfRangeSnafu {
+ path: &path,
+ expected: to_read,
+ actual: read
+ }
+ );
+
+ Ok(buf.into())
+ })
+ .await
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let path = self.config.path_to_filesystem(location)?;
+ let location = location.clone();
+
+ maybe_spawn_blocking(move || {
+ let file = open_file(&path)?;
+ let metadata =
+ file.metadata().map_err(|e| Error::UnableToAccessMetadata {
+ source: e.into(),
+ path: location.to_string(),
+ })?;
+
+ convert_metadata(metadata, location)
+ })
+ .await
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ let path = self.config.path_to_filesystem(location)?;
+ maybe_spawn_blocking(move || {
+ std::fs::remove_file(&path).context(UnableToDeleteFileSnafu { path })?;
+ Ok(())
+ })
+ .await
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let config = Arc::clone(&self.config);
+
+ let root_path = match prefix {
+ Some(prefix) => config.path_to_filesystem(prefix)?,
+ None => self.config.root.to_file_path().unwrap(),
+ };
+
+ let walkdir = WalkDir::new(&root_path)
+ // Don't include the root directory itself
+ .min_depth(1);
+
+ let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
+ match convert_walkdir_result(result_dir_entry) {
+ Err(e) => Some(Err(e)),
+ Ok(None) => None,
+ Ok(entry @ Some(_)) => entry
+ .filter(|dir_entry| dir_entry.file_type().is_file())
+ .map(|entry| {
+ let location = config.filesystem_to_path(entry.path())?;
+ convert_entry(entry, location)
+ }),
+ }
+ });
+
+ // If there is no tokio context, return the iterator directly as there
+ // is no need to perform chunked spawn_blocking reads
+ if tokio::runtime::Handle::try_current().is_err() {
+ return Ok(futures::stream::iter(s).boxed());
+ }
+
+ // Otherwise list in batches of CHUNK_SIZE
+ const CHUNK_SIZE: usize = 1024;
+
+ let buffer = VecDeque::with_capacity(CHUNK_SIZE);
+ let stream =
+ futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+ if buffer.is_empty() {
+ (s, buffer) = tokio::task::spawn_blocking(move || {
+ for _ in 0..CHUNK_SIZE {
+ match s.next() {
+ Some(r) => buffer.push_back(r),
+ None => break,
+ }
+ }
+ (s, buffer)
+ })
+ .await?;
+ }
+
+ match buffer.pop_front() {
+ Some(Err(e)) => Err(e),
+ Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+ None => Ok(None),
+ }
+ });
+
+ Ok(stream.boxed())
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let config = Arc::clone(&self.config);
+
+ let prefix = prefix.cloned().unwrap_or_default();
+ let resolved_prefix = config.path_to_filesystem(&prefix)?;
+
+ maybe_spawn_blocking(move || {
+ let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1);
+
+ let mut common_prefixes = BTreeSet::new();
+ let mut objects = Vec::new();
+
+ for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
+ if let Some(entry) = entry_res? {
+ let is_directory = entry.file_type().is_dir();
+ let entry_location = config.filesystem_to_path(entry.path())?;
+
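+ // Skip entries that do not live under `prefix`; the first path
+ // segment after the prefix becomes either a common prefix
+ // (directory) or identifies an object (file).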
+ let mut parts = match entry_location.prefix_match(&prefix) {
+ Some(parts) => parts,
+ None => continue,
+ };
+
+ let common_prefix = match parts.next() {
+ Some(p) => p,
+ None => continue,
+ };
+
+ drop(parts);
+
+ if is_directory {
+ common_prefixes.insert(prefix.child(common_prefix));
+ } else {
+ objects.push(convert_entry(entry, entry_location)?);
+ }
+ }
+ }
+
+ Ok(ListResult {
+ common_prefixes: common_prefixes.into_iter().collect(),
+ objects,
+ })
+ })
+ .await
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ let from = self.config.path_to_filesystem(from)?;
+ let to = self.config.path_to_filesystem(to)?;
+
+ maybe_spawn_blocking(move || {
+ std::fs::copy(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
+ Ok(())
+ })
+ .await
+ }
+
+ async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+ let from = self.config.path_to_filesystem(from)?;
+ let to = self.config.path_to_filesystem(to)?;
+ maybe_spawn_blocking(move || {
+            // rename failures are reported via the copy error variant
+            std::fs::rename(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
+ Ok(())
+ })
+ .await
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ let from = self.config.path_to_filesystem(from)?;
+ let to = self.config.path_to_filesystem(to)?;
+
+ maybe_spawn_blocking(move || {
+ std::fs::hard_link(&from, &to).map_err(|err| match err.kind() {
+ io::ErrorKind::AlreadyExists => Error::AlreadyExists {
+ path: to.to_str().unwrap().to_string(),
+ source: err,
+ }
+ .into(),
+ _ => Error::UnableToCopyFile {
+ from,
+ to,
+ source: err,
+ }
+ .into(),
+ })
+ })
+ .await
+ }
+}
+
+fn open_file(path: &std::path::Path) -> Result<File> {
+    let file = File::open(path).map_err(|e| {
+        if e.kind() == std::io::ErrorKind::NotFound {
+            Error::NotFound {
+                path: path.to_path_buf(),
+                source: e,
+            }
+        } else {
+            Error::UnableToOpenFile {
+                path: path.to_path_buf(),
+                source: e,
+            }
+        }
+    })?;
+    Ok(file)
+}
+
+fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
+ let metadata = entry
+ .metadata()
+ .map_err(|e| Error::UnableToAccessMetadata {
+ source: e.into(),
+ path: location.to_string(),
+ })?;
+ convert_metadata(metadata, location)
+}
+
+fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
+ let last_modified = metadata
+ .modified()
+ .expect("Modified file time should be supported on this platform")
+ .into();
+
+ let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
+ path: location.as_ref(),
+ })?;
+
+ Ok(ObjectMeta {
+ location,
+ last_modified,
+ size,
+ })
+}
+
+/// Convert a walkdir result, mapping not-found errors to `None`.
+fn convert_walkdir_result(
+ res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
+) -> Result<Option<walkdir::DirEntry>> {
+ match res {
+ Ok(entry) => Ok(Some(entry)),
+ Err(walkdir_err) => match walkdir_err.io_error() {
+ Some(io_err) => match io_err.kind() {
+ io::ErrorKind::NotFound => Ok(None),
+ _ => Err(Error::UnableToWalkDir {
+ source: walkdir_err,
+ }
+ .into()),
+ },
+ None => Err(Error::UnableToWalkDir {
+ source: walkdir_err,
+ }
+ .into()),
+ },
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test_util::flatten_list_stream;
+ use crate::{
+ tests::{
+ copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
+ list_with_delimiter, put_get_delete_list, rename_and_copy,
+ },
+ Error as ObjectStoreError, ObjectStore,
+ };
+ use tempfile::TempDir;
+
+ #[tokio::test]
+ async fn file_test() {
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ put_get_delete_list(&integration).await.unwrap();
+ list_uses_directories_correctly(&integration).await.unwrap();
+ list_with_delimiter(&integration).await.unwrap();
+ rename_and_copy(&integration).await.unwrap();
+ copy_if_not_exists(&integration).await.unwrap();
+ }
+
+ #[test]
+ fn test_non_tokio() {
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+ futures::executor::block_on(async move {
+ put_get_delete_list(&integration).await.unwrap();
+ list_uses_directories_correctly(&integration).await.unwrap();
+ list_with_delimiter(&integration).await.unwrap();
+ });
+ }
+
+ #[tokio::test]
+ async fn creates_dir_if_not_present() {
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let location = Path::from("nested/file/test_file");
+
+ let data = Bytes::from("arbitrary data");
+ let expected_data = data.clone();
+
+ integration.put(&location, data).await.unwrap();
+
+ let read_data = integration
+ .get(&location)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ assert_eq!(&*read_data, expected_data);
+ }
+
+ #[tokio::test]
+ async fn unknown_length() {
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let location = Path::from("some_file");
+
+ let data = Bytes::from("arbitrary data");
+ let expected_data = data.clone();
+
+ integration.put(&location, data).await.unwrap();
+
+ let read_data = integration
+ .get(&location)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ assert_eq!(&*read_data, expected_data);
+ }
+
+ #[tokio::test]
+ #[cfg(target_family = "unix")]
+    // Fails on the GitHub Actions runner (which runs the tests as root)
+ #[ignore]
+ async fn bubble_up_io_errors() {
+ use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
+
+ let root = TempDir::new().unwrap();
+
+ // make non-readable
+ let metadata = root.path().metadata().unwrap();
+ let mut permissions = metadata.permissions();
+ permissions.set_mode(0o000);
+ set_permissions(root.path(), permissions).unwrap();
+
+ let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ // `list` must fail
+ match store.list(None).await {
+ Err(_) => {
+ // ok, error found
+ }
+ Ok(mut stream) => {
+ let mut any_err = false;
+ while let Some(res) = stream.next().await {
+ if res.is_err() {
+ any_err = true;
+ }
+ }
+ assert!(any_err);
+ }
+ }
+
+        // `list_with_delimiter` must fail too
+ assert!(store.list_with_delimiter(None).await.is_err());
+ }
+
+ const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+ #[tokio::test]
+ async fn get_nonexistent_location() {
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let location = Path::from(NON_EXISTENT_NAME);
+
+ let err = get_nonexistent_object(&integration, Some(location))
+ .await
+ .unwrap_err();
+ if let ObjectStoreError::NotFound { path, source } = err {
+ let source_variant = source.downcast_ref::<std::io::Error>();
+ assert!(
+                matches!(source_variant, Some(std::io::Error { .. })),
+ "got: {:?}",
+ source_variant
+ );
+ assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
+ } else {
+ panic!("unexpected error type: {:?}", err);
+ }
+ }
+
+ #[tokio::test]
+ async fn root() {
+ let integration = LocalFileSystem::new();
+
+ let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
+ let url = Url::from_directory_path(&canonical).unwrap();
+ let path = Path::parse(url.path()).unwrap();
+
+ let roundtrip = integration.config.path_to_filesystem(&path).unwrap();
+
+ // Needed as on Windows canonicalize returns extended length path syntax
+ // C:\Users\circleci -> \\?\C:\Users\circleci
+ let roundtrip = roundtrip.canonicalize().unwrap();
+
+ assert_eq!(roundtrip, canonical);
+
+ integration.head(&path).await.unwrap();
+ }
+
+ #[tokio::test]
+ #[cfg(target_os = "linux")]
+    // macOS has some magic in its root, `/.VolumeIcon.icns`, which causes this test to fail
+ async fn test_list_root() {
+ let integration = LocalFileSystem::new();
+ let result = integration.list_with_delimiter(None).await;
+ if cfg!(target_family = "windows") {
+ let r = result.unwrap_err().to_string();
+ assert!(
+ r.contains("Unable to convert URL \"file:///\" to filesystem path"),
+ "{}",
+ r
+ );
+ } else {
+ result.unwrap();
+ }
+ }
+
+ #[tokio::test]
+ async fn invalid_path() {
+ let root = TempDir::new().unwrap();
+ let root = root.path().join("🙀");
+ std::fs::create_dir(root.clone()).unwrap();
+
+        // Invalid paths are supported above the root of the store
+ let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
+
+ let directory = Path::from("directory");
+ let object = directory.child("child.txt");
+ let data = Bytes::from("arbitrary");
+ integration.put(&object, data.clone()).await.unwrap();
+ integration.head(&object).await.unwrap();
+ let result = integration.get(&object).await.unwrap();
+ assert_eq!(result.bytes().await.unwrap(), data);
+
+ flatten_list_stream(&integration, None).await.unwrap();
+ flatten_list_stream(&integration, Some(&directory))
+ .await
+ .unwrap();
+
+ let result = integration
+ .list_with_delimiter(Some(&directory))
+ .await
+ .unwrap();
+ assert_eq!(result.objects.len(), 1);
+ assert!(result.common_prefixes.is_empty());
+ assert_eq!(result.objects[0].location, object);
+
+ let illegal = root.join("💀");
+ std::fs::write(illegal, "foo").unwrap();
+
+ // Can list directory that doesn't contain illegal path
+ flatten_list_stream(&integration, Some(&directory))
+ .await
+ .unwrap();
+
+ // Cannot list illegal file
+ let err = flatten_list_stream(&integration, None)
+ .await
+ .unwrap_err()
+ .to_string();
+
+ assert!(
+ err.contains("Invalid path segment - got \"💀\" expected: \"%F0%9F%92%80\""),
+ "{}",
+ err
+ );
+ }
+}
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
new file mode 100644
index 0000000..ffd8e3a
--- /dev/null
+++ b/object_store/src/memory.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An in-memory object store implementation
+use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::Utc;
+use futures::{stream::BoxStream, StreamExt};
+use parking_lot::RwLock;
+use snafu::{ensure, OptionExt, Snafu};
+use std::collections::BTreeMap;
+use std::collections::BTreeSet;
+use std::ops::Range;
+
+/// A specialized `Error` for in-memory object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+ #[snafu(display("No data in memory found. Location: {path}"))]
+ NoDataInMemory { path: String },
+
+ #[snafu(display("Out of range"))]
+ OutOfRange,
+
+ #[snafu(display("Bad range"))]
+ BadRange,
+
+ #[snafu(display("Object already exists at that location: {path}"))]
+ AlreadyExists { path: String },
+}
+
+impl From<Error> for super::Error {
+ fn from(source: Error) -> Self {
+ match source {
+ Error::NoDataInMemory { ref path } => Self::NotFound {
+ path: path.into(),
+ source: source.into(),
+ },
+ Error::AlreadyExists { ref path } => Self::AlreadyExists {
+ path: path.into(),
+ source: source.into(),
+ },
+ _ => Self::Generic {
+ store: "InMemory",
+ source: Box::new(source),
+ },
+ }
+ }
+}
+
+/// In-memory storage suitable for testing or for opting out of using a cloud
+/// storage provider.
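+///
+/// # Example
+///
+/// A minimal usage sketch; the path and payload below are arbitrary:
+///
+/// ```
+/// # use object_store::{memory::InMemory, path::Path, ObjectStore};
+/// # use bytes::Bytes;
+/// # async fn example() -> object_store::Result<()> {
+/// let store = InMemory::new();
+/// store.put(&Path::from("data/file.txt"), Bytes::from("hello")).await?;
+/// let bytes = store.get(&Path::from("data/file.txt")).await?.bytes().await?;
+/// assert_eq!(bytes, Bytes::from("hello"));
+/// # Ok(())
+/// # }
+/// ```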
+#[derive(Debug, Default)]
+pub struct InMemory {
+ storage: RwLock<BTreeMap<Path, Bytes>>,
+}
+
+impl std::fmt::Display for InMemory {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "InMemory")
+ }
+}
+
+#[async_trait]
+impl ObjectStore for InMemory {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ self.storage.write().insert(location.clone(), bytes);
+ Ok(())
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ let data = self.get_bytes(location).await?;
+
+ Ok(GetResult::Stream(
+ futures::stream::once(async move { Ok(data) }).boxed(),
+ ))
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ let data = self.get_bytes(location).await?;
+ ensure!(range.end <= data.len(), OutOfRangeSnafu);
+ ensure!(range.start <= range.end, BadRangeSnafu);
+
+ Ok(data.slice(range))
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+ let last_modified = Utc::now();
+ let bytes = self.get_bytes(location).await?;
+ Ok(ObjectMeta {
+ location: location.clone(),
+ last_modified,
+ size: bytes.len(),
+ })
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ self.storage.write().remove(location);
+ Ok(())
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let last_modified = Utc::now();
+
+ let storage = self.storage.read();
+ let values: Vec<_> = storage
+ .iter()
+ .filter(move |(key, _)| prefix.map(|p| key.prefix_matches(p)).unwrap_or(true))
+ .map(move |(key, value)| {
+ Ok(ObjectMeta {
+ location: key.clone(),
+ last_modified,
+ size: value.len(),
+ })
+ })
+ .collect();
+
+ Ok(futures::stream::iter(values).boxed())
+ }
+
+    /// The memory implementation returns all results, as opposed to the cloud
+    /// versions which typically page their results in batches of roughly 1k
+    /// because of API limitations.
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ let root = Path::default();
+ let prefix = prefix.unwrap_or(&root);
+
+ let mut common_prefixes = BTreeSet::new();
+ let last_modified = Utc::now();
+
+ // Only objects in this base level should be returned in the
+ // response. Otherwise, we just collect the common prefixes.
+ let mut objects = vec![];
+ for (k, v) in self.storage.read().range((prefix)..) {
+ let mut parts = match k.prefix_match(prefix) {
+ Some(parts) => parts,
+ None => break,
+ };
+
+ // Pop first element
+ let common_prefix = match parts.next() {
+ Some(p) => p,
+ None => continue,
+ };
+
+ if parts.next().is_some() {
+ common_prefixes.insert(prefix.child(common_prefix));
+ } else {
+ let object = ObjectMeta {
+ location: k.clone(),
+ last_modified,
+ size: v.len(),
+ };
+ objects.push(object);
+ }
+ }
+
+ Ok(ListResult {
+ objects,
+ common_prefixes: common_prefixes.into_iter().collect(),
+ })
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ let data = self.get_bytes(from).await?;
+ self.storage.write().insert(to.clone(), data);
+ Ok(())
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ let data = self.get_bytes(from).await?;
+ let mut storage = self.storage.write();
+ if storage.contains_key(to) {
+ return Err(Error::AlreadyExists {
+ path: to.to_string(),
+ }
+ .into());
+ }
+ storage.insert(to.clone(), data);
+ Ok(())
+ }
+}
+
+impl InMemory {
+ /// Create new in-memory storage.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Creates a clone of the store
+ pub async fn clone(&self) -> Self {
+ let storage = self.storage.read();
+ let storage = storage.clone();
+
+ Self {
+ storage: RwLock::new(storage),
+ }
+ }
+
+ async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
+ let storage = self.storage.read();
+ let bytes = storage
+ .get(location)
+ .cloned()
+ .context(NoDataInMemorySnafu {
+ path: location.to_string(),
+ })?;
+ Ok(bytes)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use crate::{
+ tests::{
+ copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
+ list_with_delimiter, put_get_delete_list, rename_and_copy,
+ },
+ Error as ObjectStoreError, ObjectStore,
+ };
+
+ #[tokio::test]
+ async fn in_memory_test() {
+ let integration = InMemory::new();
+
+ put_get_delete_list(&integration).await.unwrap();
+ list_uses_directories_correctly(&integration).await.unwrap();
+ list_with_delimiter(&integration).await.unwrap();
+ rename_and_copy(&integration).await.unwrap();
+ copy_if_not_exists(&integration).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn unknown_length() {
+ let integration = InMemory::new();
+
+ let location = Path::from("some_file");
+
+ let data = Bytes::from("arbitrary data");
+ let expected_data = data.clone();
+
+ integration.put(&location, data).await.unwrap();
+
+ let read_data = integration
+ .get(&location)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ assert_eq!(&*read_data, expected_data);
+ }
+
+ const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+ #[tokio::test]
+ async fn nonexistent_location() {
+ let integration = InMemory::new();
+
+ let location = Path::from(NON_EXISTENT_NAME);
+
+ let err = get_nonexistent_object(&integration, Some(location))
+ .await
+ .unwrap_err();
+ if let ObjectStoreError::NotFound { path, source } = err {
+ let source_variant = source.downcast_ref::<Error>();
+ assert!(
+                matches!(source_variant, Some(Error::NoDataInMemory { .. })),
+ "got: {:?}",
+ source_variant
+ );
+ assert_eq!(path, NON_EXISTENT_NAME);
+ } else {
+ panic!("unexpected error type: {:?}", err);
+ }
+ }
+}
diff --git a/object_store/src/oauth.rs b/object_store/src/oauth.rs
new file mode 100644
index 0000000..273e37b
--- /dev/null
+++ b/object_store/src/oauth.rs
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::token::TemporaryToken;
+use reqwest::{Client, Method};
+use ring::signature::RsaKeyPair;
+use snafu::{ResultExt, Snafu};
+use std::time::{Duration, Instant};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("No RSA key found in pem file"))]
+ MissingKey,
+
+ #[snafu(display("Invalid RSA key: {}", source), context(false))]
+ InvalidKey { source: ring::error::KeyRejected },
+
+ #[snafu(display("Error signing jwt: {}", source))]
+ Sign { source: ring::error::Unspecified },
+
+ #[snafu(display("Error encoding jwt payload: {}", source))]
+ Encode { source: serde_json::Error },
+
+ #[snafu(display("Unsupported key encoding: {}", encoding))]
+ UnsupportedKey { encoding: String },
+
+ #[snafu(display("Error performing token request: {}", source))]
+ TokenRequest { source: reqwest::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, Default, serde::Serialize)]
+pub struct JwtHeader {
+ /// The type of JWS: it can only be "JWT" here
+ ///
+ /// Defined in [RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub typ: Option<String>,
+ /// The algorithm used
+ ///
+ /// Defined in [RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1).
+ pub alg: String,
+ /// Content type
+ ///
+ /// Defined in [RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub cty: Option<String>,
+    /// JWK Set URL
+ ///
+ /// Defined in [RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub jku: Option<String>,
+ /// Key ID
+ ///
+ /// Defined in [RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub kid: Option<String>,
+ /// X.509 URL
+ ///
+ /// Defined in [RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub x5u: Option<String>,
+ /// X.509 certificate thumbprint
+ ///
+ /// Defined in [RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7).
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub x5t: Option<String>,
+}
+
+#[derive(serde::Serialize)]
+struct TokenClaims<'a> {
+ iss: &'a str,
+ scope: &'a str,
+ aud: &'a str,
+ exp: u64,
+ iat: u64,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct TokenResponse {
+ access_token: String,
+ expires_in: u64,
+}
+
+/// Encapsulates the logic to perform an OAuth token challenge
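+///
+/// Intended usage, as an illustrative sketch (the `client` and credential
+/// values are assumed to be supplied by the caller):
+///
+/// ```text
+/// let provider = OAuthProvider::new(issuer, private_key_pem, scope, audience)?;
+/// let token = provider.fetch_token(&client).await?;
+/// ```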
+#[derive(Debug)]
+pub struct OAuthProvider {
+ issuer: String,
+ scope: String,
+ audience: String,
+ key_pair: RsaKeyPair,
+ jwt_header: String,
+ random: ring::rand::SystemRandom,
+}
+
+impl OAuthProvider {
+ /// Create a new [`OAuthProvider`]
+ pub fn new(
+ issuer: String,
+ private_key_pem: String,
+ scope: String,
+ audience: String,
+ ) -> Result<Self> {
+ let key_pair = decode_first_rsa_key(private_key_pem)?;
+ let jwt_header = b64_encode_obj(&JwtHeader {
+ alg: "RS256".to_string(),
+ ..Default::default()
+ })?;
+
+ Ok(Self {
+ issuer,
+ key_pair,
+ scope,
+ audience,
+ jwt_header,
+ random: ring::rand::SystemRandom::new(),
+ })
+ }
+
+ /// Fetch a fresh token
+ pub async fn fetch_token(&self, client: &Client) -> Result<TemporaryToken<String>> {
+ let now = seconds_since_epoch();
+ let exp = now + 3600;
+
+ let claims = TokenClaims {
+ iss: &self.issuer,
+ scope: &self.scope,
+ aud: &self.audience,
+ exp,
+ iat: now,
+ };
+
+ let claim_str = b64_encode_obj(&claims)?;
+ let message = [self.jwt_header.as_ref(), claim_str.as_ref()].join(".");
+ let mut sig_bytes = vec![0; self.key_pair.public_modulus_len()];
+ self.key_pair
+ .sign(
+ &ring::signature::RSA_PKCS1_SHA256,
+ &self.random,
+ message.as_bytes(),
+ &mut sig_bytes,
+ )
+ .context(SignSnafu)?;
+
+ let signature = base64::encode_config(&sig_bytes, base64::URL_SAFE_NO_PAD);
+ let jwt = [message, signature].join(".");
+
+ let body = [
+ ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
+ ("assertion", &jwt),
+ ];
+
+ let response: TokenResponse = client
+ .request(Method::POST, &self.audience)
+ .form(&body)
+ .send()
+ .await
+ .context(TokenRequestSnafu)?
+ .error_for_status()
+ .context(TokenRequestSnafu)?
+ .json()
+ .await
+ .context(TokenRequestSnafu)?;
+
+ let token = TemporaryToken {
+ token: response.access_token,
+ expiry: Instant::now() + Duration::from_secs(response.expires_in),
+ };
+
+ Ok(token)
+ }
+}
+
+/// Returns the number of seconds since the Unix epoch
+fn seconds_since_epoch() -> u64 {
+ std::time::SystemTime::now()
+ .duration_since(std::time::SystemTime::UNIX_EPOCH)
+ .unwrap()
+ .as_secs()
+}
+
+fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
+ use rustls_pemfile::Item;
+ use std::io::{BufReader, Cursor};
+
+ let mut cursor = Cursor::new(private_key_pem);
+ let mut reader = BufReader::new(&mut cursor);
+
+ // Reading from string is infallible
+ match rustls_pemfile::read_one(&mut reader).unwrap() {
+ Some(Item::PKCS8Key(key)) => Ok(RsaKeyPair::from_pkcs8(&key)?),
+ Some(Item::RSAKey(key)) => Ok(RsaKeyPair::from_der(&key)?),
+ _ => Err(Error::MissingKey),
+ }
+}
+
+fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
+ let string = serde_json::to_string(obj).context(EncodeSnafu)?;
+ Ok(base64::encode_config(string, base64::URL_SAFE_NO_PAD))
+}
diff --git a/object_store/src/path/mod.rs b/object_store/src/path/mod.rs
new file mode 100644
index 0000000..23488ef
--- /dev/null
+++ b/object_store/src/path/mod.rs
@@ -0,0 +1,531 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Path abstraction for Object Storage
+
+use itertools::Itertools;
+use percent_encoding::percent_decode;
+use snafu::{ensure, ResultExt, Snafu};
+use std::fmt::Formatter;
+use url::Url;
+
+/// The delimiter to separate object namespaces, creating a directory structure.
+pub const DELIMITER: &str = "/";
+
+/// The path delimiter as a single byte
+pub const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0];
+
+mod parts;
+
+pub use parts::{InvalidPart, PathPart};
+
+/// Error returned by [`Path::parse`]
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum Error {
+ #[snafu(display("Path \"{}\" contained empty path segment", path))]
+ EmptySegment { path: String },
+
+ #[snafu(display("Error parsing Path \"{}\": {}", path, source))]
+ BadSegment { path: String, source: InvalidPart },
+
+ #[snafu(display("Failed to canonicalize path \"{}\": {}", path.display(), source))]
+ Canonicalize {
+ path: std::path::PathBuf,
+ source: std::io::Error,
+ },
+
+ #[snafu(display("Unable to convert path \"{}\" to URL", path.display()))]
+ InvalidPath { path: std::path::PathBuf },
+
+ #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
+ NonUnicode {
+ path: String,
+ source: std::str::Utf8Error,
+ },
+
+ #[snafu(display("Path {} does not start with prefix {}", path, prefix))]
+ PrefixMismatch { path: String, prefix: String },
+}
+
+/// A parsed path representation that can be safely written to object storage
+///
+/// # Path Safety
+///
+/// In theory object stores support any UTF-8 character sequence, however, certain character
+/// sequences cause compatibility problems with some applications and protocols. As such the
+/// naming guidelines for [S3], [GCS] and [Azure Blob Storage] all recommend sticking to a
+/// limited character subset.
+///
+/// [S3]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
+/// [GCS]: https://cloud.google.com/storage/docs/naming-objects
+/// [Azure Blob Storage]: https://docs.microsoft.com/en-us/rest/api/storageservices/Naming-and-Referencing-Containers--Blobs--and-Metadata#blob-names
+///
+/// This presents libraries with two options for consistent path handling:
+///
+/// 1. Allow constructing unsafe paths, allowing for both reading and writing of data to paths
+/// that may not be consistently understood or supported
+/// 2. Disallow constructing unsafe paths, ensuring data written can be consistently handled by
+/// all other systems, but preventing interaction with objects at unsafe paths
+///
+/// This library takes the second approach, in particular:
+///
+/// * Paths are delimited by `/`
+/// * Paths do not start with a `/`
+/// * Empty path segments are discarded (e.g. `//` is treated as though it were `/`)
+/// * Relative path segments, i.e. `.` and `..`, are percent encoded
+/// * Unsafe characters are percent encoded, as described by [RFC 1738]
+/// * All paths are relative to the root of the object store
+///
+/// In order to provide these guarantees there are two ways to safely construct a [`Path`]
+///
+/// # Encode
+///
+/// A string containing potentially illegal path segments can be encoded to a [`Path`]
+/// using [`Path::from`] or [`Path::from_iter`].
+///
+/// ```
+/// # use object_store::path::Path;
+/// assert_eq!(Path::from("foo/bar").as_ref(), "foo/bar");
+/// assert_eq!(Path::from("foo//bar").as_ref(), "foo/bar");
+/// assert_eq!(Path::from("foo/../bar").as_ref(), "foo/%2E%2E/bar");
+/// assert_eq!(Path::from_iter(["foo", "foo/bar"]).as_ref(), "foo/foo%2Fbar");
+/// ```
+///
+/// Note: if provided with an already percent encoded string, this will encode it again
+///
+/// ```
+/// # use object_store::path::Path;
+/// assert_eq!(Path::from("foo/foo%2Fbar").as_ref(), "foo/foo%252Fbar");
+/// ```
+///
+/// # Parse
+///
+/// Alternatively a [`Path`] can be created from an existing string, returning an
+/// error if it is invalid. Unlike the encoding methods, this will permit
+/// valid percent encoded sequences.
+///
+/// ```
+/// # use object_store::path::Path;
+///
+/// assert_eq!(Path::parse("/foo/foo%2Fbar").unwrap().as_ref(), "foo/foo%2Fbar");
+/// Path::parse("..").unwrap_err();
+/// Path::parse("/foo//").unwrap_err();
+/// Path::parse("😀").unwrap_err();
+/// Path::parse("%Q").unwrap_err();
+/// ```
+///
+/// [RFC 1738]: https://www.ietf.org/rfc/rfc1738.txt
+#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Ord, PartialOrd)]
+pub struct Path {
+ /// The raw path with no leading or trailing delimiters
+ raw: String,
+}
+
+impl Path {
+ /// Parse a string as a [`Path`], returning a [`Error`] if invalid,
+ /// as defined on the docstring for [`Path`]
+ ///
+ /// Note: this will strip any leading `/` or trailing `/`
+ pub fn parse(path: impl AsRef<str>) -> Result<Self, Error> {
+ let path = path.as_ref();
+
+ let stripped = path.strip_prefix(DELIMITER).unwrap_or(path);
+ if stripped.is_empty() {
+ return Ok(Default::default());
+ }
+
+ let stripped = stripped.strip_suffix(DELIMITER).unwrap_or(stripped);
+
+ for segment in stripped.split(DELIMITER) {
+ ensure!(!segment.is_empty(), EmptySegmentSnafu { path });
+ PathPart::parse(segment).context(BadSegmentSnafu { path })?;
+ }
+
+ Ok(Self {
+ raw: stripped.to_string(),
+ })
+ }
+
+ /// Convert a filesystem path to a [`Path`] relative to the filesystem root
+ ///
+ /// This will return an error if the path does not exist, or contains illegal
+ /// character sequences as defined by [`Path::parse`]
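+    ///
+    /// A sketch of intended usage, marked `no_run` as it reads the local
+    /// filesystem (the path below is arbitrary):
+    ///
+    /// ```no_run
+    /// # use object_store::path::Path;
+    /// let path = Path::from_filesystem_path("/tmp/data/file.txt").unwrap();
+    /// ```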
+ pub fn from_filesystem_path(
+ path: impl AsRef<std::path::Path>,
+ ) -> Result<Self, Error> {
+ Self::from_filesystem_path_with_base(path, None)
+ }
+
+ /// Convert a filesystem path to a [`Path`] relative to the provided base
+ ///
+ /// This will return an error if the path does not exist on the local filesystem,
+ /// contains illegal character sequences as defined by [`Path::parse`], or `base`
+ /// does not refer to a parent path of `path`
+ pub(crate) fn from_filesystem_path_with_base(
+ path: impl AsRef<std::path::Path>,
+ base: Option<&Url>,
+ ) -> Result<Self, Error> {
+ let url = filesystem_path_to_url(path)?;
+ let path = match base {
+ Some(prefix) => url.path().strip_prefix(prefix.path()).ok_or_else(|| {
+ Error::PrefixMismatch {
+ path: url.path().to_string(),
+ prefix: prefix.to_string(),
+ }
+ })?,
+ None => url.path(),
+ };
+
+ // Reverse any percent encoding performed by conversion to URL
+ let decoded = percent_decode(path.as_bytes())
+ .decode_utf8()
+ .context(NonUnicodeSnafu { path })?;
+
+ Self::parse(decoded)
+ }
+
+    /// Returns an iterator over the [`PathPart`]s of this [`Path`]
+ pub fn parts(&self) -> impl Iterator<Item = PathPart<'_>> {
+ match self.raw.is_empty() {
+ true => itertools::Either::Left(std::iter::empty()),
+ false => itertools::Either::Right(
+ self.raw
+ .split(DELIMITER)
+ .map(|s| PathPart { raw: s.into() }),
+ ),
+ }
+ }
+
+    /// Returns an iterator over the [`PathPart`]s of this [`Path`] after `prefix`
+ ///
+ /// Returns `None` if the prefix does not match
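+    ///
+    /// A small illustration (the paths below are arbitrary):
+    ///
+    /// ```
+    /// # use object_store::path::Path;
+    /// let path = Path::from("apple/bear/cow");
+    /// let parts: Vec<_> = path.prefix_match(&Path::from("apple")).unwrap().collect();
+    /// assert_eq!(parts.len(), 2);
+    /// assert!(path.prefix_match(&Path::from("app")).is_none());
+    /// ```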
+ pub fn prefix_match(
+ &self,
+ prefix: &Self,
+ ) -> Option<impl Iterator<Item = PathPart<'_>> + '_> {
+ let diff = itertools::diff_with(self.parts(), prefix.parts(), |a, b| a == b);
+
+ match diff {
+ // Both were equal
+ None => Some(itertools::Either::Left(std::iter::empty())),
+ // Mismatch or prefix was longer => None
+ Some(
+ itertools::Diff::FirstMismatch(_, _, _) | itertools::Diff::Longer(_, _),
+ ) => None,
+ // Match with remaining
+ Some(itertools::Diff::Shorter(_, back)) => {
+ Some(itertools::Either::Right(back))
+ }
+ }
+ }
+
+ /// Returns true if this [`Path`] starts with `prefix`
+ pub fn prefix_matches(&self, prefix: &Self) -> bool {
+ self.prefix_match(prefix).is_some()
+ }
+
+ /// Creates a new child of this [`Path`]
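+    ///
+    /// For example:
+    ///
+    /// ```
+    /// # use object_store::path::Path;
+    /// assert_eq!(Path::from("foo").child("bar").as_ref(), "foo/bar");
+    /// ```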
+ pub fn child<'a>(&self, child: impl Into<PathPart<'a>>) -> Self {
+ let raw = match self.raw.is_empty() {
+            true => child.into().raw.into_owned(),
+ false => format!("{}{}{}", self.raw, DELIMITER, child.into().raw),
+ };
+
+ Self { raw }
+ }
+}
+
+impl AsRef<str> for Path {
+ fn as_ref(&self) -> &str {
+ &self.raw
+ }
+}
+
+impl From<&str> for Path {
+ fn from(path: &str) -> Self {
+ Self::from_iter(path.split(DELIMITER))
+ }
+}
+
+impl From<String> for Path {
+ fn from(path: String) -> Self {
+ Self::from_iter(path.split(DELIMITER))
+ }
+}
+
+impl From<Path> for String {
+ fn from(path: Path) -> Self {
+ path.raw
+ }
+}
+
+impl std::fmt::Display for Path {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ self.raw.fmt(f)
+ }
+}
+
+impl<'a, I> FromIterator<I> for Path
+where
+ I: Into<PathPart<'a>>,
+{
+ fn from_iter<T: IntoIterator<Item = I>>(iter: T) -> Self {
+ let raw = T::into_iter(iter)
+ .map(|s| s.into())
+ .filter(|s| !s.raw.is_empty())
+ .map(|s| s.raw)
+ .join(DELIMITER);
+
+ Self { raw }
+ }
+}
+
+/// Given a filesystem path, convert it to its canonical URL representation,
+/// returning an error if the file doesn't exist on the local filesystem
+pub(crate) fn filesystem_path_to_url(
+ path: impl AsRef<std::path::Path>,
+) -> Result<Url, Error> {
+ let path = path.as_ref().canonicalize().context(CanonicalizeSnafu {
+ path: path.as_ref(),
+ })?;
+
+ match path.is_dir() {
+ true => Url::from_directory_path(&path),
+ false => Url::from_file_path(&path),
+ }
+ .map_err(|_| Error::InvalidPath { path })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn cloud_prefix_with_trailing_delimiter() {
+ // Use case: files exist in object storage named `foo/bar.json` and
+ // `foo_test.json`. A search for the prefix `foo/` should return
+    // `foo/bar.json` but not `foo_test.json`.
+ let prefix = Path::from_iter(["test"]);
+ assert_eq!(prefix.as_ref(), "test");
+ }
+
+ #[test]
+ fn push_encodes() {
+ let location = Path::from_iter(["foo/bar", "baz%2Ftest"]);
+ assert_eq!(location.as_ref(), "foo%2Fbar/baz%252Ftest");
+ }
+
+ #[test]
+ fn test_parse() {
+ assert_eq!(Path::parse("/").unwrap().as_ref(), "");
+ assert_eq!(Path::parse("").unwrap().as_ref(), "");
+
+ let err = Path::parse("//").unwrap_err();
+ assert!(matches!(err, Error::EmptySegment { .. }));
+
+ assert_eq!(Path::parse("/foo/bar/").unwrap().as_ref(), "foo/bar");
+ assert_eq!(Path::parse("foo/bar/").unwrap().as_ref(), "foo/bar");
+ assert_eq!(Path::parse("foo/bar").unwrap().as_ref(), "foo/bar");
+
+ let err = Path::parse("foo///bar").unwrap_err();
+ assert!(matches!(err, Error::EmptySegment { .. }));
+ }
+
+ #[test]
+ fn convert_raw_before_partial_eq() {
+ // dir and file_name
+ let cloud = Path::from("test_dir/test_file.json");
+ let built = Path::from_iter(["test_dir", "test_file.json"]);
+
+ assert_eq!(built, cloud);
+
+ // dir and file_name w/o dot
+ let cloud = Path::from("test_dir/test_file");
+ let built = Path::from_iter(["test_dir", "test_file"]);
+
+ assert_eq!(built, cloud);
+
+ // dir, no file
+ let cloud = Path::from("test_dir/");
+ let built = Path::from_iter(["test_dir"]);
+ assert_eq!(built, cloud);
+
+ // file_name, no dir
+ let cloud = Path::from("test_file.json");
+ let built = Path::from_iter(["test_file.json"]);
+ assert_eq!(built, cloud);
+
+ // empty
+ let cloud = Path::from("");
+ let built = Path::from_iter(["", ""]);
+
+ assert_eq!(built, cloud);
+ }
+
+ #[test]
+ fn parts_after_prefix_behavior() {
+ let existing_path = Path::from("apple/bear/cow/dog/egg.json");
+
+ // Prefix with one directory
+ let prefix = Path::from("apple");
+ let expected_parts: Vec<PathPart<'_>> = vec!["bear", "cow", "dog", "egg.json"]
+ .into_iter()
+ .map(Into::into)
+ .collect();
+ let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
+ assert_eq!(parts, expected_parts);
+
+ // Prefix with two directories
+ let prefix = Path::from("apple/bear");
+ let expected_parts: Vec<PathPart<'_>> = vec!["cow", "dog", "egg.json"]
+ .into_iter()
+ .map(Into::into)
+ .collect();
+ let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
+ assert_eq!(parts, expected_parts);
+
+ // Not a prefix
+ let prefix = Path::from("cow");
+ assert!(existing_path.prefix_match(&prefix).is_none());
+
+ // Prefix with a partial directory
+ let prefix = Path::from("ap");
+ assert!(existing_path.prefix_match(&prefix).is_none());
+
+ // Prefix matches but there aren't any parts after it
+ let existing_path = Path::from("apple/bear/cow/dog");
+
+ let prefix = existing_path.clone();
+ assert_eq!(existing_path.prefix_match(&prefix).unwrap().count(), 0);
+ }
+
+ #[test]
+ fn prefix_matches() {
+ let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something"]);
+ let needle = haystack.clone();
+ // self starts with self
+ assert!(
+ haystack.prefix_matches(&haystack),
+ "{:?} should have started with {:?}",
+ haystack,
+ haystack
+ );
+
+ // a longer prefix doesn't match
+ let needle = needle.child("longer now");
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} shouldn't have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // one dir prefix matches
+ let needle = Path::from_iter(["foo/bar"]);
+ assert!(
+ haystack.prefix_matches(&needle),
+ "{:?} should have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // two dir prefix matches
+ let needle = needle.child("baz%2Ftest");
+ assert!(
+ haystack.prefix_matches(&needle),
+ "{:?} should have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // partial dir prefix doesn't match
+ let needle = Path::from_iter(["f"]);
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // one dir and one partial dir doesn't match
+ let needle = Path::from_iter(["foo/bar", "baz"]);
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // empty prefix matches
+ let needle = Path::from("");
+ assert!(
+ haystack.prefix_matches(&needle),
+ "{:?} should have started with {:?}",
+ haystack,
+ needle
+ );
+ }
+
+ #[test]
+ fn prefix_matches_with_file_name() {
+ let haystack =
+ Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo.segment"]);
+
+ // All directories match and file name is a prefix
+ let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo"]);
+
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // All directories match but file name is not a prefix
+ let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "e"]);
+
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // Not all directories match; file name is a prefix of the next directory; this
+ // does not match
+ let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "s"]);
+
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+
+ // Not all directories match; file name is NOT a prefix of the next directory;
+ // no match
+ let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "p"]);
+
+ assert!(
+ !haystack.prefix_matches(&needle),
+ "{:?} should not have started with {:?}",
+ haystack,
+ needle
+ );
+ }
+}
diff --git a/object_store/src/path/parts.rs b/object_store/src/path/parts.rs
new file mode 100644
index 0000000..e73b184
--- /dev/null
+++ b/object_store/src/path/parts.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};
+use std::borrow::Cow;
+
+use crate::path::DELIMITER_BYTE;
+use snafu::Snafu;
+
+/// Error returned by [`PathPart::parse`]
+#[derive(Debug, Snafu)]
+#[snafu(display("Invalid path segment - got \"{}\" expected: \"{}\"", actual, expected))]
+#[allow(missing_copy_implementations)]
+pub struct InvalidPart {
+ actual: String,
+ expected: String,
+}
+
+/// The PathPart type exists to validate the directory/file names that form part
+/// of a path.
+///
+/// A PathPart instance is guaranteed to contain no illegal characters (e.g. `/`)
+/// as it can only be constructed via the encoding `From` impls or [`PathPart::parse`].
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
+pub struct PathPart<'a> {
+ pub(super) raw: Cow<'a, str>,
+}
+
+impl<'a> PathPart<'a> {
+ /// Parse the provided path segment as a [`PathPart`] returning an error if invalid
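+    ///
+    /// A short illustration (the segments below are arbitrary):
+    ///
+    /// ```
+    /// # use object_store::path::PathPart;
+    /// assert!(PathPart::parse("foo%2Fbar").is_ok());
+    /// assert!(PathPart::parse("foo/bar").is_err());
+    /// ```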
+ pub fn parse(segment: &'a str) -> Result<Self, InvalidPart> {
+ let decoded: Cow<'a, [u8]> = percent_decode(segment.as_bytes()).into();
+ let part = PathPart::from(decoded.as_ref());
+ if segment != part.as_ref() {
+ return Err(InvalidPart {
+ actual: segment.to_string(),
+ expected: part.raw.to_string(),
+ });
+ }
+
+ Ok(Self {
+ raw: segment.into(),
+ })
+ }
+}
+
+/// Characters we want to encode.
+const INVALID: &AsciiSet = &CONTROLS
+ // The delimiter we are reserving for internal hierarchy
+ .add(DELIMITER_BYTE)
+ // Characters AWS recommends avoiding for object keys
+ // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
+ .add(b'\\')
+ .add(b'{')
+ .add(b'^')
+ .add(b'}')
+ .add(b'%')
+ .add(b'`')
+ .add(b']')
+ .add(b'"') // " <-- my editor is confused about double quotes within single quotes
+ .add(b'>')
+ .add(b'[')
+ .add(b'~')
+ .add(b'<')
+ .add(b'#')
+ .add(b'|')
+ // Characters Google Cloud Storage recommends avoiding for object names
+ // https://cloud.google.com/storage/docs/naming-objects
+ .add(b'\r')
+ .add(b'\n')
+ .add(b'*')
+ .add(b'?');
+
+impl<'a> From<&'a [u8]> for PathPart<'a> {
+ fn from(v: &'a [u8]) -> Self {
+ let inner = match v {
+ // We don't want to encode `.` generally, but we do want to disallow parts of paths
+ // to be equal to `.` or `..` to prevent file system traversal shenanigans.
+ b"." => "%2E".into(),
+ b".." => "%2E%2E".into(),
+ other => percent_encode(other, INVALID).into(),
+ };
+ Self { raw: inner }
+ }
+}
+
+impl<'a> From<&'a str> for PathPart<'a> {
+ fn from(v: &'a str) -> Self {
+ Self::from(v.as_bytes())
+ }
+}
+
+impl From<String> for PathPart<'static> {
+ fn from(s: String) -> Self {
+ Self {
+ raw: Cow::Owned(PathPart::from(s.as_str()).raw.into_owned()),
+ }
+ }
+}
+
+impl<'a> AsRef<str> for PathPart<'a> {
+ fn as_ref(&self) -> &str {
+ self.raw.as_ref()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn path_part_delimiter_gets_encoded() {
+ let part: PathPart<'_> = "foo/bar".into();
+ assert_eq!(part.raw, "foo%2Fbar");
+ }
+
+ #[test]
+ fn path_part_given_already_encoded_string() {
+ let part: PathPart<'_> = "foo%2Fbar".into();
+ assert_eq!(part.raw, "foo%252Fbar");
+ }
+
+ #[test]
+ fn path_part_cant_be_one_dot() {
+ let part: PathPart<'_> = ".".into();
+ assert_eq!(part.raw, "%2E");
+ }
+
+ #[test]
+ fn path_part_cant_be_two_dots() {
+ let part: PathPart<'_> = "..".into();
+ assert_eq!(part.raw, "%2E%2E");
+ }
+}
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
new file mode 100644
index 0000000..7a54a06
--- /dev/null
+++ b/object_store/src/throttle.rs
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A throttling object store wrapper
+use parking_lot::Mutex;
+use std::ops::Range;
+use std::{convert::TryInto, sync::Arc};
+
+use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::{stream::BoxStream, StreamExt};
+use std::time::Duration;
+
+/// Configuration settings for throttled store
+#[derive(Debug, Default, Clone, Copy)]
+pub struct ThrottleConfig {
+ /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
+ ///
+ /// Sleeping is done before the underlying store is called and independently of the success of
+ /// the operation.
+ pub wait_delete_per_call: Duration,
+
+ /// Sleep duration for every byte received during [`get`](ThrottledStore::get).
+ ///
+ /// Sleeping is performed after the underlying store returned and only for successful gets. The
+ /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
+ ///
+ /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
+ /// there be an intermediate failure (i.e. after partly consuming the output bytes), the
+ /// resulting sleep time will be partial as well.
+ pub wait_get_per_byte: Duration,
+
+ /// Sleep duration for every call to [`get`](ThrottledStore::get).
+ ///
+ /// Sleeping is done before the underlying store is called and independently of the success of
+ /// the operation. The sleep duration is additive to
+ /// [`wait_get_per_byte`](Self::wait_get_per_byte).
+ pub wait_get_per_call: Duration,
+
+ /// Sleep duration for every call to [`list`](ThrottledStore::list).
+ ///
+ /// Sleeping is done before the underlying store is called and independently of the success of
+ /// the operation. The sleep duration is additive to
+ /// [`wait_list_per_entry`](Self::wait_list_per_entry).
+ pub wait_list_per_call: Duration,
+
+ /// Sleep duration for every entry received during [`list`](ThrottledStore::list).
+ ///
+ /// Sleeping is performed after the underlying store returned and only for successful lists.
+ /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
+ ///
+ /// Note that the per-entry sleep only happens as the user consumes the output entries. Should
+ /// there be an intermediate failure (i.e. after partly consuming the output entries), the
+ /// resulting sleep time will be partial as well.
+ pub wait_list_per_entry: Duration,
+
+ /// Sleep duration for every call to
+ /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
+ ///
+ /// Sleeping is done before the underlying store is called and independently of the success of
+ /// the operation. The sleep duration is additive to
+ /// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
+ pub wait_list_with_delimiter_per_call: Duration,
+
+ /// Sleep duration for every entry received during
+ /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
+ ///
+    /// Sleeping is performed after the underlying store returned and only for successful lists. The
+ /// sleep duration is additive to
+ /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
+ pub wait_list_with_delimiter_per_entry: Duration,
+
+ /// Sleep duration for every call to [`put`](ThrottledStore::put).
+ ///
+ /// Sleeping is done before the underlying store is called and independently of the success of
+ /// the operation.
+ pub wait_put_per_call: Duration,
+}
+
+/// Sleep only if non-zero duration
+async fn sleep(duration: Duration) {
+ if !duration.is_zero() {
+ tokio::time::sleep(duration).await
+ }
+}
+
+/// Store wrapper that injects configurable `sleep` delays into calls to an inner store.
+///
+/// This can be used for performance testing.
+///
+/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
+/// conditions!**
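+///
+/// # Example
+///
+/// A minimal configuration sketch; the duration chosen here is arbitrary:
+///
+/// ```
+/// # use object_store::{memory::InMemory, throttle::{ThrottleConfig, ThrottledStore}};
+/// # use std::time::Duration;
+/// let config = ThrottleConfig {
+///     wait_get_per_call: Duration::from_millis(10),
+///     ..ThrottleConfig::default()
+/// };
+/// let store = ThrottledStore::new(InMemory::new(), config);
+/// ```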
+#[derive(Debug)]
+pub struct ThrottledStore<T: ObjectStore> {
+ inner: T,
+ config: Arc<Mutex<ThrottleConfig>>,
+}
+
+impl<T: ObjectStore> ThrottledStore<T> {
+    /// Create a new wrapper with the given throttle configuration.
+ pub fn new(inner: T, config: ThrottleConfig) -> Self {
+ Self {
+ inner,
+ config: Arc::new(Mutex::new(config)),
+ }
+ }
+
+ /// Mutate config.
+ pub fn config_mut<F>(&self, f: F)
+ where
+ F: Fn(&mut ThrottleConfig),
+ {
+ let mut guard = self.config.lock();
+ f(&mut guard)
+ }
+
+ /// Return copy of current config.
+ pub fn config(&self) -> ThrottleConfig {
+ *self.config.lock()
+ }
+}
+
+impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "ThrottledStore({})", self.inner)
+ }
+}
+
+#[async_trait]
+impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ sleep(self.config().wait_put_per_call).await;
+
+ self.inner.put(location, bytes).await
+ }
+
+ async fn get(&self, location: &Path) -> Result<GetResult> {
+ sleep(self.config().wait_get_per_call).await;
+
+ // need to copy to avoid moving / referencing `self`
+ let wait_get_per_byte = self.config().wait_get_per_byte;
+
+ self.inner.get(location).await.map(|result| {
+ let s = match result {
+ GetResult::Stream(s) => s,
+ GetResult::File(_, _) => unimplemented!(),
+ };
+
+ GetResult::Stream(
+ s.then(move |bytes_result| async move {
+ match bytes_result {
+ Ok(bytes) => {
+ let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+ sleep(wait_get_per_byte * bytes_len).await;
+ Ok(bytes)
+ }
+ Err(err) => Err(err),
+ }
+ })
+ .boxed(),
+ )
+ })
+ }
+
+ async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+ let config = self.config();
+
+        let sleep_duration = config.wait_get_per_call
+            + config.wait_get_per_byte * (range.end - range.start) as u32;
+
+ sleep(sleep_duration).await;
+
+ self.inner.get_range(location, range).await
+ }
+
+ async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        // there is no dedicated `head` wait time, so reuse the put per-call wait
+        sleep(self.config().wait_put_per_call).await;
+ self.inner.head(location).await
+ }
+
+ async fn delete(&self, location: &Path) -> Result<()> {
+ sleep(self.config().wait_delete_per_call).await;
+
+ self.inner.delete(location).await
+ }
+
+ async fn list(
+ &self,
+ prefix: Option<&Path>,
+ ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ sleep(self.config().wait_list_per_call).await;
+
+ // need to copy to avoid moving / referencing `self`
+ let wait_list_per_entry = self.config().wait_list_per_entry;
+
+ self.inner.list(prefix).await.map(|stream| {
+ stream
+ .then(move |result| async move {
+ match result {
+ Ok(entry) => {
+ sleep(wait_list_per_entry).await;
+ Ok(entry)
+ }
+ Err(err) => Err(err),
+ }
+ })
+ .boxed()
+ })
+ }
+
+ async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+ sleep(self.config().wait_list_with_delimiter_per_call).await;
+
+ match self.inner.list_with_delimiter(prefix).await {
+ Ok(list_result) => {
+ let entries_len = usize_to_u32_saturate(list_result.objects.len());
+ sleep(self.config().wait_list_with_delimiter_per_entry * entries_len)
+ .await;
+ Ok(list_result)
+ }
+ Err(err) => Err(err),
+ }
+ }
+
+ async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+ sleep(self.config().wait_put_per_call).await;
+
+ self.inner.copy(from, to).await
+ }
+
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ sleep(self.config().wait_put_per_call).await;
+
+ self.inner.copy_if_not_exists(from, to).await
+ }
+}
+
+/// Saturating `usize` to `u32` cast.
+fn usize_to_u32_saturate(x: usize) -> u32 {
+ x.try_into().unwrap_or(u32::MAX)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{
+ memory::InMemory,
+ tests::{
+ copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
+ put_get_delete_list, rename_and_copy,
+ },
+ };
+ use bytes::Bytes;
+ use futures::TryStreamExt;
+ use tokio::time::Duration;
+ use tokio::time::Instant;
+
+ const WAIT_TIME: Duration = Duration::from_millis(100);
+ const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant
+
+ macro_rules! assert_bounds {
+ ($d:expr, $lower:expr) => {
+ assert_bounds!($d, $lower, $lower + 1);
+ };
+ ($d:expr, $lower:expr, $upper:expr) => {
+ let d = $d;
+ let lower = $lower * WAIT_TIME;
+ let upper = $upper * WAIT_TIME;
+            assert!(d >= lower, "{:?} must be >= {:?}", d, lower);
+            assert!(d < upper, "{:?} must be < {:?}", d, upper);
+ };
+ }
+
+ #[tokio::test]
+ async fn throttle_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ put_get_delete_list(&store).await.unwrap();
+ list_uses_directories_correctly(&store).await.unwrap();
+ list_with_delimiter(&store).await.unwrap();
+ rename_and_copy(&store).await.unwrap();
+ copy_if_not_exists(&store).await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn delete_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_delete(&store, None).await, 0);
+ assert_bounds!(measure_delete(&store, Some(0)).await, 0);
+ assert_bounds!(measure_delete(&store, Some(10)).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
+ assert_bounds!(measure_delete(&store, None).await, 1);
+ assert_bounds!(measure_delete(&store, Some(0)).await, 1);
+ assert_bounds!(measure_delete(&store, Some(10)).await, 1);
+ }
+
+ #[tokio::test]
+    // the macOS GitHub runner is so slow it can't complete within WAIT_TIME*2
+ #[cfg(target_os = "linux")]
+ async fn get_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_get(&store, None).await, 0);
+ assert_bounds!(measure_get(&store, Some(0)).await, 0);
+ assert_bounds!(measure_get(&store, Some(10)).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
+ assert_bounds!(measure_get(&store, None).await, 1);
+ assert_bounds!(measure_get(&store, Some(0)).await, 1);
+ assert_bounds!(measure_get(&store, Some(10)).await, 1);
+
+ store.config_mut(|cfg| {
+ cfg.wait_get_per_call = ZERO;
+ cfg.wait_get_per_byte = WAIT_TIME;
+ });
+ assert_bounds!(measure_get(&store, Some(2)).await, 2);
+
+ store.config_mut(|cfg| {
+ cfg.wait_get_per_call = WAIT_TIME;
+ cfg.wait_get_per_byte = WAIT_TIME;
+ });
+ assert_bounds!(measure_get(&store, Some(2)).await, 3);
+ }
+
+ #[tokio::test]
+    // the macOS GitHub runner is so slow it can't complete within WAIT_TIME*2
+ #[cfg(target_os = "linux")]
+ async fn list_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_list(&store, 0).await, 0);
+ assert_bounds!(measure_list(&store, 10).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
+ assert_bounds!(measure_list(&store, 0).await, 1);
+ assert_bounds!(measure_list(&store, 10).await, 1);
+
+ store.config_mut(|cfg| {
+ cfg.wait_list_per_call = ZERO;
+ cfg.wait_list_per_entry = WAIT_TIME;
+ });
+ assert_bounds!(measure_list(&store, 2).await, 2);
+
+ store.config_mut(|cfg| {
+ cfg.wait_list_per_call = WAIT_TIME;
+ cfg.wait_list_per_entry = WAIT_TIME;
+ });
+ assert_bounds!(measure_list(&store, 2).await, 3);
+ }
+
+ #[tokio::test]
+    // the macOS GitHub runner is so slow it can't complete within WAIT_TIME*2
+ #[cfg(target_os = "linux")]
+ async fn list_with_delimiter_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
+ assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
+ assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
+ assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);
+
+ store.config_mut(|cfg| {
+ cfg.wait_list_with_delimiter_per_call = ZERO;
+ cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
+ });
+ assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);
+
+ store.config_mut(|cfg| {
+ cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
+ cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
+ });
+ assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
+ }
+
+ #[tokio::test]
+ async fn put_test() {
+ let inner = InMemory::new();
+ let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+ assert_bounds!(measure_put(&store, 0).await, 0);
+ assert_bounds!(measure_put(&store, 10).await, 0);
+
+ store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
+ assert_bounds!(measure_put(&store, 0).await, 1);
+ assert_bounds!(measure_put(&store, 10).await, 1);
+
+ store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
+ assert_bounds!(measure_put(&store, 0).await, 0);
+ }
+
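+ // Put an object with `n_bytes` of 1-filled data at a fixed path, or ensure
+ // the path is absent when `n_bytes` is `None`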
+ async fn place_test_object(
+ store: &ThrottledStore<InMemory>,
+ n_bytes: Option<usize>,
+ ) -> Path {
+ let path = Path::from("foo");
+
+ if let Some(n_bytes) = n_bytes {
+ let bytes = Bytes::from(vec![1u8; n_bytes]);
+ store.put(&path, bytes).await.unwrap();
+ } else {
+ // ensure object is absent
+ store.delete(&path).await.unwrap();
+ }
+
+ path
+ }
+
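+ // Clear any existing entries under a fixed prefix, then create `n_entries`
+ // fresh objects beneath it, returning the prefix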
+ async fn place_test_objects(
+ store: &ThrottledStore<InMemory>,
+ n_entries: usize,
+ ) -> Path {
+ let prefix = Path::from("foo");
+
+ // remove any existing entries under the prefix
+ let entries: Vec<_> = store
+ .list(Some(&prefix))
+ .await
+ .unwrap()
+ .try_collect()
+ .await
+ .unwrap();
+
+ for entry in entries {
+ store.delete(&entry.location).await.unwrap();
+ }
+
+ // create new entries
+ for i in 0..n_entries {
+ let path = prefix.child(i.to_string().as_str());
+
+ let data = Bytes::from("bar");
+ store.put(&path, data).await.unwrap();
+ }
+
+ prefix
+ }
+
+ async fn measure_delete(
+ store: &ThrottledStore<InMemory>,
+ n_bytes: Option<usize>,
+ ) -> Duration {
+ let path = place_test_object(store, n_bytes).await;
+
+ let t0 = Instant::now();
+ store.delete(&path).await.unwrap();
+
+ t0.elapsed()
+ }
+
+ async fn measure_get(
+ store: &ThrottledStore<InMemory>,
+ n_bytes: Option<usize>,
+ ) -> Duration {
+ let path = place_test_object(store, n_bytes).await;
+
+ let t0 = Instant::now();
+ let res = store.get(&path).await;
+ if n_bytes.is_some() {
+ // the body stream must be consumed to trigger the per-byte sleeps
+ let s = match res.unwrap() {
+ GetResult::Stream(s) => s,
+ GetResult::File(_, _) => unimplemented!(),
+ };
+
+ s.map_ok(|b| bytes::BytesMut::from(&b[..]))
+ .try_concat()
+ .await
+ .unwrap();
+ } else {
+ assert!(res.is_err());
+ }
+
+ t0.elapsed()
+ }
+
+ async fn measure_list(
+ store: &ThrottledStore<InMemory>,
+ n_entries: usize,
+ ) -> Duration {
+ let prefix = place_test_objects(store, n_entries).await;
+
+ let t0 = Instant::now();
+ store
+ .list(Some(&prefix))
+ .await
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ t0.elapsed()
+ }
+
+ async fn measure_list_with_delimiter(
+ store: &ThrottledStore<InMemory>,
+ n_entries: usize,
+ ) -> Duration {
+ let prefix = place_test_objects(store, n_entries).await;
+
+ let t0 = Instant::now();
+ store.list_with_delimiter(Some(&prefix)).await.unwrap();
+
+ t0.elapsed()
+ }
+
+ async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
+ let bytes = Bytes::from(vec![1u8; n_bytes]);
+
+ let t0 = Instant::now();
+ store.put(&Path::from("foo"), bytes).await.unwrap();
+
+ t0.elapsed()
+ }
+}
diff --git a/object_store/src/token.rs b/object_store/src/token.rs
new file mode 100644
index 0000000..a56a294
--- /dev/null
+++ b/object_store/src/token.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::future::Future;
+use std::time::Instant;
+use tokio::sync::Mutex;
+
+/// A temporary authentication token with an associated expiry
+#[derive(Debug, Clone)]
+pub struct TemporaryToken<T> {
+ /// The temporary credential
+ pub token: T,
+ /// The instant at which this credential is no longer valid
+ pub expiry: Instant,
+}
+
+/// Provides [`TokenCache::get_or_insert_with`] which can be used to cache a
+/// [`TemporaryToken`] based on its expiry
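+///
+/// # Example
+///
+/// A sketch of intended usage (illustrative only: the `String` token,
+/// `std::io::Error` error type, and one-hour expiry are arbitrary choices,
+/// not part of this API):
+///
+/// ```ignore
+/// use std::time::{Duration, Instant};
+///
+/// let cache: TokenCache<String> = TokenCache::default();
+/// let token = cache
+///     .get_or_insert_with(|| async {
+///         // fetch a fresh credential; it is cached until ~5 minutes before expiry
+///         Ok::<_, std::io::Error>(TemporaryToken {
+///             token: "secret".to_string(),
+///             expiry: Instant::now() + Duration::from_secs(3600),
+///         })
+///     })
+///     .await
+///     .unwrap();
+/// assert_eq!(token, "secret");
+/// ```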
+#[derive(Debug, Default)]
+pub struct TokenCache<T> {
+ cache: Mutex<Option<TemporaryToken<T>>>,
+}
+
+impl<T: Clone + Send> TokenCache<T> {
+ pub async fn get_or_insert_with<F, Fut, E>(&self, f: F) -> Result<T, E>
+ where
+ F: FnOnce() -> Fut + Send,
+ Fut: Future<Output = Result<TemporaryToken<T>, E>> + Send,
+ {
+ let now = Instant::now();
+ let mut locked = self.cache.lock().await;
+
+ if let Some(cached) = locked.as_ref() {
+ let delta = cached
+ .expiry
+ .checked_duration_since(now)
+ .unwrap_or_default();
+
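+ // reuse the cached token only while more than 5 minutes of validity remain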
+ if delta.as_secs() > 300 {
+ return Ok(cached.token.clone());
+ }
+ }
+
+ let cached = f().await?;
+ let token = cached.token.clone();
+ *locked = Some(cached);
+
+ Ok(token)
+ }
+}
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
new file mode 100644
index 0000000..4f3ed86
--- /dev/null
+++ b/object_store/src/util.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common logic for interacting with remote object stores
+use super::Result;
+use bytes::Bytes;
+use futures::{stream::StreamExt, Stream};
+
+/// Formats a prefix for an object store request, appending the trailing
+/// path delimiter; an empty prefix yields `None`
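+///
+/// E.g. a sketch (assuming the crate's path delimiter is `/`):
+///
+/// ```ignore
+/// let prefix = format_prefix(Some(&Path::from("foo/bar")));
+/// assert_eq!(prefix.as_deref(), Some("foo/bar/"));
+/// ```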
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
+ prefix
+ .filter(|x| !x.as_ref().is_empty())
+ .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER))
+}
+
+/// Returns a formatted HTTP range header as per
+/// <https://httpwg.org/specs/rfc7233.html#header.range>
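+///
+/// The exclusive end of the `Range` becomes the inclusive end the header
+/// expects, e.g.:
+///
+/// ```ignore
+/// assert_eq!(format_http_range(0..1024), "bytes=0-1023");
+/// ```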
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub fn format_http_range(range: std::ops::Range<usize>) -> String {
+ format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
+}
+
+/// Collect a stream into [`Bytes`], avoiding a copy when it yields a single chunk
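+///
+/// A usage sketch (the chunk contents are arbitrary):
+///
+/// ```ignore
+/// use futures::stream;
+///
+/// let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
+/// let bytes = collect_bytes(stream::iter(chunks), None).await.unwrap();
+/// assert_eq!(bytes, Bytes::from("hello world"));
+/// ```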
+pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
+where
+ S: Stream<Item = Result<Bytes>> + Send + Unpin,
+{
+ let first = stream.next().await.transpose()?.unwrap_or_default();
+
+ // Avoid copying if the stream yielded only a single chunk
+ match stream.next().await.transpose()? {
+ None => Ok(first),
+ Some(second) => {
+ let size_hint = size_hint.unwrap_or_else(|| first.len() + second.len());
+
+ let mut buf = Vec::with_capacity(size_hint);
+ buf.extend_from_slice(&first);
+ buf.extend_from_slice(&second);
+ while let Some(maybe_bytes) = stream.next().await {
+ buf.extend_from_slice(&maybe_bytes?);
+ }
+
+ Ok(buf.into())
+ }
+ }
+}
+
+/// Runs the provided function on tokio's blocking thread pool when inside a
+/// runtime, otherwise invokes it inline
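+///
+/// A usage sketch (`expensive_io` stands in for any blocking call):
+///
+/// ```ignore
+/// let data = maybe_spawn_blocking(|| {
+///     // runs on tokio's blocking pool inside a runtime, inline otherwise
+///     Ok(expensive_io())
+/// })
+/// .await
+/// .unwrap();
+/// ```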
+pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
+where
+ F: FnOnce() -> Result<T> + Send + 'static,
+ T: Send + 'static,
+{
+ match tokio::runtime::Handle::try_current() {
+ Ok(runtime) => runtime.spawn_blocking(f).await?,
+ Err(_) => f(),
+ }
+}