Donate `object_store` code from object_store_rs to arrow-rs (#2081)

* Import https://github.com/influxdata/object_store_rs/commit/3c51870ac41a90942c2e45bb499a893d514ed1da

* Add object_store to workspace, update notes and readme

* Remove old github items

* Remove old gitignore

* Remove kodiak config

* Remove redundant license files

* Remove influx specific security policy

* Remove redundant rust-toolchain and rustfmt

* Add Apache License (RAT)

* ignore bubble_up_io_errors test

* Fix list_store with explicit lifetime, only run `test_list_root` on linux

* Only run object_store throttle tests on a mac
diff --git a/Cargo.toml b/Cargo.toml
index 2837f02..9bf55c0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@
         "parquet_derive_test",
         "arrow-flight",
         "integration-testing",
+        "object_store",
 ]
 # Enable the version 2 feature resolver, which avoids unifying features for targets that are not being built
 #
diff --git a/object_store/.circleci/config.yml b/object_store/.circleci/config.yml
new file mode 100644
index 0000000..b4dff6d
--- /dev/null
+++ b/object_store/.circleci/config.yml
@@ -0,0 +1,262 @@
+---
+# CI Overview
+# -----------
+#
+# Each night:
+#
+#   A build image is created (ci_image) from `docker/Dockerfile.ci` and is
+#   pushed to `quay.io/influxdb/rust:ci`. This build image is then used to run
+#   the CI tasks for the day.
+#
+# Every commit:
+#
+# The CI for every PR and merge to main runs tests, fmt, lints and compiles debug binaries
+#
+# On main if all these checks pass it will then additionally compile in "release" mode and
+# publish a docker image to quay.io/influxdb/iox:$COMMIT_SHA
+#
+# Manual CI Image:
+#
+# It is possible to manually trigger a rebuild of the image used in CI. To do this, navigate to
+# https://app.circleci.com/pipelines/github/influxdata/influxdb_iox?branch=main (overriding the
+# branch name if desired). Then:
+# - Click "Run Pipeline" in the top-right
+# - Expand "Add Parameters"
+# - Add a "boolean" parameter called "ci_image" with the value true
+# - Click "Run Pipeline"
+#
+# If you refresh the page you should see a newly running ci_image workflow
+#
+
+version: 2.1
+
+orbs:
+  win: circleci/windows@4.1
+
+commands:
+  rust_components:
+    description: Verify installed components
+    steps:
+      - run:
+          name: Verify installed components
+          command: |
+            rustup --version
+            rustup show
+            cargo fmt --version
+            cargo clippy --version
+
+  cache_restore:
+    description: Restore Cargo Cache
+    steps:
+      - restore_cache:
+          name: Restoring Cargo Cache
+          keys:
+            - cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
+            - cargo-cache-{{ arch }}-{{ .Branch }}
+            - cargo-cache
+  cache_save:
+    description: Save Cargo Cache
+    steps:
+      - save_cache:
+          name: Save Cargo Cache
+          paths:
+            - /usr/local/cargo/registry
+          key: cargo-cache-{{ arch }}-{{ .Branch }}-{{ checksum "Cargo.lock" }}
+
+jobs:
+  fmt:
+    docker:
+      - image: quay.io/influxdb/rust:ci
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Rust fmt
+          command: cargo fmt --all -- --check
+      - cache_save
+  lint:
+    docker:
+      - image: quay.io/influxdb/rust:ci
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Clippy
+          command: cargo clippy --all-targets --all-features --workspace -- -D warnings
+      - cache_save
+  cargo_audit:
+    docker:
+      - image: quay.io/influxdb/rust:ci
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Install cargo-deny
+          command: cargo install --force cargo-deny
+      - run:
+          name: cargo-deny Checks
+          command: cargo deny check -s
+      - cache_save
+  check:
+    docker:
+      - image: quay.io/influxdb/rust:ci
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Install cargo-hack
+          command: cargo install cargo-hack
+      - run:
+          name: Check all features
+          command: cargo hack check --feature-powerset --no-dev-deps --workspace
+      - cache_save
+  doc:
+    docker:
+      - image: quay.io/influxdb/rust:ci
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Cargo doc
+          # Exclude datafusion because it is effectively a dependency masquerading as a workspace crate.
+          command: cargo doc --document-private-items --no-deps --workspace --exclude datafusion
+      - cache_save
+      - run:
+          name: Compress Docs
+          command: tar -cvzf rustdoc.tar.gz target/doc/
+      - store_artifacts:
+          path: rustdoc.tar.gz
+  test:
+    # setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
+    docker:
+      - image: quay.io/influxdb/rust:ci
+      - image: localstack/localstack:0.14.4
+      - image: mcr.microsoft.com/azure-storage/azurite
+      - image: fsouza/fake-gcs-server
+        command:
+          - "-scheme"
+          - "http"
+    resource_class: 2xlarge # smaller executors tend to crash during linking
+    environment:
+      # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
+      CARGO_INCREMENTAL: "0"
+      # Disable full debug symbol generation to speed up CI build
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+      RUST_BACKTRACE: "1"
+      # Run integration tests
+      TEST_INTEGRATION: 1
+      AWS_DEFAULT_REGION: "us-east-1"
+      AWS_ACCESS_KEY_ID: test
+      AWS_SECRET_ACCESS_KEY: test
+      AWS_ENDPOINT: http://127.0.0.1:4566
+      AZURE_USE_EMULATOR: "1"
+      GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
+      OBJECT_STORE_BUCKET: test-bucket
+    steps:
+      - run:
+          name: Setup localstack (AWS emulation)
+          command: |
+            cd /tmp
+            curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
+            unzip awscliv2.zip
+            sudo ./aws/install
+            aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+      - run:
+          name: Setup Azurite (Azure emulation)
+          # the magical connection string is from https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
+          command: |
+            curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
+            az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
+      - run:
+          name: Setup fake GCS server
+          command: |
+            curl -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
+            echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
+      - checkout
+      - rust_components
+      - cache_restore
+      - run:
+          name: Cargo test
+          command: cargo test --workspace --features=aws,azure,azure_test,gcp
+      - cache_save
+
+  test_windows:
+    executor:
+      name: win/default
+      size: medium
+    environment:
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+    steps:
+      - checkout
+      - run:
+          name: Download rustup
+          command: wget https://win.rustup.rs/x86_64 -O rustup-init.exe
+      - run:
+          name: Install rustup
+          command: .\rustup-init.exe -y --default-host=x86_64-pc-windows-msvc
+      - run:
+          name: Cargo test
+          command: cargo test --workspace
+
+workflows:
+  version: 2
+
+  # CI for all pull requests.
+  ci:
+    jobs:
+      - check
+      - fmt
+      - lint
+      - cargo_audit
+      - test
+      - test_windows
+      - doc
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
new file mode 100644
index 0000000..2e216dd
--- /dev/null
+++ b/object_store/CONTRIBUTING.md
@@ -0,0 +1,94 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Development instructions
+
+## Running Tests
+
+Tests can be run using `cargo`
+
+```shell
+cargo test
+```
+
+## Running Integration Tests
+
+By default, integration tests are not run. To run them, set `TEST_INTEGRATION=1` and then provide the
+necessary configuration for the object store you want to test, as described below.
+
+### AWS
+
+To test the S3 integration against [localstack](https://localstack.cloud/)
+
+First start up a container running localstack
+
+```
+$ podman run --rm -it -p 4566:4566 -p 4510-4559:4510-4559 localstack/localstack
+```
+
+Set up the environment
+
+```
+export TEST_INTEGRATION=1
+export AWS_DEFAULT_REGION=us-east-1
+export AWS_ACCESS_KEY_ID=test
+export AWS_SECRET_ACCESS_KEY=test
+export AWS_ENDPOINT=http://127.0.0.1:4566
+export OBJECT_STORE_BUCKET=test-bucket
+```
+
+Create a bucket using the AWS CLI
+
+```
+podman run --net=host --env-host amazon/aws-cli --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
+```
+
+Run tests
+
+```
+$ cargo test --features aws
+```
+
+### Azure
+
+To test the Azure integration
+against [azurite](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio)
+
+Start up azurite
+
+```
+$ podman run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite
+```
+
+Create a bucket
+
+```
+$ podman run --net=host mcr.microsoft.com/azure-cli az storage container create -n test-bucket --connection-string 'DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;'
+```
+
+Run tests
+
+```
+$ cargo test --features azure
+```
+
+### GCP
+
+We don't have a good story yet for testing the GCP integration locally. You will need to create a GCS bucket, a
+service account that has access to it, and use this to run the tests.
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
new file mode 100644
index 0000000..613b6ab
--- /dev/null
+++ b/object_store/Cargo.toml
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "object_store"
+version = "0.3.0"
+edition = "2021"
+license = "MIT/Apache-2.0"
+readme = "README.md"
+description = "A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage and Azure Blob Storage"
+keywords = [
+    "object",
+    "storage",
+    "cloud",
+]
+repository = "https://github.com/apache/arrow-rs"
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies] # In alphabetical order
+async-trait = "0.1.53"
+# Microsoft Azure Blob storage integration
+azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] }
+azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] }
+bytes = "1.0"
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+# Google Cloud Storage integration (serde, serde_json, rustls-pemfile and base64; see the `gcp` feature)
+futures = "0.3"
+serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
+serde_json = { version = "1.0", default-features = false, optional = true }
+rustls-pemfile = { version = "1.0", default-features = false, optional = true }
+ring = { version = "0.16", default-features = false, features = ["std"] }
+base64 = { version = "0.13", default-features = false, optional = true }
+# for rusoto
+hyper = { version = "0.14", optional = true, default-features = false }
+# for rusoto
+hyper-rustls = { version = "0.23.0", optional = true, default-features = false, features = ["webpki-tokio", "http1", "http2", "tls12"] }
+itertools = "0.10.1"
+percent-encoding = "2.1"
+# rusoto crates are for Amazon S3 integration
+rusoto_core = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
+rusoto_credential = { version = "0.48.0", optional = true, default-features = false }
+rusoto_s3 = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"] }
+rusoto_sts = { version = "0.48.0", optional = true, default-features = false, features = ["rustls"]  }
+snafu = "0.7"
+tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time"] }
+tracing = { version = "0.1" }
+reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls"] }
+parking_lot = { version = "0.12" }
+# Filesystem integration
+url = "2.2"
+walkdir = "2"
+
+[features]
+azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"]
+azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"]
+gcp = ["serde", "serde_json", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64"]
+aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"]
+
+[dev-dependencies] # In alphabetical order
+dotenv = "0.15.0"
+tempfile = "3.1.0"
+futures-test = "0.3"
diff --git a/object_store/README.md b/object_store/README.md
new file mode 100644
index 0000000..313588b
--- /dev/null
+++ b/object_store/README.md
@@ -0,0 +1,26 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Rust Object Store
+
+A crate providing a generic interface to object stores, such as S3, Azure Blob Storage and Google Cloud Storage.
+
+Originally developed for [InfluxDB IOx](https://github.com/influxdata/influxdb_iox/) and later split out and donated to Apache Arrow.
+
+See [docs.rs](https://docs.rs/object_store) for usage instructions.
diff --git a/object_store/deny.toml b/object_store/deny.toml
new file mode 100644
index 0000000..bfd060a
--- /dev/null
+++ b/object_store/deny.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Configuration documentation:
+#   https://embarkstudios.github.io/cargo-deny/index.html
+
+[advisories]
+vulnerability = "deny"
+yanked = "deny"
+unmaintained = "warn"
+notice = "warn"
+ignore = [
+]
+git-fetch-with-cli = true
+
+[licenses]
+default = "allow"
+unlicensed = "allow"
+copyleft = "allow"
+
+[bans]
+multiple-versions = "warn"
+deny = [
+    # We are using rustls as the TLS implementation, so we shouldn't be linking
+    # in OpenSSL too.
+    #
+    # If you're hitting this, you might want to take a look at what new
+    # dependencies you have introduced and check if there's a way to depend on
+    # rustls instead of OpenSSL (tip: check the crate's feature flags).
+    { name = "openssl-sys" }
+]
diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
new file mode 100644
index 0000000..7ebcc2a
--- /dev/null
+++ b/object_store/src/aws.rs
@@ -0,0 +1,1041 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for S3
+use crate::util::format_http_range;
+use crate::{
+    collect_bytes,
+    path::{Path, DELIMITER},
+    util::format_prefix,
+    GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{
+    stream::{self, BoxStream},
+    Future, Stream, StreamExt, TryStreamExt,
+};
+use hyper::client::Builder as HyperBuilder;
+use rusoto_core::ByteStream;
+use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
+use rusoto_s3::S3;
+use rusoto_sts::WebIdentityProvider;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::ops::Range;
+use std::{
+    convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration,
+};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+use tracing::{debug, warn};
+
+/// Upper bound on how many times a request is retried after an AWS server error
+pub const MAX_NUM_RETRIES: u32 = 3;
+
+/// Errors raised by the S3 store; turned into the crate-level `super::Error` by the `From` impl below
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+    #[snafu(display(
+        "Expected streamed data to have length {}, got {}",
+        expected,
+        actual
+    ))]
+    DataDoesNotMatchLength { expected: usize, actual: usize },
+
+    #[snafu(display(
+        "Did not receive any data. Bucket: {}, Location: {}",
+        bucket,
+        path
+    ))]
+    NoData { bucket: String, path: String },
+
+    #[snafu(display(
+        "Unable to DELETE data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        bucket,
+        path,
+        source,
+        source,
+    ))]
+    UnableToDeleteData {
+        source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
+        bucket: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to GET data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        bucket,
+        path,
+        source,
+        source,
+    ))]
+    UnableToGetData {
+        source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
+        bucket: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to HEAD data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        bucket,
+        path,
+        source,
+        source,
+    ))]
+    UnableToHeadData {
+        source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>,
+        bucket: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to GET part of the data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        bucket,
+        path,
+        source,
+        source,
+    ))]
+    UnableToGetPieceOfData {
+        source: std::io::Error,
+        bucket: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})",
+        bucket,
+        path,
+        source,
+        source,
+    ))]
+    UnableToPutData {
+        source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
+        bucket: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to list data. Bucket: {}, Error: {} ({:?})",
+        bucket,
+        source,
+        source,
+    ))]
+    UnableToListData {
+        source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
+        bucket: String,
+    },
+
+    #[snafu(display(
+        "Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}",
+        bucket,
+        from,
+        to,
+        source,
+    ))]
+    UnableToCopyObject {
+        source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>,
+        bucket: String,
+        from: String,
+        to: String,
+    },
+
+    #[snafu(display(
+        "Unable to parse last modified date. Bucket: {}, Error: {} ({:?})",
+        bucket,
+        source,
+        source,
+    ))]
+    UnableToParseLastModified {
+        source: chrono::ParseError,
+        bucket: String,
+    },
+
+    #[snafu(display(
+        "Unable to buffer data into temporary file, Error: {} ({:?})",
+        source,
+        source,
+    ))]
+    UnableToBufferStream { source: std::io::Error },
+
+    #[snafu(display(
+        "Could not parse `{}` as an AWS region. Regions should look like `us-east-2`. {} ({:?})",
+        region,
+        source,
+        source,
+    ))]
+    InvalidRegion {
+        region: String,
+        source: rusoto_core::region::ParseRegionError,
+    },
+
+    #[snafu(display("Missing aws-access-key"))]
+    MissingAccessKey,
+
+    #[snafu(display("Missing aws-secret-access-key"))]
+    MissingSecretAccessKey,
+
+    NotFound { // no `display` attribute; maps onto the crate-level `NotFound` variant
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        if let Error::NotFound { path, source } = err {
+            return Self::NotFound { path, source }; // keep "not found" distinguishable
+        }
+        Self::Generic {
+            store: "S3",
+            source: Box::new(err), // every other failure is boxed as a generic S3 error
+        }
+    }
+}
+
+/// An [`ObjectStore`] implementation backed by [Amazon S3](https://aws.amazon.com/s3/).
+pub struct AmazonS3 {
+    /// S3 client without any connection limit.
+    ///
+    /// You should normally use [`Self::client`] instead.
+    client_unrestricted: rusoto_s3::S3Client,
+
+    /// Semaphore that limits the usage of [`client_unrestricted`](Self::client_unrestricted).
+    connection_semaphore: Arc<Semaphore>,
+
+    /// Name of the bucket that the requests issued by this store are addressed to.
+    bucket_name: String,
+}
+
+impl fmt::Debug for AmazonS3 {
+    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut repr = formatter.debug_struct("AmazonS3");
+        repr.field("client", &"rusoto_s3::S3Client"); // fixed placeholder, the client is not printed
+        repr.field("bucket_name", &self.bucket_name);
+        repr.finish()
+    }
+}
+
+impl fmt::Display for AmazonS3 {
+    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
+        out.write_fmt(format_args!("AmazonS3({})", self.bucket_name)) // e.g. `AmazonS3(my-bucket)`
+    }
+}
+
+#[async_trait]
+impl ObjectStore for AmazonS3 {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        // Build requests through a factory so each call made by `s3_request` gets a fresh body stream.
+        let bucket = self.bucket_name.clone();
+        let build_request = move || {
+            let payload = bytes.clone();
+            let payload_len = payload.len();
+            let body = ByteStream::new_with_size(
+                futures::stream::once(async move { Ok(payload) }),
+                payload_len,
+            );
+
+            rusoto_s3::PutObjectRequest {
+                bucket: bucket.clone(),
+                key: location.to_string(),
+                body: Some(body),
+                ..Default::default()
+            }
+        };
+
+        let client = self.client().await;
+        let outcome = s3_request(move || {
+            let (client, build_request) = (client.clone(), build_request.clone());
+
+            async move { client.put_object(build_request()).await }
+        })
+        .await;
+
+        outcome.context(UnableToPutDataSnafu {
+            bucket: &self.bucket_name,
+            path: location.as_ref(),
+        })?;
+        Ok(())
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        // No byte range is requested here; the resulting stream is handed back boxed.
+        let body = self.get_object(location, None).await?;
+        Ok(GetResult::Stream(body.boxed()))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        // Saturating: an inverted range must not underflow (panic in debug, wrap in release).
+        let size_hint = range.end.saturating_sub(range.start);
+        collect_bytes(self.get_object(location, Some(range)).await?, Some(size_hint)).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let key = location.to_string();
+        let head_request = rusoto_s3::HeadObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: key.clone(),
+            ..Default::default()
+        };
+        let s = self
+            .client()
+            .await
+            .head_object(head_request)
+            .await
+            .map_err(|e| match e { // sort failures into "not found" vs. everything else
+                rusoto_core::RusotoError::Service(
+                    rusoto_s3::HeadObjectError::NoSuchKey(_),
+                ) => Error::NotFound {
+                    path: key.clone(),
+                    source: e.into(),
+                },
+                rusoto_core::RusotoError::Unknown(h) if h.status.as_u16() == 404 => {
+                    Error::NotFound { // a raw 404 status also counts as not found
+                        path: key.clone(),
+                        source: "resource not found".into(),
+                    }
+                }
+                _ => Error::UnableToHeadData {
+                    bucket: self.bucket_name.to_owned(),
+                    path: key.clone(),
+                    source: e,
+                },
+            })?;
+
+        // Note: GetObject and HeadObject return a different date format from ListObjects
+        //
+        // S3 List returns timestamps in the form
+        //     <LastModified>2013-09-17T18:07:53.000Z</LastModified>
+        // S3 GetObject returns timestamps in the form
+        //            Last-Modified: Sun, 1 Jan 2006 12:00:00 GMT
+        let last_modified = match s.last_modified {
+            Some(lm) => DateTime::parse_from_rfc2822(&lm)
+                .context(UnableToParseLastModifiedSnafu {
+                    bucket: &self.bucket_name,
+                })?
+                .with_timezone(&Utc),
+            None => Utc::now(), // no timestamp in the response: fall back to the current time
+        };
+
+        Ok(ObjectMeta {
+            last_modified,
+            location: location.clone(),
+            size: usize::try_from(s.content_length.unwrap_or(0)) // a missing length is reported as 0
+                .expect("unsupported size on this platform"), // panics if the value does not fit in usize
+        })
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        // A factory closure lets every invocation from `s3_request` build its own request value.
+        let bucket = self.bucket_name.clone();
+        let build_request = move || rusoto_s3::DeleteObjectRequest {
+            bucket: bucket.clone(),
+            key: location.to_string(),
+            ..Default::default()
+        };
+
+        let client = self.client().await;
+        let outcome = s3_request(move || {
+            let (client, build_request) = (client.clone(), build_request.clone());
+
+            async move { client.delete_object(build_request()).await }
+        })
+        .await;
+
+        outcome.context(UnableToDeleteDataSnafu {
+            bucket: &self.bucket_name,
+            path: location.as_ref(),
+        })?;
+
+        Ok(())
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        // Every listing page becomes an inner stream of its converted objects;
+        // `try_flatten` then joins those into a single stream.
+        let pages = self.list_objects_v2(prefix, None).await?;
+        let objects = pages
+            .map_ok(move |page| {
+                let entries = page.contents.unwrap_or_default().into_iter();
+                futures::stream::iter(
+                    entries.map(|entry| convert_object_meta(entry, &self.bucket_name)),
+                )
+            })
+            .try_flatten();
+
+        Ok(objects.boxed())
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        // Accumulator that every listing page is merged into.
+        let empty = ListResult {
+            common_prefixes: vec![],
+            objects: vec![],
+        };
+        let pages = self
+            .list_objects_v2(prefix, Some(DELIMITER.to_string()))
+            .await?;
+
+        let listing = pages
+            .try_fold(empty, |mut listing, page| async move {
+                // Convert the objects of this page; the first failure aborts the fold.
+                let converted = page
+                    .contents
+                    .unwrap_or_default()
+                    .into_iter()
+                    .map(|object| convert_object_meta(object, &self.bucket_name))
+                    .collect::<Result<Vec<_>>>()?;
+                listing.objects.extend(converted);
+
+                let prefixes = page.common_prefixes.unwrap_or_default();
+                listing.common_prefixes.reserve(prefixes.len());
+                for p in prefixes {
+                    let raw = p.prefix.expect("can't have a prefix without a value");
+                    listing.common_prefixes.push(Path::parse(raw)?);
+                }
+
+                Ok(listing)
+            })
+            .await?;
+
+        Ok(listing)
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        // String views of both paths, shared by the request and the error context.
+        let src = from.as_ref();
+        let dst = to.as_ref();
+        let bucket = self.bucket_name.clone();
+
+        let make_request = move || rusoto_s3::CopyObjectRequest {
+            bucket: bucket.clone(),
+            copy_source: format!("{}/{}", &bucket, src),
+            key: dst.to_string(),
+            ..Default::default()
+        };
+
+        let client = self.client().await;
+        let attempt = move || {
+            let (client, make_request) = (client.clone(), make_request.clone());
+            async move { client.copy_object(make_request()).await }
+        };
+
+        s3_request(attempt)
+            .await
+            .context(UnableToCopyObjectSnafu {
+                bucket: &self.bucket_name,
+                from: src,
+                to: dst,
+            })?;
+        Ok(())
+    }
+
+    async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> {
+        // Not implemented for S3: always fails. The original note says it will need `dynamodb_lock`.
+        Err(crate::Error::NotImplemented)
+    }
+}
+
+/// Converts one entry of an S3 listing into an [`ObjectMeta`].
+fn convert_object_meta(object: rusoto_s3::Object, bucket: &str) -> Result<ObjectMeta> {
+    let raw_key = object.key.expect("object doesn't exist without a key");
+    let location = Path::parse(raw_key)?;
+    // Listing timestamps are parsed as RFC 3339; a missing one becomes "now".
+    let last_modified = match object.last_modified {
+        None => Utc::now(),
+        Some(stamp) => DateTime::parse_from_rfc3339(&stamp)
+            .context(UnableToParseLastModifiedSnafu { bucket })?
+            .with_timezone(&Utc),
+    };
+    Ok(ObjectMeta {
+        location,
+        last_modified,
+        size: usize::try_from(object.size.unwrap_or(0))
+            .expect("unsupported size on this platform"),
+    })
+}
+
+/// Configure a connection to Amazon S3 using the specified credentials in
+/// the specified Amazon region and bucket.
+///
+/// With no static keys, credentials come from a web identity token when
+/// `AWS_WEB_IDENTITY_TOKEN_FILE` is set, otherwise from instance metadata.
+#[allow(clippy::too_many_arguments)]
+pub fn new_s3(
+    access_key_id: Option<impl Into<String>>,
+    secret_access_key: Option<impl Into<String>>,
+    region: impl Into<String>,
+    bucket_name: impl Into<String>,
+    endpoint: Option<impl Into<String>>,
+    session_token: Option<impl Into<String>>,
+    max_connections: NonZeroUsize,
+    allow_http: bool,
+) -> Result<AmazonS3> {
+    let region_name = region.into();
+    let region: rusoto_core::Region = match endpoint {
+        // A custom endpoint uses the region name as given, without parsing it.
+        Some(endpoint) => rusoto_core::Region::Custom {
+            name: region_name,
+            endpoint: endpoint.into(),
+        },
+        None => region_name
+            .parse()
+            .context(InvalidRegionSnafu { region: region_name })?,
+    };
+
+    let mut pool = HyperBuilder::default();
+    pool.pool_max_idle_per_host(max_connections.get());
+
+    let connector = if allow_http {
+        hyper_rustls::HttpsConnectorBuilder::new()
+            .with_webpki_roots()
+            .https_or_http()
+            .enable_http1()
+            .enable_http2()
+            .build()
+    } else {
+        hyper_rustls::HttpsConnectorBuilder::new()
+            .with_webpki_roots()
+            .https_only()
+            .enable_http1()
+            .enable_http2()
+            .build()
+    };
+    let http_client = rusoto_core::request::HttpClient::from_builder(pool, connector);
+
+    // Giving only one half of the key pair is an error; with neither half the
+    // provider is chosen from the environment.
+    let client = match (access_key_id, secret_access_key, session_token) {
+        (Some(key_id), Some(secret), Some(token)) => {
+            let provider = StaticProvider::new(
+                key_id.into(),
+                secret.into(),
+                Some(token.into()),
+                None,
+            );
+            rusoto_s3::S3Client::new_with(http_client, provider, region)
+        }
+        (Some(key_id), Some(secret), None) => {
+            let provider = StaticProvider::new_minimal(key_id.into(), secret.into());
+            rusoto_s3::S3Client::new_with(http_client, provider, region)
+        }
+        (None, Some(_), _) => return Err(Error::MissingAccessKey.into()),
+        (Some(_), None, _) => return Err(Error::MissingSecretAccessKey.into()),
+        _ if std::env::var_os("AWS_WEB_IDENTITY_TOKEN_FILE").is_some() => {
+            let provider = WebIdentityProvider::from_k8s_env();
+            rusoto_s3::S3Client::new_with(http_client, provider, region)
+        }
+        _ => {
+            let provider = InstanceMetadataProvider::new();
+            rusoto_s3::S3Client::new_with(http_client, provider, region)
+        }
+    };
+
+    Ok(AmazonS3 {
+        client_unrestricted: client,
+        connection_semaphore: Arc::new(Semaphore::new(max_connections.get())),
+        bucket_name: bucket_name.into(),
+    })
+}
+
+/// Create a new [`AmazonS3`] that always errors (built from fixed placeholder values)
+pub fn new_failing_s3() -> Result<AmazonS3> {
+    new_s3(
+        Some("foo"), // access_key_id
+        Some("bar"), // secret_access_key
+        "us-east-1", // region
+        "bucket", // bucket_name
+        None as Option<&str>, // endpoint
+        None as Option<&str>, // session_token
+        NonZeroUsize::new(16).unwrap(), // max_connections
+        true, // allow_http
+    )
+}
+
+/// S3 client bundled with a semaphore permit.
+#[derive(Clone)]
+struct SemaphoreClient {
+    /// Permit for this specific use of the client, shared by all clones of the handle.
+    ///
+    /// Note that this field is never read and therefore considered "dead code" by rustc.
+    #[allow(dead_code)]
+    permit: Arc<OwnedSemaphorePermit>,
+    /// The wrapped client; reachable through `Deref`.
+    inner: rusoto_s3::S3Client,
+}
+
+impl Deref for SemaphoreClient {
+    type Target = rusoto_s3::S3Client;
+    // Hands out the wrapped client so its methods can be called on the wrapper directly.
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl AmazonS3 {
+    /// Get a client according to the current connection limit.
+    ///
+    /// Waits for a permit from `connection_semaphore`; the permit is stored in
+    /// the returned handle and shared by its clones.
+    async fn client(&self) -> SemaphoreClient {
+        let permit = Arc::clone(&self.connection_semaphore)
+            .acquire_owned()
+            .await
+            .expect("semaphore shouldn't be closed yet");
+        SemaphoreClient {
+            permit: Arc::new(permit),
+            inner: self.client_unrestricted.clone(),
+        }
+    }
+
+    /// Fetches `location`, or only `range` of it, as a stream of byte chunks.
+    ///
+    /// A missing key is reported as `Error::NotFound`. Unlike `delete`, `copy`
+    /// and the listing calls, this request does not go through `s3_request`.
+    async fn get_object(
+        &self,
+        location: &Path,
+        range: Option<Range<usize>>,
+    ) -> Result<impl Stream<Item = Result<Bytes>>> {
+        let key = location.to_string();
+        let get_request = rusoto_s3::GetObjectRequest {
+            bucket: self.bucket_name.clone(),
+            key: key.clone(),
+            range: range.map(format_http_range),
+            ..Default::default()
+        };
+        let bucket_name = self.bucket_name.clone();
+        let stream = self
+            .client()
+            .await
+            .get_object(get_request)
+            .await
+            .map_err(|e| match e {
+                rusoto_core::RusotoError::Service(
+                    rusoto_s3::GetObjectError::NoSuchKey(_),
+                ) => Error::NotFound {
+                    path: key.clone(),
+                    source: e.into(),
+                },
+                _ => Error::UnableToGetData {
+                    bucket: self.bucket_name.to_owned(),
+                    path: key.clone(),
+                    source: e,
+                },
+            })?
+            .body
+            .context(NoDataSnafu {
+                bucket: self.bucket_name.to_owned(),
+                path: key.clone(),
+            })?
+            .map_err(move |source| Error::UnableToGetPieceOfData {
+                source,
+                bucket: bucket_name.clone(),
+                path: key.clone(),
+            })
+            .err_into();
+
+        Ok(stream)
+    }
+
+    /// Streams the `ListObjectsV2` responses (pages) of the bucket under `prefix`.
+    ///
+    /// Pages are requested lazily, each through `s3_request`. The stream ends
+    /// after the last page, and also after the first error it yields.
+    async fn list_objects_v2(
+        &self,
+        prefix: Option<&Path>,
+        delimiter: Option<String>,
+    ) -> Result<BoxStream<'_, Result<rusoto_s3::ListObjectsV2Output>>> {
+        enum ListState {
+            Start,
+            HasMore(String),
+            Done,
+        }
+
+        let prefix = format_prefix(prefix);
+        let bucket = self.bucket_name.clone();
+        let request_factory = move || rusoto_s3::ListObjectsV2Request {
+            bucket,
+            prefix,
+            delimiter,
+            ..Default::default()
+        };
+        let s3 = self.client().await;
+
+        Ok(stream::unfold(ListState::Start, move |state| {
+            let request_factory = request_factory.clone();
+            let s3 = s3.clone();
+
+            async move {
+                let continuation_token = match &state {
+                    ListState::HasMore(continuation_token) => Some(continuation_token),
+                    ListState::Done => return None,
+                    // If this is the first request we've made, we don't need to make any
+                    // modifications to the request
+                    ListState::Start => None,
+                };
+
+                let resp = s3_request(move || {
+                    let (s3, request_factory) = (s3.clone(), request_factory.clone());
+                    let continuation_token = continuation_token.cloned();
+                    async move {
+                        s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
+                            continuation_token,
+                            ..request_factory()
+                        })
+                        .await
+                    }
+                })
+                .await;
+
+                let resp = match resp {
+                    Ok(resp) => resp,
+                    // End the stream after an error; reusing `state` here would make
+                    // every later poll re-issue the same failing request.
+                    Err(e) => return Some((Err(e), ListState::Done)),
+                };
+
+                // The AWS response contains a field named `is_truncated` as well as
+                // `next_continuation_token`, and we're assuming that `next_continuation_token`
+                // is only set when `is_truncated` is true (and therefore not
+                // checking `is_truncated`).
+                let next_state = match &resp.next_continuation_token {
+                    Some(token) => ListState::HasMore(token.to_string()),
+                    None => ListState::Done,
+                };
+
+                Some((Ok(resp), next_state))
+            }
+        })
+        .map_err(move |e| {
+            let bucket = self.bucket_name.clone();
+            Error::UnableToListData { source: e, bucket }.into()
+        })
+        .boxed())
+    }
+}
+
+/// Runs a rusoto S3 request, retrying it while S3 answers with 5xx server errors.
+///
+/// `future_factory` (type `F`) takes no arguments and returns a new `Future`
+/// (type `G`) each time it is called; awaiting that future performs the request
+/// and resolves to `Ok(R)` or to a `rusoto_core::RusotoError<E>`.
+///
+/// * On success the value is returned immediately.
+/// * On a 5xx server error the function sleeps for a delay that doubles with
+///   every failed attempt, asks `future_factory` for a fresh future and tries
+///   again.
+/// * Once more than `MAX_NUM_RETRIES` attempts have failed, the last error is
+///   returned.
+/// * Any other error is returned without retrying.
+///
+/// Client errors (4xx) will never be retried by this function.
+async fn s3_request<E, F, G, R>(
+    future_factory: F,
+) -> Result<R, rusoto_core::RusotoError<E>>
+where
+    E: std::error::Error + Send,
+    F: Fn() -> G + Send,
+    G: Future<Output = Result<R, rusoto_core::RusotoError<E>>> + Send,
+    R: Send,
+{
+    let mut attempts = 0;
+
+    loop {
+        // Every attempt awaits a brand-new future obtained from the factory.
+        let outcome = future_factory().await;
+        let error = match outcome {
+            Ok(response) => return Ok(response),
+            Err(error) => error,
+        };
+        attempts += 1;
+
+        // Only a `RusotoError::Unknown` response carrying a 5xx status counts
+        // as retryable.
+        let is_server_error = matches!(
+            error,
+            rusoto_core::RusotoError::Unknown(ref response)
+                if response.status.is_server_error()
+        );
+
+        if attempts > MAX_NUM_RETRIES {
+            warn!(
+                ?error,
+                attempts, "maximum number of retries exceeded for AWS S3 request"
+            );
+            return Err(error);
+        }
+        if !is_server_error {
+            return Err(error);
+        }
+
+        debug!(?error, attempts, "retrying AWS S3 request");
+        // Backoff of 2^attempts * 50ms: 100ms, 200ms, 400ms, ...
+        tokio::time::sleep(Duration::from_millis(2u64.pow(attempts) * 50)).await;
+    }
+}
+
+impl Error {
+    /// Whether this error wraps a `RusotoError::Credentials` failure from a
+    /// put, get, delete or list request.
+    ///
+    /// Test-only helper used to decide whether to print a credentials hint.
+    #[cfg(test)]
+    fn s3_error_due_to_credentials(&self) -> bool {
+        use rusoto_core::RusotoError;
+        use Error::*;
+
+        match self {
+            UnableToPutData { source, .. } => {
+                matches!(source, RusotoError::Credentials(_))
+            }
+            UnableToGetData { source, .. } => {
+                matches!(source, RusotoError::Credentials(_))
+            }
+            UnableToDeleteData { source, .. } => {
+                matches!(source, RusotoError::Credentials(_))
+            }
+            UnableToListData { source, .. } => {
+                matches!(source, RusotoError::Credentials(_))
+            }
+            _ => false,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        tests::{
+            get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
+            put_get_delete_list, rename_and_copy,
+        },
+        Error as ObjectStoreError, ObjectStore,
+    };
+    use bytes::Bytes;
+    use std::env;
+
+    type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
+    type Result<T, E = TestError> = std::result::Result<T, E>;
+
+    const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+    #[derive(Debug)]
+    struct AwsConfig {
+        access_key_id: String,
+        secret_access_key: String,
+        region: String,
+        bucket: String,
+        endpoint: Option<String>,
+        token: Option<String>,
+    }
+
+    // Skips the calling test unless TEST_INTEGRATION is set; panics if it is set but required AWS variables are missing.
+    macro_rules! maybe_skip_integration {
+        () => {{
+            dotenv::dotenv().ok(); // any error from `dotenv` is discarded
+
+            let required_vars = [
+                "AWS_DEFAULT_REGION",
+                "OBJECT_STORE_BUCKET",
+                "AWS_ACCESS_KEY_ID",
+                "AWS_SECRET_ACCESS_KEY",
+            ];
+            let unset_vars: Vec<_> = required_vars
+                .iter()
+                .filter_map(|&name| match env::var(name) {
+                    Ok(_) => None,
+                    Err(_) => Some(name),
+                })
+                .collect();
+            let unset_var_names = unset_vars.join(", ");
+
+            let force = env::var("TEST_INTEGRATION");
+
+            if force.is_ok() && !unset_var_names.is_empty() {
+                panic!(
+                    "TEST_INTEGRATION is set, \
+                            but variable(s) {} need to be set",
+                    unset_var_names
+                );
+            } else if force.is_err() {
+                eprintln!(
+                    "skipping AWS integration test - set {}TEST_INTEGRATION to run",
+                    if unset_var_names.is_empty() {
+                        String::new()
+                    } else {
+                        format!("{} and ", unset_var_names)
+                    }
+                );
+                return; // leaves the test function that invoked the macro
+            } else {
+                AwsConfig {
+                    access_key_id: env::var("AWS_ACCESS_KEY_ID")
+                        .expect("already checked AWS_ACCESS_KEY_ID"),
+                    secret_access_key: env::var("AWS_SECRET_ACCESS_KEY")
+                        .expect("already checked AWS_SECRET_ACCESS_KEY"),
+                    region: env::var("AWS_DEFAULT_REGION")
+                        .expect("already checked AWS_DEFAULT_REGION"),
+                    bucket: env::var("OBJECT_STORE_BUCKET")
+                        .expect("already checked OBJECT_STORE_BUCKET"),
+                    endpoint: env::var("AWS_ENDPOINT").ok(),
+                    token: env::var("AWS_SESSION_TOKEN").ok(),
+                }
+            }
+        }};
+    }
+    /// Returns `r` unchanged, first printing a hint if it failed because of S3 credentials.
+    fn check_credentials<T>(r: Result<T>) -> Result<T> {
+        if let Err(e) = &r {
+            let e = &**e; // &Box<dyn Error> -> &dyn Error, so that it can be downcast
+            if let Some(e) = e.downcast_ref::<Error>() {
+                if e.s3_error_due_to_credentials() {
+                    eprintln!(
+                        "Try setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY \
+                               environment variables"
+                    );
+                }
+            }
+        }
+
+        r
+    }
+    /// Builds an `AmazonS3` from `config`, with 16 connections and plain HTTP allowed.
+    fn make_integration(config: AwsConfig) -> AmazonS3 {
+        new_s3(
+            Some(config.access_key_id),
+            Some(config.secret_access_key),
+            config.region,
+            config.bucket,
+            config.endpoint,
+            config.token,
+            NonZeroUsize::new(16).unwrap(),
+            true,
+        )
+        .expect("Valid S3 config")
+    }
+
+    #[tokio::test]
+    async fn s3_test() {
+        let config = maybe_skip_integration!();
+        let integration = make_integration(config);
+
+        check_credentials(put_get_delete_list(&integration).await).unwrap();
+        check_credentials(list_uses_directories_correctly(&integration).await).unwrap();
+        check_credentials(list_with_delimiter(&integration).await).unwrap();
+        check_credentials(rename_and_copy(&integration).await).unwrap();
+    }
+
+    #[tokio::test]
+    async fn s3_test_get_nonexistent_location() {
+        let config = maybe_skip_integration!();
+        let integration = make_integration(config);
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = get_nonexistent_object(&integration, Some(location))
+            .await
+            .unwrap_err();
+        if let ObjectStoreError::NotFound { path, source } = err {
+            let source_variant = source.downcast_ref::<rusoto_core::RusotoError<_>>();
+            assert!(
+                matches!(
+                    source_variant,
+                    Some(rusoto_core::RusotoError::Service(
+                        rusoto_s3::GetObjectError::NoSuchKey(_)
+                    )),
+                ),
+                "got: {:?}",
+                source_variant
+            );
+            assert_eq!(path, NON_EXISTENT_NAME);
+        } else {
+            panic!("unexpected error type: {:?}", err);
+        }
+    }
+
+    #[tokio::test]
+    async fn s3_test_get_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = make_integration(config);
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = integration.get(&location).await.unwrap_err().to_string();
+        assert!(
+            err.contains("The specified bucket does not exist"),
+            "{}",
+            err
+        )
+    }
+
+    #[tokio::test]
+    async fn s3_test_put_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = make_integration(config);
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+        let data = Bytes::from("arbitrary data");
+
+        let err = integration
+            .put(&location, data)
+            .await
+            .unwrap_err()
+            .to_string();
+
+        assert!(
+            err.contains("The specified bucket does not exist")
+                && err.contains("Unable to PUT data"),
+            "{}",
+            err
+        )
+    }
+
+    #[tokio::test]
+    async fn s3_test_delete_nonexistent_location() {
+        let config = maybe_skip_integration!();
+        let integration = make_integration(config);
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+        // Deleting a key that does not exist is expected to succeed.
+        integration.delete(&location).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn s3_test_delete_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = make_integration(config);
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = integration.delete(&location).await.unwrap_err().to_string();
+        assert!(
+            err.contains("The specified bucket does not exist")
+                && err.contains("Unable to DELETE data"),
+            "{}",
+            err
+        )
+    }
+}
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
new file mode 100644
index 0000000..5f43279
--- /dev/null
+++ b/object_store/src/azure.rs
@@ -0,0 +1,646 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for Azure blob storage
+use crate::{
+    path::{Path, DELIMITER},
+    util::format_prefix,
+    GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use azure_core::{prelude::*, HttpClient};
+use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient};
+use azure_storage_blobs::blob::responses::ListBlobsResponse;
+use azure_storage_blobs::blob::Blob;
+use azure_storage_blobs::{
+    prelude::{AsBlobClient, AsContainerClient, ContainerClient},
+    DeleteSnapshotsMethod,
+};
+use bytes::Bytes;
+use futures::{
+    stream::{self, BoxStream},
+    StreamExt, TryStreamExt,
+};
+use snafu::{ResultExt, Snafu};
+use std::collections::BTreeSet;
+use std::{convert::TryInto, sync::Arc};
+
+/// A specialized `Error` for Azure object store-related errors (all messages name the container)
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+    #[snafu(display(
+        "Unable to DELETE data. Container: {}, Location: {}, Error: {} ({:?})",
+        container,
+        path,
+        source,
+        source,
+    ))]
+    UnableToDeleteData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to GET data. Container: {}, Location: {}, Error: {} ({:?})",
+        container,
+        path,
+        source,
+        source,
+    ))]
+    UnableToGetData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to HEAD data. Container: {}, Location: {}, Error: {} ({:?})",
+        container,
+        path,
+        source,
+        source,
+    ))]
+    UnableToHeadData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to GET part of the data. Container: {}, Location: {}, Error: {} ({:?})",
+        container,
+        path,
+        source,
+        source,
+    ))]
+    UnableToGetPieceOfData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to PUT data. Container: {}, Location: {}, Error: {} ({:?})",
+        container,
+        path,
+        source,
+        source,
+    ))]
+    UnableToPutData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        path: String,
+    },
+
+    #[snafu(display(
+        "Unable to list data. Container: {}, Error: {} ({:?})",
+        container,
+        source,
+        source,
+    ))]
+    UnableToListData {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+    },
+
+    #[snafu(display(
+        "Unable to copy object. Container: {}, From: {}, To: {}, Error: {}",
+        container,
+        from,
+        to,
+        source
+    ))]
+    UnableToCopyFile {
+        source: Box<dyn std::error::Error + Send + Sync>,
+        container: String,
+        from: String,
+        to: String,
+    },
+
+    #[snafu(display(
+        "Unable to parse source url. Container: {}, Error: {}",
+        container,
+        source
+    ))]
+    UnableToParseUrl {
+        source: url::ParseError,
+        container: String,
+    },
+
+    NotFound {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    AlreadyExists {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    #[cfg(not(feature = "azure_test"))]
+    #[snafu(display(
+        "Azurite (azure emulator) support not compiled in, please add `azure_test` feature"
+    ))]
+    NoEmulatorFeature,
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        match err {
+            Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source },
+            Error::NotFound { path, source } => Self::NotFound { path, source },
+            other => Self::Generic { // every other variant is wrapped as a generic store error
+                store: "Azure Blob Storage",
+                source: Box::new(other),
+            },
+        }
+    }
+}
+
+/// Configuration for connecting to [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
+#[derive(Debug)]
+pub struct MicrosoftAzure {
+    container_client: Arc<ContainerClient>, // client the blob requests of this store go through
+    container_name: String, // used for error context and in `Display`
+    blob_base_url: String,
+    is_emulator: bool, // switches the `Display` label to the emulator variant
+}
+
+impl std::fmt::Display for MicrosoftAzure {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if self.is_emulator { // the emulator gets its own label
+            return write!(f, "MicrosoftAzureEmulator({})", self.container_name);
+        }
+        write!(f, "MicrosoftAzure({})", self.container_name)
+    }
+}
+
+/// Returns `true` when `err` is an Azure HTTP status error with code 404.
+#[allow(clippy::borrowed_box)]
+fn check_err_not_found(err: &Box<dyn std::error::Error + Send + Sync>) -> bool {
+    // Any other error type, or any other status code, yields `false`.
+    matches!(
+        err.downcast_ref::<azure_core::HttpError>(),
+        Some(azure_core::HttpError::StatusCode { status, .. }) if status.as_u16() == 404
+    )
+}
+
+#[async_trait]
+impl ObjectStore for MicrosoftAzure {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        // The payload is copied into a `BytesMut` before being handed to the upload.
+        let payload = bytes::BytesMut::from(&*bytes);
+        let blob = self.container_client.as_blob_client(location.as_ref());
+
+        blob
+            .put_block_blob(payload)
+            .execute()
+            .await
+            .context(UnableToPutDataSnafu {
+                container: &self.container_name,
+                path: location.to_owned(),
+            })?;
+        Ok(())
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        // The blob is fetched in one request and then exposed as a stream that
+        // yields its data as a single chunk.
+        let response = self
+            .container_client
+            .as_blob_client(location.as_ref())
+            .get()
+            .execute()
+            .await
+            .map_err(|err| {
+                let path = location.to_string();
+                if check_err_not_found(&err) {
+                    Error::NotFound { source: err, path }
+                } else {
+                    Error::UnableToGetData {
+                        source: err,
+                        container: self.container_name.clone(),
+                        path,
+                    }
+                }
+            })?;
+
+        let body = futures::stream::once(async move { Ok(response.data) });
+        Ok(GetResult::Stream(body.boxed()))
+    }
+
+    async fn get_range(
+        &self,
+        location: &Path,
+        range: std::ops::Range<usize>,
+    ) -> Result<Bytes> {
+        // Same request as `get`, restricted to `range`; a 404 maps to `NotFound`.
+        let response = self
+            .container_client
+            .as_blob_client(location.as_ref())
+            .get()
+            .range(range)
+            .execute()
+            .await
+            .map_err(|err| {
+                let path = location.to_string();
+                if check_err_not_found(&err) {
+                    Error::NotFound { source: err, path }
+                } else {
+                    Error::UnableToGetPieceOfData {
+                        source: err,
+                        container: self.container_name.clone(),
+                        path,
+                    }
+                }
+            })?;
+
+        Ok(response.data)
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let properties = self
+            .container_client
+            .as_blob_client(location.as_ref())
+            .get_properties()
+            .execute()
+            .await
+            .map_err(|err| {
+                let path = location.to_string();
+                if check_err_not_found(&err) {
+                    Error::NotFound { source: err, path }
+                } else {
+                    Error::UnableToHeadData {
+                        source: err,
+                        container: self.container_name.clone(),
+                        path,
+                    }
+                }
+            })?;
+
+        // A `None` from `convert_object_meta` is reported as not found ("is directory").
+        convert_object_meta(properties.blob)?.ok_or_else(|| super::Error::NotFound {
+            path: location.to_string(),
+            source: "is directory".to_string().into(),
+        })
+    }
+
+    /// Deletes the blob at `location`, including its snapshots.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::NotFound` when `check_err_not_found` recognises the
+    /// failure, matching how the read operations of this store report a
+    /// missing blob. Any other failure becomes `Error::UnableToDeleteData`.
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.container_client
+            .as_blob_client(location.as_ref())
+            .delete()
+            .delete_snapshots_method(DeleteSnapshotsMethod::Include)
+            .execute()
+            .await
+            .map_err(|err| {
+                // Surface a missing blob as `NotFound` rather than folding it
+                // into the generic delete failure.
+                if check_err_not_found(&err) {
+                    return Error::NotFound {
+                        source: err,
+                        path: location.to_string(),
+                    };
+                };
+                Error::UnableToDeleteData {
+                    source: err,
+                    container: self.container_name.clone(),
+                    path: location.to_string(),
+                }
+            })?;
+
+        Ok(())
+    }
+
+    /// Streams the metadata of the blobs under `prefix`, listing without a
+    /// delimiter.
+    ///
+    /// Blobs for which `convert_object_meta` returns `None` are left out of
+    /// the stream; conversion errors are passed through as stream items.
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let pages = self.list_impl(prefix, false).await?;
+
+        let objects = pages
+            .map_ok(|page| {
+                let metas = page
+                    .blobs
+                    .blobs
+                    .into_iter()
+                    .filter_map(|blob| convert_object_meta(blob).transpose());
+                futures::stream::iter(metas)
+            })
+            .try_flatten();
+
+        Ok(objects.boxed())
+    }
+
+    /// Lists the blobs and common prefixes under `prefix`, using the path
+    /// delimiter.
+    ///
+    /// All response pages are drained before returning. Common prefixes are
+    /// gathered in a `BTreeSet`, so they come back sorted and de-duplicated;
+    /// blobs for which `convert_object_meta` returns `None` are skipped.
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let mut pages = self.list_impl(prefix, true).await?;
+
+        let mut prefix_set = BTreeSet::new();
+        let mut objects = Vec::new();
+
+        while let Some(page) = pages.next().await {
+            let page = page?;
+
+            for blob_prefix in page.blobs.blob_prefix.unwrap_or_default() {
+                prefix_set.insert(Path::parse(&blob_prefix.name)?);
+            }
+
+            let blobs = page.blobs.blobs;
+            objects.reserve(blobs.len());
+            for blob in blobs {
+                // `extend` with an `Option` pushes the value only when present.
+                objects.extend(convert_object_meta(blob)?);
+            }
+        }
+
+        let common_prefixes = prefix_set.into_iter().collect();
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
+    }
+
+    /// Copies the blob at `from` to `to` within this container.
+    ///
+    /// The source is addressed by the URL from `get_copy_from_url`; a failed
+    /// copy request is reported through `UnableToCopyFileSnafu`.
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let source_url = self.get_copy_from_url(from)?;
+        let destination = self.container_client.as_blob_client(to.as_ref());
+
+        destination
+            .copy(&source_url)
+            .execute()
+            .await
+            .context(UnableToCopyFileSnafu {
+                container: &self.container_name,
+                from: from.as_ref(),
+                to: to.as_ref(),
+            })?;
+
+        Ok(())
+    }
+
+    /// Copies the blob at `from` to `to`, sending the copy with an
+    /// `IfMatchCondition::NotMatch("*")` condition.
+    ///
+    /// An `azure_core::HttpError::StatusCode` with status 409 is reported as
+    /// `Error::AlreadyExists` for `to`; every other failure becomes
+    /// `Error::UnableToCopyFile`.
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let source_url = self.get_copy_from_url(from)?;
+        let destination = self.container_client.as_blob_client(to.as_ref());
+
+        destination
+            .copy(&source_url)
+            .if_match_condition(IfMatchCondition::NotMatch("*".to_string()))
+            .execute()
+            .await
+            .map_err(|source| {
+                let conflict = matches!(
+                    source.downcast_ref::<azure_core::HttpError>(),
+                    Some(azure_core::HttpError::StatusCode { status, .. })
+                        if status.as_u16() == 409
+                );
+
+                if conflict {
+                    Error::AlreadyExists {
+                        source,
+                        path: to.to_string(),
+                    }
+                } else {
+                    Error::UnableToCopyFile {
+                        source,
+                        container: self.container_name.clone(),
+                        from: from.to_string(),
+                        to: to.to_string(),
+                    }
+                }
+            })?;
+
+        Ok(())
+    }
+}
+
+impl MicrosoftAzure {
+    /// Builds the URL of the blob at `from` in this container, for use as the
+    /// source of a copy request.
+    ///
+    /// The URL has the form `<blob_base_url>/<container_name>/<from>`; a
+    /// string that does not parse as a URL is reported through
+    /// `UnableToParseUrlSnafu`.
+    fn get_copy_from_url(&self, from: &Path) -> Result<reqwest::Url> {
+        let raw = format!(
+            "{}/{}/{}",
+            self.blob_base_url, self.container_name, from
+        );
+        let url = reqwest::Url::parse(&raw).context(UnableToParseUrlSnafu {
+            container: &self.container_name,
+        })?;
+
+        Ok(url)
+    }
+
+    /// Lists the blobs of the container as a stream of response pages.
+    ///
+    /// `prefix` (formatted with `format_prefix`) restricts the listing, and
+    /// `delimiter` adds `DELIMITER` to each request. Every page after the
+    /// first is requested with the `next_marker` of the page before it.
+    ///
+    /// The stream ends after a page that carries no `next_marker`, and also
+    /// right after yielding an error: a failed request is reported once and
+    /// is not re-issued if the consumer keeps polling.
+    async fn list_impl(
+        &self,
+        prefix: Option<&Path>,
+        delimiter: bool,
+    ) -> Result<BoxStream<'_, Result<ListBlobsResponse>>> {
+        // Pagination progress carried between stream items.
+        enum ListState {
+            Start,
+            HasMore(String),
+            Done,
+        }
+
+        let prefix_raw = format_prefix(prefix);
+
+        Ok(stream::unfold(ListState::Start, move |state| {
+            let mut request = self.container_client.list_blobs();
+
+            if let Some(p) = prefix_raw.as_deref() {
+                request = request.prefix(p);
+            }
+
+            if delimiter {
+                request = request.delimiter(Delimiter::new(DELIMITER));
+            }
+
+            async move {
+                match state {
+                    ListState::HasMore(ref marker) => {
+                        request = request.next_marker(marker as &str);
+                    }
+                    ListState::Done => {
+                        return None;
+                    }
+                    ListState::Start => {}
+                }
+
+                let resp = match request.execute().await.context(UnableToListDataSnafu {
+                    container: &self.container_name,
+                }) {
+                    Ok(resp) => resp,
+                    // Finish the stream after reporting the failure. Keeping
+                    // the old state would repeat the same request forever for
+                    // consumers that poll past an error (e.g. `collect`).
+                    Err(err) => {
+                        return Some((Err(crate::Error::from(err)), ListState::Done))
+                    }
+                };
+
+                let next_state = if let Some(marker) = &resp.next_marker {
+                    ListState::HasMore(marker.as_str().to_string())
+                } else {
+                    ListState::Done
+                };
+
+                Some((Ok(resp), next_state))
+            }
+        })
+        .boxed())
+    }
+}
+
+/// Converts a `Blob` from an Azure response into an `ObjectMeta`.
+///
+/// Returns `Ok(None)` when the blob's content length is zero, which is how
+/// directories are recognised here.
+///
+/// # Panics
+///
+/// Panics if the content length cannot be converted to the size type of
+/// `ObjectMeta`.
+fn convert_object_meta(blob: Blob) -> Result<Option<ObjectMeta>> {
+    let location = Path::parse(blob.name)?;
+    let size = blob
+        .properties
+        .content_length
+        .try_into()
+        .expect("unsupported size on this platform");
+
+    // This is needed to filter out gen2 directories
+    // https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-known-issues#blob-storage-apis
+    if size > 0 {
+        Ok(Some(ObjectMeta {
+            location,
+            last_modified: blob.properties.last_modified,
+            size,
+        }))
+    } else {
+        Ok(None)
+    }
+}
+
+/// Emulator gate for builds with the `azure_test` feature: always succeeds.
+#[cfg(feature = "azure_test")]
+fn check_if_emulator_works() -> Result<()> {
+    Ok(())
+}
+
+/// Emulator gate for builds without the `azure_test` feature: always fails
+/// with `Error::NoEmulatorFeature`.
+#[cfg(not(feature = "azure_test"))]
+fn check_if_emulator_works() -> Result<()> {
+    Err(Error::NoEmulatorFeature.into())
+}
+
+/// Configure a connection to the container with the given name on Microsoft
+/// Azure Blob store.
+///
+/// The credentials `account` and `access_key` must provide access to the
+/// store. With `use_emulator` set, the emulator's default storage account
+/// client is used instead of the credentials, and the call fails when
+/// `check_if_emulator_works` reports an error.
+pub fn new_azure(
+    account: impl Into<String>,
+    access_key: impl Into<String>,
+    container_name: impl Into<String>,
+    use_emulator: bool,
+) -> Result<MicrosoftAzure> {
+    let account = account.into();
+    let access_key = access_key.into();
+    let container_name = container_name.into();
+    let http_client: Arc<dyn HttpClient> = Arc::new(reqwest::Client::new());
+
+    let storage_account_client = if use_emulator {
+        check_if_emulator_works()?;
+        StorageAccountClient::new_emulator_default()
+    } else {
+        StorageAccountClient::new_access_key(
+            Arc::clone(&http_client),
+            &account,
+            &access_key,
+        )
+    };
+
+    // make url ending consistent between the emulator and remote storage account
+    let blob_base_url = storage_account_client
+        .blob_storage_url()
+        .as_ref()
+        .trim_end_matches('/')
+        .to_string();
+
+    let container_client = storage_account_client
+        .as_storage_client()
+        .as_container_client(&container_name);
+
+    Ok(MicrosoftAzure {
+        container_client,
+        container_name,
+        blob_base_url,
+        is_emulator: use_emulator,
+    })
+}
+
+// Integration tests for the Azure store; they only run when the environment
+// is configured as checked by `maybe_skip_integration!`.
+#[cfg(test)]
+mod tests {
+    use crate::azure::new_azure;
+    use crate::tests::{
+        copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
+        put_get_delete_list, rename_and_copy,
+    };
+    use std::env;
+
+    /// Connection settings read from the environment by
+    /// `maybe_skip_integration!`.
+    #[derive(Debug)]
+    struct AzureConfig {
+        storage_account: String,
+        access_key: String,
+        bucket: String,
+        use_emulator: bool,
+    }
+
+    // Helper macro to skip tests if TEST_INTEGRATION and the Azure environment
+    // variables are not set.
+    //
+    // OBJECT_STORE_BUCKET is always required. AZURE_STORAGE_ACCOUNT and
+    // AZURE_STORAGE_ACCESS_KEY are required unless AZURE_USE_EMULATOR is set
+    // (to any value), in which case they default to empty strings.
+    //
+    // - TEST_INTEGRATION set, required variables missing: panics.
+    // - TEST_INTEGRATION unset: prints a notice and `return`s from the
+    //   calling test function.
+    // - Otherwise: evaluates to an `AzureConfig`.
+    macro_rules! maybe_skip_integration {
+        () => {{
+            dotenv::dotenv().ok();
+
+            let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok();
+
+            let mut required_vars = vec!["OBJECT_STORE_BUCKET"];
+            if !use_emulator {
+                required_vars.push("AZURE_STORAGE_ACCOUNT");
+                required_vars.push("AZURE_STORAGE_ACCESS_KEY");
+            }
+            let unset_vars: Vec<_> = required_vars
+                .iter()
+                .filter_map(|&name| match env::var(name) {
+                    Ok(_) => None,
+                    Err(_) => Some(name),
+                })
+                .collect();
+            let unset_var_names = unset_vars.join(", ");
+
+            let force = std::env::var("TEST_INTEGRATION");
+
+            if force.is_ok() && !unset_var_names.is_empty() {
+                panic!(
+                    "TEST_INTEGRATION is set, \
+                        but variable(s) {} need to be set",
+                    unset_var_names
+                )
+            } else if force.is_err() {
+                eprintln!(
+                    "skipping Azure integration test - set {}TEST_INTEGRATION to run",
+                    if unset_var_names.is_empty() {
+                        String::new()
+                    } else {
+                        format!("{} and ", unset_var_names)
+                    }
+                );
+                // Leaves the test function that invoked the macro.
+                return;
+            } else {
+                AzureConfig {
+                    storage_account: env::var("AZURE_STORAGE_ACCOUNT")
+                        .unwrap_or_default(),
+                    access_key: env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(),
+                    bucket: env::var("OBJECT_STORE_BUCKET")
+                        .expect("already checked OBJECT_STORE_BUCKET"),
+                    use_emulator,
+                }
+            }
+        }};
+    }
+
+    // Runs the shared object-store test routines against the configured
+    // Azure container (or emulator).
+    #[tokio::test]
+    async fn azure_blob_test() {
+        let config = maybe_skip_integration!();
+        let integration = new_azure(
+            config.storage_account,
+            config.access_key,
+            config.bucket,
+            config.use_emulator,
+        )
+        .unwrap();
+
+        put_get_delete_list(&integration).await.unwrap();
+        list_uses_directories_correctly(&integration).await.unwrap();
+        list_with_delimiter(&integration).await.unwrap();
+        rename_and_copy(&integration).await.unwrap();
+        copy_if_not_exists(&integration).await.unwrap();
+    }
+}
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
new file mode 100644
index 0000000..84fb572
--- /dev/null
+++ b/object_store/src/gcp.rs
@@ -0,0 +1,721 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for Google Cloud Storage
+use std::collections::BTreeSet;
+use std::fs::File;
+use std::io::BufReader;
+use std::ops::Range;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
+use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
+use reqwest::header::RANGE;
+use reqwest::{header, Client, Method, Response, StatusCode};
+use snafu::{ResultExt, Snafu};
+
+use crate::util::format_http_range;
+use crate::{
+    oauth::OAuthProvider,
+    path::{Path, DELIMITER},
+    token::TokenCache,
+    util::format_prefix,
+    GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+
+/// Errors specific to the Google Cloud Storage implementation.
+///
+/// Each request variant wraps the `reqwest::Error` of the failed call; the
+/// get, delete and copy variants also record the object path involved.
+/// Values are converted into the crate-level error through the
+/// `From<Error> for super::Error` implementation in this module.
+#[derive(Debug, Snafu)]
+enum Error {
+    #[snafu(display("Unable to open service account file: {}", source))]
+    OpenCredentials { source: std::io::Error },
+
+    #[snafu(display("Unable to decode service account file: {}", source))]
+    DecodeCredentials { source: serde_json::Error },
+
+    #[snafu(display("Error performing list request: {}", source))]
+    ListRequest { source: reqwest::Error },
+
+    #[snafu(display("Error performing get request {}: {}", path, source))]
+    GetRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing delete request {}: {}", path, source))]
+    DeleteRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing copy request {}: {}", path, source))]
+    CopyRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing put request: {}", source))]
+    PutRequest { source: reqwest::Error },
+
+    #[snafu(display("Error decoding object size: {}", source))]
+    InvalidSize { source: std::num::ParseIntError },
+}
+
+/// Maps GCS errors onto the crate-level error type.
+///
+/// Get, delete and copy failures whose HTTP status is 404 become `NotFound`
+/// for the recorded path; every other error is wrapped as `Generic` with the
+/// store name `"GCS"`.
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        match err {
+            // The guard applies to all three alternatives of the or-pattern.
+            Error::GetRequest { source, path }
+            | Error::DeleteRequest { source, path }
+            | Error::CopyRequest { source, path }
+                if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
+            {
+                Self::NotFound {
+                    path,
+                    source: Box::new(source),
+                }
+            }
+            _ => Self::Generic {
+                store: "GCS",
+                source: Box::new(err),
+            },
+        }
+    }
+}
+
+/// A deserialized `service-account-********.json`-file.
+///
+/// `private_key` and `client_email` must be present in the file;
+/// `gcs_base_url` and `disable_oauth` are optional and fall back to the
+/// values of `default_gcs_base_url` and `default_disable_oauth`.
+#[derive(serde::Deserialize, Debug)]
+struct ServiceAccountCredentials {
+    /// The private key in RSA format.
+    pub private_key: String,
+
+    /// The email address associated with the service account.
+    pub client_email: String,
+
+    /// Base URL for GCS
+    #[serde(default = "default_gcs_base_url")]
+    pub gcs_base_url: String,
+
+    /// Disable oauth and use empty tokens.
+    #[serde(default = "default_disable_oauth")]
+    pub disable_oauth: bool,
+}
+
+/// Serde default for `ServiceAccountCredentials::gcs_base_url`.
+fn default_gcs_base_url() -> String {
+    String::from("https://storage.googleapis.com")
+}
+
+/// Serde default for `ServiceAccountCredentials::disable_oauth`: OAuth stays
+/// enabled unless the credentials file says otherwise.
+fn default_disable_oauth() -> bool {
+    false
+}
+
+/// One page of a list response, deserialized from camelCase JSON keys.
+#[derive(serde::Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+struct ListResponse {
+    /// Token for requesting the next page; `None` ends the pagination in
+    /// `list_paginated`.
+    next_page_token: Option<String>,
+    /// Prefixes reported by the response; empty when the key is missing.
+    #[serde(default)]
+    prefixes: Vec<String>,
+    /// Objects reported by the response; empty when the key is missing.
+    #[serde(default)]
+    items: Vec<Object>,
+}
+
+/// The object fields this module reads from a GCS response.
+#[derive(serde::Deserialize, Debug)]
+struct Object {
+    /// Object name; parsed into a `Path` by `convert_object_meta`.
+    name: String,
+    /// Object size as a string; parsed into a number by `convert_object_meta`.
+    size: String,
+    /// Used as the `last_modified` timestamp of the resulting `ObjectMeta`.
+    updated: DateTime<Utc>,
+}
+
+/// Configuration for connecting to [Google Cloud Storage](https://cloud.google.com/storage/).
+#[derive(Debug)]
+pub struct GoogleCloudStorage {
+    // HTTP client used for every request, including token fetches.
+    client: Client,
+    // Prepended to every request URL.
+    base_url: String,
+
+    // `None` means OAuth is disabled and an empty bearer token is sent.
+    oauth_provider: Option<OAuthProvider>,
+    token_cache: TokenCache<String>,
+
+    // The bucket name as given, and its percent-encoded form used in URLs.
+    bucket_name: String,
+    bucket_name_encoded: String,
+
+    // TODO: Hook this up in tests
+    // Sent as the `maxResults` query parameter of list requests when set.
+    max_list_results: Option<String>,
+}
+
+impl std::fmt::Display for GoogleCloudStorage {
+    /// Renders the store as `GoogleCloudStorage(<bucket name>)`.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("GoogleCloudStorage({})", self.bucket_name))
+    }
+}
+
+impl GoogleCloudStorage {
+    /// Returns the bearer token to attach to requests.
+    ///
+    /// With an OAuth provider configured, the token is obtained through the
+    /// token cache, passing the provider's `fetch_token` call to
+    /// `get_or_insert_with`. Without a provider the token is the empty string.
+    async fn get_token(&self) -> Result<String> {
+        let provider = match &self.oauth_provider {
+            Some(provider) => provider,
+            None => return Ok(String::new()),
+        };
+
+        let token = self
+            .token_cache
+            .get_or_insert_with(|| provider.fetch_token(&self.client))
+            .await?;
+
+        Ok(token)
+    }
+
+    /// Builds the URL of the object at `path`:
+    /// `<base_url>/storage/v1/b/<encoded bucket>/o/<encoded path>`.
+    ///
+    /// Every non-alphanumeric byte of the path is percent-encoded.
+    fn object_url(&self, path: &Path) -> String {
+        let object_name =
+            percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
+        let Self {
+            base_url,
+            bucket_name_encoded,
+            ..
+        } = self;
+
+        format!(
+            "{}/storage/v1/b/{}/o/{}",
+            base_url, bucket_name_encoded, object_name
+        )
+    }
+
+    /// Perform a get request <https://cloud.google.com/storage/docs/json_api/v1/objects/get>
+    ///
+    /// `head` selects the `alt` query parameter: `json` when set, `media`
+    /// otherwise. When `range` is given it is sent as a `Range` header.
+    /// Transport failures and error statuses are both reported through
+    /// `GetRequestSnafu` with the object path.
+    async fn get_request(
+        &self,
+        path: &Path,
+        range: Option<Range<usize>>,
+        head: bool,
+    ) -> Result<Response> {
+        let token = self.get_token().await?;
+        let alt = if head { "json" } else { "media" };
+
+        let request = self.client.request(Method::GET, self.object_url(path));
+        let request = match range {
+            Some(range) => request.header(RANGE, format_http_range(range)),
+            None => request,
+        };
+
+        let response = request
+            .bearer_auth(token)
+            .query(&[("alt", alt)])
+            .send()
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        let response = response.error_for_status().context(GetRequestSnafu {
+            path: path.as_ref(),
+        })?;
+
+        Ok(response)
+    }
+
+    /// Perform a put request <https://cloud.google.com/storage/docs/json_api/v1/objects/insert>
+    ///
+    /// Sends `payload` as a `media` upload named after `path`. Transport
+    /// failures and error statuses are reported through `PutRequestSnafu`.
+    async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
+        let token = self.get_token().await?;
+        let upload_url = format!(
+            "{}/upload/storage/v1/b/{}/o",
+            self.base_url, self.bucket_name_encoded
+        );
+        let content_length = payload.len();
+
+        let response = self
+            .client
+            .request(Method::POST, upload_url)
+            .bearer_auth(token)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .header(header::CONTENT_LENGTH, content_length)
+            .query(&[("uploadType", "media"), ("name", path.as_ref())])
+            .body(payload)
+            .send()
+            .await
+            .context(PutRequestSnafu)?;
+
+        response.error_for_status().context(PutRequestSnafu)?;
+
+        Ok(())
+    }
+
+    /// Perform a delete request <https://cloud.google.com/storage/docs/json_api/v1/objects/delete>
+    ///
+    /// Transport failures and error statuses are reported through
+    /// `DeleteRequestSnafu` with the object path.
+    async fn delete_request(&self, path: &Path) -> Result<()> {
+        let token = self.get_token().await?;
+
+        let response = self
+            .client
+            .request(Method::DELETE, self.object_url(path))
+            .bearer_auth(token)
+            .send()
+            .await
+            .context(DeleteRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        response.error_for_status().context(DeleteRequestSnafu {
+            path: path.as_ref(),
+        })?;
+
+        Ok(())
+    }
+
+    /// Perform a copy request <https://cloud.google.com/storage/docs/json_api/v1/objects/copy>
+    ///
+    /// Copies `from` to `to` within this bucket. With `if_not_exists` set the
+    /// request carries `ifGenerationMatch=0`, so it only succeeds when no
+    /// object exists at `to`.
+    ///
+    /// # Errors
+    ///
+    /// * `AlreadyExists` (for `to`) when `if_not_exists` is set and the
+    ///   service answers 412 Precondition Failed.
+    /// * Otherwise the failure is reported as `Error::CopyRequest` for
+    ///   `from`, which the crate-level conversion turns into `NotFound` for a
+    ///   404 status and a generic error for anything else.
+    async fn copy_request(
+        &self,
+        from: &Path,
+        to: &Path,
+        if_not_exists: bool,
+    ) -> Result<()> {
+        let token = self.get_token().await?;
+
+        let source =
+            percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
+        let destination =
+            percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC);
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
+            self.base_url,
+            self.bucket_name_encoded,
+            source,
+            self.bucket_name_encoded,
+            destination
+        );
+
+        let mut builder = self.client.request(Method::POST, url);
+
+        if if_not_exists {
+            builder = builder.query(&[("ifGenerationMatch", "0")]);
+        }
+
+        let response = builder
+            .bearer_auth(token)
+            .send()
+            .await
+            .context(CopyRequestSnafu {
+                path: from.as_ref(),
+            })?;
+
+        match response.error_for_status() {
+            Ok(_) => Ok(()),
+            // The generation precondition failed: an object is already
+            // present at the destination.
+            Err(err)
+                if if_not_exists
+                    && err.status() == Some(StatusCode::PRECONDITION_FAILED) =>
+            {
+                Err(crate::Error::AlreadyExists {
+                    path: to.to_string(),
+                    source: Box::new(err),
+                })
+            }
+            Err(err) => Err(Error::CopyRequest {
+                source: err,
+                path: from.to_string(),
+            }
+            .into()),
+        }
+    }
+
+    /// Perform a list request <https://cloud.google.com/storage/docs/json_api/v1/objects/list>
+    ///
+    /// Fetches a single page. `prefix`, `page_token` and the configured
+    /// `max_list_results` are added as query parameters only when present;
+    /// `delimiter` adds `DELIMITER` as the `delimiter` parameter. Transport
+    /// failures, error statuses and JSON decoding failures are all reported
+    /// through `ListRequestSnafu`.
+    async fn list_request(
+        &self,
+        prefix: Option<&str>,
+        delimiter: bool,
+        page_token: Option<&str>,
+    ) -> Result<ListResponse> {
+        let token = self.get_token().await?;
+
+        let url = format!(
+            "{}/storage/v1/b/{}/o",
+            self.base_url, self.bucket_name_encoded
+        );
+
+        // At most four parameters are ever pushed below.
+        let mut query = Vec::with_capacity(4);
+        if delimiter {
+            query.push(("delimiter", DELIMITER))
+        }
+
+        if let Some(prefix) = &prefix {
+            query.push(("prefix", prefix))
+        }
+
+        if let Some(page_token) = page_token {
+            query.push(("pageToken", page_token))
+        }
+
+        if let Some(max_results) = &self.max_list_results {
+            query.push(("maxResults", max_results))
+        }
+
+        let response: ListResponse = self
+            .client
+            .request(Method::GET, url)
+            .query(&query)
+            .bearer_auth(token)
+            .send()
+            .await
+            .context(ListRequestSnafu)?
+            .error_for_status()
+            .context(ListRequestSnafu)?
+            .json()
+            .await
+            .context(ListRequestSnafu)?;
+
+        Ok(response)
+    }
+
+    /// Perform a list operation automatically handling pagination
+    ///
+    /// Returns a stream with one item per response page. Each page after the
+    /// first is requested with the `next_page_token` of the page before it.
+    ///
+    /// The stream ends after a page without a `next_page_token`, and also
+    /// right after yielding an error: a failed request is reported once and
+    /// is not re-issued if the consumer keeps polling.
+    fn list_paginated(
+        &self,
+        prefix: Option<&Path>,
+        delimiter: bool,
+    ) -> Result<BoxStream<'_, Result<ListResponse>>> {
+        let prefix = format_prefix(prefix);
+
+        // Pagination progress carried between stream items.
+        enum ListState {
+            Start,
+            HasMore(String),
+            Done,
+        }
+
+        Ok(futures::stream::unfold(ListState::Start, move |state| {
+            let prefix = prefix.clone();
+
+            async move {
+                let page_token = match &state {
+                    ListState::Start => None,
+                    ListState::HasMore(page_token) => Some(page_token.as_str()),
+                    ListState::Done => {
+                        return None;
+                    }
+                };
+
+                let resp = match self
+                    .list_request(prefix.as_deref(), delimiter, page_token)
+                    .await
+                {
+                    Ok(resp) => resp,
+                    // Finish the stream after reporting the failure. Keeping
+                    // the old state would repeat the same request forever for
+                    // consumers that poll past an error (e.g. `collect`).
+                    Err(e) => return Some((Err(e), ListState::Done)),
+                };
+
+                let next_state = match &resp.next_page_token {
+                    Some(token) => ListState::HasMore(token.clone()),
+                    None => ListState::Done,
+                };
+
+                Some((Ok(resp), next_state))
+            }
+        })
+        .boxed())
+    }
+}
+
+#[async_trait]
+impl ObjectStore for GoogleCloudStorage {
+    /// Uploads `bytes` as the object at `location`.
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.put_request(location, bytes).await
+    }
+
+    /// Fetches the whole object at `location` as a stream of byte chunks.
+    ///
+    /// Errors raised while reading the body are wrapped as `Generic` errors
+    /// for the store `"GCS"`.
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let body = self
+            .get_request(location, None, false)
+            .await?
+            .bytes_stream()
+            .map_err(|source| crate::Error::Generic {
+                store: "GCS",
+                source: Box::new(source),
+            });
+
+        Ok(GetResult::Stream(body.boxed()))
+    }
+
+    /// Fetches the bytes of the object at `location` that fall in `range`.
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let response = self.get_request(location, Some(range), false).await?;
+        let bytes = response.bytes().await.context(GetRequestSnafu {
+            path: location.as_ref(),
+        })?;
+
+        Ok(bytes)
+    }
+
+    /// Fetches the metadata of the object at `location`.
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let response = self.get_request(location, None, true).await?;
+        let object: Object = response.json().await.context(GetRequestSnafu {
+            path: location.as_ref(),
+        })?;
+
+        convert_object_meta(&object)
+    }
+
+    /// Deletes the object at `location`.
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.delete_request(location).await
+    }
+
+    /// Streams the metadata of the objects under `prefix`, listing without a
+    /// delimiter.
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let pages = self.list_paginated(prefix, false)?;
+
+        let objects = pages
+            .map_ok(|page| {
+                let metas = page.items.into_iter().map(|item| convert_object_meta(&item));
+                futures::stream::iter(metas)
+            })
+            .try_flatten();
+
+        Ok(objects.boxed())
+    }
+
+    /// Lists the objects and common prefixes under `prefix`, using the path
+    /// delimiter.
+    ///
+    /// All pages are drained before returning; common prefixes are gathered
+    /// in a `BTreeSet`, so they come back sorted and de-duplicated.
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let mut pages = self.list_paginated(prefix, true)?;
+
+        let mut prefix_set = BTreeSet::new();
+        let mut objects = Vec::new();
+
+        while let Some(page) = pages.next().await {
+            let ListResponse {
+                prefixes, items, ..
+            } = page?;
+
+            for raw_prefix in prefixes {
+                prefix_set.insert(Path::parse(raw_prefix)?);
+            }
+
+            objects.reserve(items.len());
+            for item in &items {
+                objects.push(convert_object_meta(item)?);
+            }
+        }
+
+        let common_prefixes = prefix_set.into_iter().collect();
+
+        Ok(ListResult {
+            common_prefixes,
+            objects,
+        })
+    }
+
+    /// Copies the object at `from` to `to`.
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.copy_request(from, to, false).await
+    }
+
+    /// Copies the object at `from` to `to`, asking `copy_request` to apply
+    /// its "if not exists" precondition.
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.copy_request(from, to, true).await
+    }
+}
+
+/// Reads and deserializes the service account JSON file at
+/// `service_account_path`.
+///
+/// A file that cannot be opened is reported through `OpenCredentialsSnafu`;
+/// content that cannot be decoded through `DecodeCredentialsSnafu`.
+fn reader_credentials_file(
+    service_account_path: impl AsRef<std::path::Path>,
+) -> Result<ServiceAccountCredentials> {
+    let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
+    let credentials =
+        serde_json::from_reader(BufReader::new(file)).context(DecodeCredentialsSnafu)?;
+
+    Ok(credentials)
+}
+
+/// Configure a connection to Google Cloud Storage.
+///
+/// Credentials are read from the service account file at
+/// `service_account_path`. Unless that file sets `disable_oauth`, an OAuth
+/// provider is built from its client email and private key; otherwise the
+/// store is created without one. The file's `gcs_base_url` becomes the base
+/// URL of every request.
+pub fn new_gcs(
+    service_account_path: impl AsRef<std::path::Path>,
+    bucket_name: impl Into<String>,
+) -> Result<GoogleCloudStorage> {
+    let ServiceAccountCredentials {
+        private_key,
+        client_email,
+        gcs_base_url,
+        disable_oauth,
+    } = reader_credentials_file(service_account_path)?;
+    let client = Client::new();
+
+    let oauth_provider = if disable_oauth {
+        None
+    } else {
+        // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
+        let scope = "https://www.googleapis.com/auth/devstorage.full_control";
+        let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
+
+        Some(OAuthProvider::new(
+            client_email,
+            private_key,
+            scope.to_string(),
+            audience,
+        )?)
+    };
+
+    let bucket_name = bucket_name.into();
+    let bucket_name_encoded =
+        percent_encode(bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
+
+    Ok(GoogleCloudStorage {
+        client,
+        base_url: gcs_base_url,
+        oauth_provider,
+        token_cache: Default::default(),
+        bucket_name,
+        bucket_name_encoded,
+        max_list_results: None,
+    })
+}
+
+/// Converts an `Object` from a GCS response into an `ObjectMeta`.
+///
+/// Fails when the object name is not a valid `Path` or when the size string
+/// cannot be parsed as a number (`InvalidSizeSnafu`).
+fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
+    Ok(ObjectMeta {
+        location: Path::parse(&object.name)?,
+        last_modified: object.updated,
+        size: object.size.parse().context(InvalidSizeSnafu)?,
+    })
+}
+
+// Integration tests for the GCS store; they only run when the environment is
+// configured as checked by `maybe_skip_integration!`.
+#[cfg(test)]
+mod test {
+    use std::env;
+
+    use bytes::Bytes;
+
+    use crate::{
+        tests::{
+            get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
+            put_get_delete_list, rename_and_copy,
+        },
+        Error as ObjectStoreError, ObjectStore,
+    };
+
+    use super::*;
+
+    // Used both as an object name and as a bucket name that should not exist.
+    const NON_EXISTENT_NAME: &str = "nonexistentname";
+
+    /// Settings read from the environment by `maybe_skip_integration!`.
+    #[derive(Debug)]
+    struct GoogleCloudConfig {
+        bucket: String,
+        service_account: String,
+    }
+
+    // Helper macro to skip tests if TEST_INTEGRATION and the GCP environment variables are not set.
+    //
+    // - TEST_INTEGRATION set, required variables missing: panics.
+    // - TEST_INTEGRATION unset: prints a notice and `return`s from the
+    //   calling test function.
+    // - Otherwise: evaluates to a `GoogleCloudConfig`.
+    macro_rules! maybe_skip_integration {
+        () => {{
+            dotenv::dotenv().ok();
+
+            let required_vars = ["OBJECT_STORE_BUCKET", "GOOGLE_SERVICE_ACCOUNT"];
+            let unset_vars: Vec<_> = required_vars
+                .iter()
+                .filter_map(|&name| match env::var(name) {
+                    Ok(_) => None,
+                    Err(_) => Some(name),
+                })
+                .collect();
+            let unset_var_names = unset_vars.join(", ");
+
+            let force = std::env::var("TEST_INTEGRATION");
+
+            if force.is_ok() && !unset_var_names.is_empty() {
+                panic!(
+                    "TEST_INTEGRATION is set, \
+                            but variable(s) {} need to be set",
+                    unset_var_names
+                )
+            } else if force.is_err() {
+                eprintln!(
+                    "skipping Google Cloud integration test - set {}TEST_INTEGRATION to run",
+                    if unset_var_names.is_empty() {
+                        String::new()
+                    } else {
+                        format!("{} and ", unset_var_names)
+                    }
+                );
+                // Leaves the test function that invoked the macro.
+                return;
+            } else {
+                GoogleCloudConfig {
+                    bucket: env::var("OBJECT_STORE_BUCKET")
+                        .expect("already checked OBJECT_STORE_BUCKET"),
+                    service_account: env::var("GOOGLE_SERVICE_ACCOUNT")
+                        .expect("already checked GOOGLE_SERVICE_ACCOUNT"),
+                }
+            }
+        }};
+    }
+
+    // Runs the shared object-store test routines against the configured bucket.
+    #[tokio::test]
+    async fn gcs_test() {
+        let config = maybe_skip_integration!();
+        let integration = new_gcs(config.service_account, config.bucket).unwrap();
+
+        put_get_delete_list(&integration).await.unwrap();
+        list_uses_directories_correctly(&integration).await.unwrap();
+        list_with_delimiter(&integration).await.unwrap();
+        rename_and_copy(&integration).await.unwrap();
+    }
+
+    // Getting a missing object from an existing bucket must yield `NotFound`.
+    #[tokio::test]
+    async fn gcs_test_get_nonexistent_location() {
+        let config = maybe_skip_integration!();
+        let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = integration.get(&location).await.unwrap_err();
+
+        assert!(
+            matches!(err, ObjectStoreError::NotFound { .. }),
+            "unexpected error type: {}",
+            err
+        );
+    }
+
+    // Getting an object from a missing bucket must yield `NotFound`.
+    #[tokio::test]
+    async fn gcs_test_get_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = get_nonexistent_object(&integration, Some(location))
+            .await
+            .unwrap_err();
+
+        assert!(
+            matches!(err, ObjectStoreError::NotFound { .. }),
+            "unexpected error type: {}",
+            err
+        );
+    }
+
+    // Deleting a missing object from an existing bucket must yield `NotFound`.
+    #[tokio::test]
+    async fn gcs_test_delete_nonexistent_location() {
+        let config = maybe_skip_integration!();
+        let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = integration.delete(&location).await.unwrap_err();
+        assert!(
+            matches!(err, ObjectStoreError::NotFound { .. }),
+            "unexpected error type: {}",
+            err
+        );
+    }
+
+    // Deleting an object from a missing bucket must yield `NotFound`.
+    #[tokio::test]
+    async fn gcs_test_delete_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+
+        let err = integration.delete(&location).await.unwrap_err();
+        assert!(
+            matches!(err, ObjectStoreError::NotFound { .. }),
+            "unexpected error type: {}",
+            err
+        );
+    }
+
+    // Putting into a missing bucket fails; the test checks the rendered
+    // message for the put-request 404 text rather than an error variant.
+    #[tokio::test]
+    async fn gcs_test_put_nonexistent_bucket() {
+        let mut config = maybe_skip_integration!();
+        config.bucket = NON_EXISTENT_NAME.into();
+        let integration = new_gcs(config.service_account, &config.bucket).unwrap();
+
+        let location = Path::from_iter([NON_EXISTENT_NAME]);
+        let data = Bytes::from("arbitrary data");
+
+        let err = integration
+            .put(&location, data)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert!(
+            err.contains(
+                "Error performing put request: HTTP status client error (404 Not Found)"
+            ),
+            "{}",
+            err
+        )
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
new file mode 100644
index 0000000..4a56b03
--- /dev/null
+++ b/object_store/src/lib.rs
@@ -0,0 +1,706 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
+#![warn(
+    missing_copy_implementations,
+    missing_debug_implementations,
+    missing_docs,
+    clippy::explicit_iter_loop,
+    clippy::future_not_send,
+    clippy::use_self,
+    clippy::clone_on_ref_ptr
+)]
+
+//! # object_store
+//!
+//! This crate provides APIs for interacting with object storage services.
+//!
+//! It currently supports PUT, GET, DELETE, HEAD and list for:
+//!
+//! * [Google Cloud Storage](https://cloud.google.com/storage/)
+//! * [Amazon S3](https://aws.amazon.com/s3/)
+//! * [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/#overview)
+//! * In-memory
+//! * Local file storage
+//!
+
+#[cfg(feature = "aws")]
+pub mod aws;
+#[cfg(feature = "azure")]
+pub mod azure;
+#[cfg(feature = "gcp")]
+pub mod gcp;
+pub mod local;
+pub mod memory;
+pub mod path;
+pub mod throttle;
+
+#[cfg(feature = "gcp")]
+mod oauth;
+
+#[cfg(feature = "gcp")]
+mod token;
+
+mod util;
+
+use crate::path::Path;
+use crate::util::{collect_bytes, maybe_spawn_blocking};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::{stream::BoxStream, StreamExt};
+use snafu::Snafu;
+use std::fmt::{Debug, Formatter};
+use std::io::{Read, Seek, SeekFrom};
+use std::ops::Range;
+
+/// An alias for the dynamically dispatched [`ObjectStore`] trait object type.
+pub type DynObjectStore = dyn ObjectStore;
+
+/// Universal API to multiple object store services.
+#[async_trait]
+pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
+    /// Save the provided bytes to the specified location.
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
+
+    /// Return a [`GetResult`] for the data stored at the specified location.
+    async fn get(&self, location: &Path) -> Result<GetResult>;
+
+    /// Return the bytes that are stored at the specified location
+    /// in the given byte range.
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
+
+    /// Return the metadata for the specified location.
+    async fn head(&self, location: &Path) -> Result<ObjectMeta>;
+
+    /// Delete the object at the specified location.
+    async fn delete(&self, location: &Path) -> Result<()>;
+
+    /// List all the objects with the given prefix.
+    ///
+    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
+    /// `foo/bar_baz/x`.
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
+
+    /// List objects with the given prefix and an implementation specific
+    /// delimiter. Returns common prefixes (directories) in addition to object
+    /// metadata.
+    ///
+    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
+    /// `foo/bar_baz/x`.
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
+
+    /// Copy an object from one path to another in the same object store.
+    ///
+    /// If there exists an object at the destination, it will be overwritten.
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
+
+    /// Move an object from one path to another in the same object store.
+    ///
+    /// By default, this is implemented as a copy and then delete source. It may not
+    /// check when deleting source that it was the same object that was originally copied.
+    ///
+    /// If there exists an object at the destination, it will be overwritten.
+    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+        self.copy(from, to).await?;
+        self.delete(from).await
+    }
+
+    /// Copy an object from one path to another, only if destination is empty.
+    ///
+    /// Will return an error if the destination already has an object.
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
+
+    /// Move an object from one path to another in the same object store.
+    ///
+    /// Errors if the destination already has an object. By default: `copy_if_not_exists`, then delete source.
+    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.copy_if_not_exists(from, to).await?;
+        self.delete(from).await
+    }
+}
+
+/// Result of a list call that includes objects and prefixes (directories).
+/// NOTE(review): no continuation-token field is present on this struct. Individual result sets
+/// may be limited to 1,000 objects based on the underlying object storage's limitations.
+#[derive(Debug)]
+pub struct ListResult {
+    /// Prefixes that are common (like directories)
+    pub common_prefixes: Vec<Path>,
+    /// Object metadata for the listing
+    pub objects: Vec<ObjectMeta>,
+}
+
+/// The metadata that describes an object.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ObjectMeta {
+    /// The full path to the object
+    pub location: Path,
+    /// The last modified time (UTC)
+    pub last_modified: DateTime<Utc>,
+    /// The size in bytes of the object
+    pub size: usize,
+}
+
+/// Result for a get request
+///
+/// The local-file case is represented separately, as some systems may
+/// be able to optimise the case of a file already present on local disk
+pub enum GetResult {
+    /// An open file and its path on the local filesystem
+    File(std::fs::File, std::path::PathBuf),
+    /// An asynchronous stream of byte chunks
+    Stream(BoxStream<'static, Result<Bytes>>),
+}
+
+impl Debug for GetResult {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(match self { // only the variant name is rendered
+            Self::File(..) => "GetResult(File)",
+            Self::Stream(..) => "GetResult(Stream)",
+        })
+    }
+}
+
+impl GetResult {
+    /// Collects the data into a single [`Bytes`], reading the whole file or draining the stream
+    pub async fn bytes(self) -> Result<Bytes> {
+        match self {
+            Self::File(mut file, path) => {
+                maybe_spawn_blocking(move || {
+                    let len = file.seek(SeekFrom::End(0)).map_err(|source| {
+                        local::Error::Seek {
+                            source,
+                            path: path.clone(),
+                        }
+                    })?;
+                    // Rewind: the seek above left the cursor at the end of the file.
+                    file.seek(SeekFrom::Start(0)).map_err(|source| {
+                        local::Error::Seek {
+                            source,
+                            path: path.clone(),
+                        }
+                    })?;
+                    // `len` only pre-sizes the buffer; `read_to_end` determines what is read.
+                    let mut buffer = Vec::with_capacity(len as usize);
+                    file.read_to_end(&mut buffer).map_err(|source| {
+                        local::Error::UnableToReadBytes { source, path }
+                    })?;
+
+                    Ok(buffer.into())
+                })
+                .await
+            }
+            Self::Stream(s) => collect_bytes(s, None).await,
+        }
+    }
+
+    /// Converts this into a byte stream
+    ///
+    /// If the result is [`Self::File`] will perform chunked reads of the file, otherwise
+    /// will return the [`Self::Stream`].
+    ///
+    /// # Tokio Compatibility
+    ///
+    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
+    /// no major operating systems have stable async file APIs. Therefore if called from
+    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
+    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
+    ///
+    /// If not called from a tokio context, this will perform IO on the current thread with
+    /// no additional complexity or overheads
+    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
+        match self {
+            Self::File(file, path) => {
+                const CHUNK_SIZE: usize = 8 * 1024;
+
+                futures::stream::try_unfold(
+                    (file, path, false),
+                    |(mut file, path, finished)| {
+                        maybe_spawn_blocking(move || {
+                            if finished {
+                                return Ok(None);
+                            }
+                            // Read up to CHUNK_SIZE bytes; a short read flags the next call as finished.
+                            let mut buffer = Vec::with_capacity(CHUNK_SIZE);
+                            let read = file
+                                .by_ref()
+                                .take(CHUNK_SIZE as u64)
+                                .read_to_end(&mut buffer)
+                                .map_err(|e| local::Error::UnableToReadBytes {
+                                    source: e,
+                                    path: path.clone(),
+                                })?;
+
+                            Ok(Some((buffer.into(), (file, path, read != CHUNK_SIZE))))
+                        })
+                    },
+                )
+                .boxed()
+            }
+            Self::Stream(s) => s,
+        }
+    }
+}
+
+/// A specialized `Result` for object store-related errors; the error type defaults to [`Error`]
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[snafu(display("Generic {} error: {}", store, source))]
+    Generic {
+        store: &'static str,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    #[snafu(display("Object at location {} not found: {}", path, source))]
+    NotFound {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+    // `context(false)`: snafu implements `From<path::Error>` instead of generating a context selector.
+    #[snafu(
+        display("Encountered object with invalid path: {}", source),
+        context(false)
+    )]
+    InvalidPath { source: path::Error },
+
+    #[snafu(display("Error joining spawned task: {}", source), context(false))]
+    JoinError { source: tokio::task::JoinError },
+
+    #[snafu(display("Operation not supported: {}", source))]
+    NotSupported {
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    #[snafu(display("Object at location {} already exists: {}", path, source))]
+    AlreadyExists {
+        path: String,
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
+    #[snafu(display("Operation not yet implemented."))]
+    NotImplemented,
+    // Only compiled in when the `gcp` feature is enabled.
+    #[cfg(feature = "gcp")]
+    #[snafu(display("OAuth error: {}", source), context(false))]
+    OAuth { source: oauth::Error },
+}
+
+#[cfg(test)]
+mod test_util {
+    use super::*;
+    use futures::TryStreamExt;
+
+    /// Lists `storage` under `prefix` and returns only the object locations,
+    /// propagating the first error produced by the listing.
+    pub async fn flatten_list_stream(
+        storage: &DynObjectStore,
+        prefix: Option<&Path>,
+    ) -> Result<Vec<Path>> {
+        let listing = storage.list(prefix).await?;
+        let locations: Vec<Path> =
+            listing.map_ok(|entry| entry.location).try_collect().await?;
+        Ok(locations)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_util::flatten_list_stream;
+
+    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
+    type Result<T, E = Error> = std::result::Result<T, E>;
+
+    pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) -> Result<()> {
+        let store_str = storage.to_string();
+        // Start from an empty store so the listing assertions below are exact
+        delete_fixtures(storage).await;
+
+        let content_list = flatten_list_stream(storage, None).await?;
+        assert!(
+            content_list.is_empty(),
+            "Expected list to be empty; found: {:?}",
+            content_list
+        );
+
+        let location = Path::from("test_dir/test_file.json");
+
+        let data = Bytes::from("arbitrary data");
+        let expected_data = data.clone();
+        storage.put(&location, data).await?;
+
+        let root = Path::from("/");
+
+        // List everything
+        let content_list = flatten_list_stream(storage, None).await?;
+        assert_eq!(content_list, &[location.clone()]);
+
+        // Should behave the same as no prefix
+        let content_list = flatten_list_stream(storage, Some(&root)).await?;
+        assert_eq!(content_list, &[location.clone()]);
+
+        // List with delimiter
+        let result = storage.list_with_delimiter(None).await.unwrap();
+        assert_eq!(&result.objects, &[]);
+        assert_eq!(result.common_prefixes.len(), 1);
+        assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+        // Should behave the same as no prefix
+        let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
+        assert!(result.objects.is_empty());
+        assert_eq!(result.common_prefixes.len(), 1);
+        assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+        // List everything starting with a prefix that should return results
+        let prefix = Path::from("test_dir");
+        let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+        assert_eq!(content_list, &[location.clone()]);
+
+        // List everything starting with a prefix that shouldn't return results
+        let prefix = Path::from("something");
+        let content_list = flatten_list_stream(storage, Some(&prefix)).await?;
+        assert!(content_list.is_empty());
+
+        let read_data = storage.get(&location).await?.bytes().await?;
+        assert_eq!(&*read_data, expected_data);
+
+        // Test range requests: one inside the 14-byte object, one entirely past its end
+        let range = 3..7;
+        let range_result = storage.get_range(&location, range.clone()).await;
+
+        let out_of_range = 200..300;
+        let out_of_range_result = storage.get_range(&location, out_of_range).await;
+
+        if store_str.starts_with("MicrosoftAzureEmulator") {
+            // Azurite doesn't support x-ms-range-get-content-crc64 set by Azure SDK
+            // https://github.com/Azure/Azurite/issues/444
+            let err = range_result.unwrap_err().to_string();
+            assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
+
+            let err = out_of_range_result.unwrap_err().to_string();
+            assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
+        } else {
+            let bytes = range_result.unwrap();
+            assert_eq!(bytes, expected_data.slice(range));
+
+            // An out-of-bounds range must surface as an `Err` rather than as data
+            out_of_range_result.unwrap_err();
+        }
+
+        let head = storage.head(&location).await?;
+        assert_eq!(head.size, expected_data.len());
+
+        storage.delete(&location).await?;
+
+        let content_list = flatten_list_stream(storage, None).await?;
+        assert!(content_list.is_empty());
+
+        let err = storage.get(&location).await.unwrap_err();
+        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+        let err = storage.head(&location).await.unwrap_err();
+        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+        // Test handling of paths containing an encoded delimiter
+        // (`b/c` is passed as one part below, so the prefix `a/b` must match nothing)
+        let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
+        storage
+            .put(&file_with_delimiter, Bytes::from("arbitrary"))
+            .await
+            .unwrap();
+
+        let files = flatten_list_stream(storage, None).await.unwrap();
+        assert_eq!(files, vec![file_with_delimiter.clone()]);
+
+        let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
+            .await
+            .unwrap();
+        assert!(files.is_empty());
+
+        let files = storage
+            .list_with_delimiter(Some(&Path::from("a/b")))
+            .await
+            .unwrap();
+        assert!(files.common_prefixes.is_empty());
+        assert!(files.objects.is_empty());
+
+        let files = storage
+            .list_with_delimiter(Some(&Path::from("a")))
+            .await
+            .unwrap();
+        assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
+        assert!(files.objects.is_empty());
+
+        let files = storage
+            .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
+            .await
+            .unwrap();
+        assert!(files.common_prefixes.is_empty());
+        assert_eq!(files.objects.len(), 1);
+        assert_eq!(files.objects[0].location, file_with_delimiter);
+
+        storage.delete(&file_with_delimiter).await.unwrap();
+
+        // Test handling of paths containing non-ASCII characters, e.g. emoji
+
+        let emoji_prefix = Path::from("🙀");
+        let emoji_file = Path::from("🙀/😀.parquet");
+        storage
+            .put(&emoji_file, Bytes::from("arbitrary"))
+            .await
+            .unwrap();
+
+        storage.head(&emoji_file).await.unwrap();
+        storage
+            .get(&emoji_file)
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+
+        let files = flatten_list_stream(storage, Some(&emoji_prefix))
+            .await
+            .unwrap();
+
+        assert_eq!(files, vec![emoji_file.clone()]);
+
+        storage.delete(&emoji_file).await.unwrap();
+        let files = flatten_list_stream(storage, Some(&emoji_prefix))
+            .await
+            .unwrap();
+        assert!(files.is_empty());
+
+        Ok(())
+    }
+
+    /// Checks that a list prefix matches whole path segments only: `foo` must
+    /// not match `foo.bar/...`, and `foo/x` must not match `foo/x.json`.
+    pub(crate) async fn list_uses_directories_correctly(
+        storage: &DynObjectStore,
+    ) -> Result<()> {
+        delete_fixtures(storage).await;
+        let leftovers = flatten_list_stream(storage, None).await?;
+        assert!(
+            leftovers.is_empty(),
+            "Expected list to be empty; found: {:?}",
+            leftovers
+        );
+
+        let in_foo = Path::from("foo/x.json");
+        let in_foo_bar = Path::from("foo.bar/y.json");
+        let payload = Bytes::from("arbitrary data");
+        storage.put(&in_foo, payload.clone()).await?;
+        storage.put(&in_foo_bar, payload).await?;
+
+        // A whole-segment prefix matches only the object below that directory.
+        let under_foo = flatten_list_stream(storage, Some(&Path::from("foo"))).await?;
+        assert_eq!(under_foo, &[in_foo.clone()]);
+
+        // A partial file name is not a prefix.
+        let under_partial = flatten_list_stream(storage, Some(&Path::from("foo/x"))).await?;
+        assert_eq!(under_partial, &[]);
+
+        Ok(())
+    }
+
+    pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) -> Result<()> {
+        delete_fixtures(storage).await;
+
+        // ==================== check: store is empty ====================
+        let content_list = flatten_list_stream(storage, None).await?;
+        assert!(content_list.is_empty());
+
+        // ==================== do: create files ====================
+        let data = Bytes::from("arbitrary data");
+        // Includes `mydb/wbwbwb/...`, which shares the `wb` name prefix but is a different directory
+        let files: Vec<_> = [
+            "test_file",
+            "mydb/wb/000/000/000.segment",
+            "mydb/wb/000/000/001.segment",
+            "mydb/wb/000/000/002.segment",
+            "mydb/wb/001/001/000.segment",
+            "mydb/wb/foo.json",
+            "mydb/wbwbwb/111/222/333.segment",
+            "mydb/data/whatevs",
+        ]
+        .iter()
+        .map(|&s| Path::from(s))
+        .collect();
+
+        for f in &files {
+            let data = data.clone();
+            storage.put(f, data).await.unwrap();
+        }
+
+        // ==================== check: prefix-list `mydb/wb` (directory) ====================
+        let prefix = Path::from("mydb/wb");
+
+        let expected_000 = Path::from("mydb/wb/000");
+        let expected_001 = Path::from("mydb/wb/001");
+        let expected_location = Path::from("mydb/wb/foo.json");
+
+        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+        // Both sub-directories are expected as common prefixes, `foo.json` as the only object
+        assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
+        assert_eq!(result.objects.len(), 1);
+
+        let object = &result.objects[0];
+
+        assert_eq!(object.location, expected_location);
+        assert_eq!(object.size, data.len());
+
+        // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
+        let prefix = Path::from("mydb/wb/000/000/001");
+
+        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+        assert!(result.common_prefixes.is_empty());
+        assert_eq!(result.objects.len(), 0);
+
+        // ==================== check: prefix-list `not_there` (non-existing prefix) ====================
+        let prefix = Path::from("not_there");
+
+        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+        assert!(result.common_prefixes.is_empty());
+        assert!(result.objects.is_empty());
+
+        // ==================== do: remove all files ====================
+        for f in &files {
+            storage.delete(f).await.unwrap();
+        }
+
+        // ==================== check: store is empty ====================
+        let content_list = flatten_list_stream(storage, None).await?;
+        assert!(content_list.is_empty());
+
+        Ok(())
+    }
+
+    /// Asserts `head` yields `NotFound`, then returns the outcome of `get` + `bytes`.
+    pub(crate) async fn get_nonexistent_object(
+        storage: &DynObjectStore,
+        location: Option<Path>,
+    ) -> crate::Result<Bytes> {
+        let fallback = || Path::from("this_file_should_not_exist");
+        let target = location.unwrap_or_else(fallback);
+        let err = storage.head(&target).await.unwrap_err();
+        assert!(matches!(err, crate::Error::NotFound { .. }));
+        let fetched = storage.get(&target).await?;
+        fetched.bytes().await
+    }
+
+    /// Exercises `copy` and `rename` when the destination already holds an
+    /// object: both must leave the source's contents at the destination.
+    pub(crate) async fn rename_and_copy(storage: &DynObjectStore) -> Result<()> {
+        let (src, dst) = (Path::from("test1"), Path::from("test2"));
+        let (cats, dogs) = (Bytes::from("cats"), Bytes::from("dogs"));
+
+        // copy(): the destination is overwritten with the source's contents
+        storage.put(&src, cats.clone()).await?;
+        storage.put(&dst, dogs.clone()).await?;
+        storage.copy(&src, &dst).await?;
+        let copied = storage.get(&dst).await?.bytes().await?;
+        assert_eq!(&copied, &cats);
+
+        // rename(): same outcome at the destination, and the source is gone
+        storage.put(&src, cats.clone()).await?;
+        storage.put(&dst, dogs.clone()).await?;
+        storage.rename(&src, &dst).await?;
+        let renamed = storage.get(&dst).await?.bytes().await?;
+        assert_eq!(&renamed, &cats);
+
+        let result = storage.get(&src).await;
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+        // Only the destination is left to remove
+        storage.delete(&dst).await?;
+
+        Ok(())
+    }
+
+    /// Exercises `copy_if_not_exists`: it must refuse an occupied destination
+    /// and otherwise leave the source's contents at the destination.
+    pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) -> Result<()> {
+        let (src, dst) = (Path::from("test1"), Path::from("test2"));
+        let (cats, dogs) = (Bytes::from("cats"), Bytes::from("dogs"));
+
+        // Occupied destination: the call must fail with `AlreadyExists`
+        storage.put(&src, cats.clone()).await?;
+        storage.put(&dst, dogs.clone()).await?;
+        let result = storage.copy_if_not_exists(&src, &dst).await;
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            crate::Error::AlreadyExists { .. }
+        ));
+
+        // Free destination: the copy succeeds and survives deletion of the source
+        storage.delete(&dst).await?;
+        storage.copy_if_not_exists(&src, &dst).await?;
+        storage.delete(&src).await?;
+        let copied = storage.get(&dst).await?.bytes().await?;
+        assert_eq!(&copied, &cats);
+
+        let result = storage.get(&src).await;
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+        // Only the destination is left to remove
+        storage.delete(&dst).await?;
+
+        Ok(())
+    }
+
+    /// Best-effort removal of every listed object; individual delete errors are ignored.
+    async fn delete_fixtures(storage: &DynObjectStore) {
+        let leftovers = flatten_list_stream(storage, None).await.unwrap();
+        for stale in &leftovers {
+            let _ = storage.delete(stale).await;
+        }
+    }
+
+    /// Test that the returned stream does not borrow the lifetime of Path
+    async fn list_store<'a, 'b>(
+        store: &'a dyn ObjectStore, // the returned stream is tied to this borrow (`'a`)
+        path_str: &'b str,
+    ) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
+        let path = Path::from(path_str); // local value; does not outlive this function
+        store.list(Some(&path)).await
+    }
+
+    /// A fresh in-memory store lists nothing; mainly checks that `list_store` compiles.
+    #[tokio::test]
+    async fn test_list_lifetimes() {
+        let in_memory = memory::InMemory::new();
+        assert_eq!(list_store(&in_memory, "path").await.unwrap().count().await, 0);
+    }
+
+    // Tests TODO:
+    // GET nonexistent location (in_memory/file)
+    // DELETE nonexistent location
+    // PUT overwriting
+}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
new file mode 100644
index 0000000..8a9462e
--- /dev/null
+++ b/object_store/src/local.rs
@@ -0,0 +1,773 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for a local filesystem
+use crate::{
+    maybe_spawn_blocking,
+    path::{filesystem_path_to_url, Path},
+    GetResult, ListResult, ObjectMeta, ObjectStore, Result,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::{stream::BoxStream, StreamExt};
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
+use std::collections::VecDeque;
+use std::fs::File;
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::ops::Range;
+use std::sync::Arc;
+use std::{collections::BTreeSet, convert::TryFrom, io};
+use url::Url;
+use walkdir::{DirEntry, WalkDir};
+
+/// Errors raised by the local filesystem store; converted to `super::Error` via `From`
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub(crate) enum Error {
+    #[snafu(display("File size for {} did not fit in a usize: {}", path, source))]
+    FileSizeOverflowedUsize {
+        source: std::num::TryFromIntError,
+        path: String,
+    },
+
+    #[snafu(display("Unable to walk dir: {}", source))]
+    UnableToWalkDir {
+        source: walkdir::Error,
+    },
+
+    #[snafu(display("Unable to access metadata for {}: {}", path, source))]
+    UnableToAccessMetadata {
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+        path: String,
+    },
+
+    #[snafu(display("Unable to copy data to file: {}", source))]
+    UnableToCopyDataToFile {
+        source: io::Error,
+    },
+
+    #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
+    UnableToCreateDir {
+        source: io::Error,
+        path: std::path::PathBuf,
+    },
+
+    #[snafu(display("Unable to create file {}: {}", path.display(), err))]
+    UnableToCreateFile {
+        path: std::path::PathBuf,
+        err: io::Error, // presumably not named `source` because `put` also uses this selector on an `Option`
+    },
+
+    #[snafu(display("Unable to delete file {}: {}", path.display(), source))]
+    UnableToDeleteFile {
+        source: io::Error,
+        path: std::path::PathBuf,
+    },
+
+    #[snafu(display("Unable to open file {}: {}", path.display(), source))]
+    UnableToOpenFile {
+        source: io::Error,
+        path: std::path::PathBuf,
+    },
+
+    #[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
+    UnableToReadBytes {
+        source: io::Error,
+        path: std::path::PathBuf,
+    },
+
+    #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
+    OutOfRange {
+        path: std::path::PathBuf,
+        expected: usize,
+        actual: usize,
+    },
+
+    #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
+    UnableToCopyFile {
+        from: std::path::PathBuf,
+        to: std::path::PathBuf,
+        source: io::Error,
+    },
+
+    NotFound { // mapped to `super::Error::NotFound` by the `From` impl
+        path: std::path::PathBuf,
+        source: io::Error,
+    },
+
+    #[snafu(display("Error seeking file {}: {}", path.display(), source))]
+    Seek {
+        source: io::Error,
+        path: std::path::PathBuf,
+    },
+
+    #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
+    InvalidUrl {
+        url: Url,
+    },
+
+    AlreadyExists { // mapped to `super::Error::AlreadyExists` by the `From` impl
+        path: String,
+        source: io::Error,
+    },
+}
+
+impl From<Error> for super::Error {
+    /// Keeps `NotFound`/`AlreadyExists` as such; any other error becomes `Generic`
+    fn from(err: Error) -> Self {
+        match err {
+            Error::AlreadyExists { path, source } => {
+                Self::AlreadyExists { path, source: source.into() }
+            }
+            Error::NotFound { path, source } => {
+                let path = path.to_string_lossy().into_owned();
+                Self::NotFound { path, source: source.into() }
+            }
+            other => Self::Generic {
+                store: "LocalFileSystem",
+                source: Box::new(other),
+            },
+        }
+    }
+}
+
+/// Local filesystem storage providing an [`ObjectStore`] interface to files on
+/// local disk. Can optionally be created with a directory prefix.
+///
+/// # Path Semantics
+///
+/// This implementation follows the [file URI] scheme outlined in [RFC 3986]. In
+/// particular, paths are delimited by `/`.
+///
+/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+/// [RFC 3986]: https://www.rfc-editor.org/rfc/rfc3986
+///
+/// # Tokio Compatibility
+///
+/// Tokio discourages performing blocking IO on a tokio worker thread; however,
+/// no major operating systems have stable async file APIs. Therefore if called from
+/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
+/// IO to a blocking thread pool, much like `tokio::fs` does under the hood.
+///
+/// If not called from a tokio context, this will perform IO on the current thread with
+/// no additional complexity or overhead.
+#[derive(Debug)]
+pub struct LocalFileSystem {
+    config: Arc<Config>, // reference-counted so `list`/`list_with_delimiter` can hand a clone to their closures
+}
+
+#[derive(Debug)]
+struct Config {
+    root: Url, // URL that object locations are appended to in `path_to_filesystem`
+}
+
+impl std::fmt::Display for LocalFileSystem { // rendered as `LocalFileSystem(<root url>)`
+    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        out.write_fmt(format_args!("LocalFileSystem({})", self.config.root))
+    }
+}
+
+impl Default for LocalFileSystem {
+    fn default() -> Self { // identical to `new()`: a store without prefix
+        LocalFileSystem::new()
+    }
+}
+
+impl LocalFileSystem {
+    /// Creates a store rooted at `file:///`, i.e. without any prefix
+    pub fn new() -> Self {
+        let root = Url::parse("file:///").unwrap();
+        let config = Arc::new(Config { root });
+        Self { config }
+    }
+
+    /// Creates a store whose object locations are all resolved beneath `prefix`
+    ///
+    /// # Errors
+    ///
+    /// Fails if `filesystem_path_to_url` cannot convert `prefix` into a URL
+    pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
+        let root = filesystem_path_to_url(prefix)?;
+        let config = Arc::new(Config { root });
+        Ok(Self { config })
+    }
+}
+
+impl Config {
+    /// Appends the parts of `location` to the root URL and converts the result into a
+    /// filesystem path, failing with `InvalidUrl` if that conversion is not possible
+    fn path_to_filesystem(&self, location: &Path) -> Result<std::path::PathBuf> {
+        let mut resolved = self.root.clone();
+        let mut segments = resolved.path_segments_mut().expect("url path");
+        segments.extend(location.parts());
+        drop(segments);
+        match resolved.to_file_path() {
+            Ok(fs_path) => Ok(fs_path),
+            Err(()) => Err(Error::InvalidUrl { url: resolved }.into()),
+        }
+    }
+
+    /// Converts an on-disk `location` into an object store [`Path`], using the root as base
+    fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
+        Path::from_filesystem_path_with_base(location, Some(&self.root)).map_err(Into::into)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for LocalFileSystem {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let path = self.config.path_to_filesystem(location)?;
+        // Writes `bytes` to the file that `location` resolves to, via `File::create`
+        maybe_spawn_blocking(move || {
+            let mut file = match File::create(&path) {
+                Ok(created) => created,
+                // Only a `NotFound` failure takes the create-parent-and-retry path below
+                Err(err) if err.kind() != std::io::ErrorKind::NotFound => {
+                    return Err(Error::UnableToCreateFile { path, err }.into());
+                }
+                // `NotFound`: create the parent directories, then retry exactly once
+                Err(err) => {
+                    let parent = path
+                        .parent()
+                        .context(UnableToCreateFileSnafu { path: &path, err })?;
+                    std::fs::create_dir_all(&parent)
+                        .context(UnableToCreateDirSnafu { path: parent })?;
+
+                    File::create(&path)
+                        .map_err(|err| Error::UnableToCreateFile { path, err })?
+                }
+            };
+
+            file.write_all(&bytes)
+                .context(UnableToCopyDataToFileSnafu)?;
+
+            Ok(())
+        })
+        .await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        // Only opens the file; the handle and its resolved path become `GetResult::File`
+        let fs_path = self.config.path_to_filesystem(location)?;
+        maybe_spawn_blocking(move || {
+            open_file(&fs_path).map(|handle| GetResult::File(handle, fs_path))
+        })
+        .await
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let path = self.config.path_to_filesystem(location)?;
+        maybe_spawn_blocking(move || {
+            let mut file = open_file(&path)?;
+            // Reject an inverted range: `end - start` would otherwise panic in debug
+            // builds and wrap around to an enormous length in release builds
+            let to_read = range.end.checked_sub(range.start).ok_or_else(|| {
+                let source = io::Error::new(io::ErrorKind::InvalidInput, "inverted byte range");
+                Error::UnableToReadBytes { source, path: path.clone() }
+            })?;
+            file.seek(SeekFrom::Start(range.start as u64))
+                .context(SeekSnafu { path: &path })?;
+
+            let mut buf = Vec::with_capacity(to_read);
+            let read = file
+                .take(to_read as u64)
+                .read_to_end(&mut buf)
+                .context(UnableToReadBytesSnafu { path: &path })?;
+
+            // A short read means the requested range extends past the end of the file
+            let short_read = OutOfRangeSnafu { path: &path, expected: to_read, actual: read };
+            ensure!(read == to_read, short_read);
+
+            Ok(buf.into())
+        })
+        .await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let fs_path = self.config.path_to_filesystem(location)?;
+        let object = location.clone();
+        maybe_spawn_blocking(move || {
+            // Opening the file first lets `open_file` classify a missing object
+            let handle = open_file(&fs_path)?;
+            match handle.metadata() {
+                Ok(metadata) => convert_metadata(metadata, object),
+                Err(e) => {
+                    let path = object.to_string();
+                    Err(Error::UnableToAccessMetadata { source: e.into(), path }.into())
+                }
+            }
+        })
+        .await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let path = self.config.path_to_filesystem(location)?;
+        // A missing file is reported as `NotFound` (as for `get`/`head`), not as `Generic`
+        maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
+            Err(e) if e.kind() == io::ErrorKind::NotFound => Err(Error::NotFound { path, source: e }.into()),
+            // Success and every other failure are handled exactly as before
+            other => other.context(UnableToDeleteFileSnafu { path }).map_err(Into::into),
+        }).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let config = Arc::clone(&self.config);
+
+        // Resolve the directory to walk the same way `list_with_delimiter` does: an empty
+        // prefix means the store root, and a root that cannot be expressed as a filesystem
+        // path yields an `InvalidUrl` error instead of a panic
+        let root_path = config.path_to_filesystem(&prefix.cloned().unwrap_or_default())?;
+
+        let walkdir = WalkDir::new(&root_path)
+            // Don't include the root directory itself
+            .min_depth(1);
+
+        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
+            match convert_walkdir_result(result_dir_entry) {
+                Err(e) => Some(Err(e)),
+                Ok(None) => None,
+                Ok(entry @ Some(_)) => entry
+                    .filter(|dir_entry| dir_entry.file_type().is_file())
+                    .map(|entry| {
+                        let location = config.filesystem_to_path(entry.path())?;
+                        convert_entry(entry, location)
+                    }),
+            }
+        });
+
+        // If no tokio context, return iterator directly as no
+        // need to perform chunked spawn_blocking reads
+        if tokio::runtime::Handle::try_current().is_err() {
+            return Ok(futures::stream::iter(s).boxed());
+        }
+
+        // Otherwise list in batches of CHUNK_SIZE
+        const CHUNK_SIZE: usize = 1024;
+
+        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
+        let stream =
+            futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+                if buffer.is_empty() {
+                    (s, buffer) = tokio::task::spawn_blocking(move || {
+                        for _ in 0..CHUNK_SIZE {
+                            match s.next() {
+                                Some(r) => buffer.push_back(r),
+                                None => break,
+                            }
+                        }
+                        (s, buffer)
+                    })
+                    .await?;
+                }
+
+                match buffer.pop_front() {
+                    Some(Err(e)) => Err(e),
+                    Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
+                    None => Ok(None),
+                }
+            });
+
+        Ok(stream.boxed())
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let config = Arc::clone(&self.config);
+
+        let prefix = prefix.cloned().unwrap_or_default();
+        let resolved_prefix = config.path_to_filesystem(&prefix)?;
+        // The directory walk is blocking IO, so it is handed to `maybe_spawn_blocking`
+        maybe_spawn_blocking(move || {
+            let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1); // depth pinned to 1
+
+            let mut common_prefixes = BTreeSet::new();
+            let mut objects = Vec::new();
+
+            for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
+                if let Some(entry) = entry_res? { // `None`: a not-found walk error, skipped
+                    let is_directory = entry.file_type().is_dir();
+                    let entry_location = config.filesystem_to_path(entry.path())?;
+                    // presumably the parts of the entry that follow `prefix` — confirm in `Path::prefix_match`
+                    let mut parts = match entry_location.prefix_match(&prefix) {
+                        Some(parts) => parts,
+                        None => continue,
+                    };
+
+                    let common_prefix = match parts.next() {
+                        Some(p) => p,
+                        None => continue,
+                    };
+
+                    drop(parts); // presumably releases the borrow of `entry_location`, which is moved below
+                    // Directories are reported as common prefixes, everything else as objects
+                    if is_directory {
+                        common_prefixes.insert(prefix.child(common_prefix));
+                    } else {
+                        objects.push(convert_entry(entry, entry_location)?);
+                    }
+                }
+            }
+
+            Ok(ListResult {
+                common_prefixes: common_prefixes.into_iter().collect(),
+                objects,
+            })
+        })
+        .await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let src = self.config.path_to_filesystem(from)?;
+        let dst = self.config.path_to_filesystem(to)?;
+        // The value returned by `std::fs::copy` on success is not needed here
+        maybe_spawn_blocking(move || match std::fs::copy(&src, &dst) {
+            Ok(_) => Ok(()),
+            Err(source) => Err(Error::UnableToCopyFile { from: src, to: dst, source }.into()),
+        })
+        .await
+    }
+
+    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+        let src = self.config.path_to_filesystem(from)?;
+        let dst = self.config.path_to_filesystem(to)?;
+        maybe_spawn_blocking(move || {
+            std::fs::rename(&src, &dst) // failures are reported through `UnableToCopyFile`
+                .map_err(|source| Error::UnableToCopyFile { from: src, to: dst, source }.into())
+        })
+        .await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let from = self.config.path_to_filesystem(from)?;
+        let to = self.config.path_to_filesystem(to)?;
+        // `hard_link` reports `AlreadyExists` for an existing `to`: that is the "if not exists" check
+        maybe_spawn_blocking(move || {
+            std::fs::hard_link(&from, &to).map_err(|err| match err.kind() {
+                io::ErrorKind::AlreadyExists => Error::AlreadyExists {
+                    path: to.to_string_lossy().into_owned(), // lossy: never panic on a non-UTF-8 path
+                    source: err,
+                }
+                .into(),
+                _ => Error::UnableToCopyFile {
+                    from,
+                    to,
+                    source: err,
+                }
+                .into(),
+            })
+        })
+        .await
+    }
+}
+
+/// Opens the file at `path` for reading
+///
+/// # Errors
+///
+/// A missing file yields `Error::NotFound`; every other failure yields
+/// `Error::UnableToOpenFile`. Both carry `path` and the underlying IO error.
+fn open_file(path: &std::path::Path) -> Result<File> {
+    File::open(path).map_err(|source| {
+        let path = path.to_path_buf();
+        match source.kind() {
+            io::ErrorKind::NotFound => Error::NotFound { path, source }.into(),
+            _ => Error::UnableToOpenFile { path, source }.into(),
+        }
+    })
+}
+
+
+/// Builds the [`ObjectMeta`] for the directory `entry` stored at `location`,
+/// failing with `UnableToAccessMetadata` if the entry's metadata cannot be read
+fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
+    let metadata = entry.metadata().map_err(|walk_err| {
+        let path = location.to_string();
+        Error::UnableToAccessMetadata { source: walk_err.into(), path }
+    })?;
+    convert_metadata(metadata, location)
+}
+
+/// Assembles the [`ObjectMeta`] of `location` from its filesystem `metadata`
+///
+/// # Panics
+///
+/// Panics if `metadata` does not provide a modification time
+fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result<ObjectMeta> {
+    let modified = metadata.modified();
+    let modified = modified.expect("Modified file time should be supported on this platform");
+    let overflow = FileSizeOverflowedUsizeSnafu { path: location.as_ref() };
+    let size = usize::try_from(metadata.len()).context(overflow)?;
+    Ok(ObjectMeta {
+        location,
+        last_modified: modified.into(),
+        size,
+    })
+}
+
+/// Converts a walkdir result, turning not-found errors into `None`.
+///
+/// Returns:
+/// * `Ok(Some(entry))` for a successfully read entry
+/// * `Ok(None)` when the underlying IO error is `NotFound`
+/// * `Err(UnableToWalkDir)` for every other walkdir failure, including
+///   failures that carry no IO error at all
+fn convert_walkdir_result(
+    res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
+) -> Result<Option<walkdir::DirEntry>> {
+    let walkdir_err = match res {
+        Ok(entry) => return Ok(Some(entry)),
+        Err(e) => e,
+    };
+    let io_kind = walkdir_err.io_error().map(|io_err| io_err.kind());
+    if io_kind == Some(io::ErrorKind::NotFound) {
+        return Ok(None);
+    }
+    let source = walkdir_err;
+    Err(Error::UnableToWalkDir { source }.into())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_util::flatten_list_stream;
+    use crate::{
+        tests::{
+            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
+            list_with_delimiter, put_get_delete_list, rename_and_copy,
+        },
+        Error as ObjectStoreError, ObjectStore,
+    };
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn file_test() {
+        // Runs the shared ObjectStore test-suite against a store in a fresh temp directory
+        let tmp_dir = TempDir::new().unwrap();
+        let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap();
+        put_get_delete_list(&store).await.unwrap();
+        list_uses_directories_correctly(&store).await.unwrap();
+        list_with_delimiter(&store).await.unwrap();
+        rename_and_copy(&store).await.unwrap();
+        copy_if_not_exists(&store).await.unwrap();
+    }
+
+    #[test]
+    fn test_non_tokio() {
+        let tmp_dir = TempDir::new().unwrap();
+        let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap();
+        futures::executor::block_on(async move { // driven by the `futures` executor, not tokio
+            put_get_delete_list(&store).await.unwrap();
+            list_uses_directories_correctly(&store).await.unwrap();
+            list_with_delimiter(&store).await.unwrap();
+        });
+    }
+
+    #[tokio::test]
+    async fn creates_dir_if_not_present() {
+        let tmp_dir = TempDir::new().unwrap();
+        let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap();
+
+        // None of the parent directories of this location exist inside the fresh
+        // temp directory, so `put` has to create them
+        let location = Path::from("nested/file/test_file");
+
+        let expected_data = Bytes::from("arbitrary data");
+        store
+            .put(&location, expected_data.clone())
+            .await
+            .unwrap();
+
+        // Read the object back and compare it with what was written
+        let fetched = store.get(&location).await.unwrap();
+        let read_data = fetched.bytes().await.unwrap();
+
+        assert_eq!(&*read_data, expected_data);
+    }
+
+    #[tokio::test]
+    async fn unknown_length() {
+        let tmp_dir = TempDir::new().unwrap();
+        let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap();
+
+        let location = Path::from("some_file");
+
+        // Write an object directly below the store root ...
+        let expected_data = Bytes::from("arbitrary data");
+        store
+            .put(&location, expected_data.clone())
+            .await
+            .unwrap();
+
+        // ... and make sure that reading it back in full returns the
+        // very same bytes
+        let fetched = store.get(&location).await.unwrap();
+        let read_data = fetched.bytes().await.unwrap();
+
+        assert_eq!(&*read_data, expected_data);
+    }
+
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    // Fails on the GitHub Actions runner (which runs the tests as root)
+    #[ignore]
+    async fn bubble_up_io_errors() {
+        use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
+
+        let root = TempDir::new().unwrap();
+
+        // Remove all permissions so that the directory cannot be read
+        let metadata = root.path().metadata().unwrap();
+        let mut permissions = metadata.permissions();
+        permissions.set_mode(0o000);
+        set_permissions(root.path(), permissions).unwrap();
+
+        let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+        // `list` must fail: either up front or by yielding at least one error item
+        match store.list(None).await {
+            Err(_) => {
+                // ok, error found
+            }
+            Ok(mut stream) => {
+                let mut any_err = false;
+                while let Some(res) = stream.next().await {
+                    if res.is_err() {
+                        any_err = true;
+                    }
+                }
+                assert!(any_err);
+            }
+        }
+
+        // `list_with_delimiter` must fail as well
+        assert!(store.list_with_delimiter(None).await.is_err());
+    }
+
+    const NON_EXISTENT_NAME: &str = "nonexistentname"; // location requested by `get_nonexistent_location`
+
+    #[tokio::test]
+    async fn get_nonexistent_location() {
+        let tmp_dir = TempDir::new().unwrap();
+        let store = LocalFileSystem::new_with_prefix(tmp_dir.path()).unwrap();
+        let location = Path::from(NON_EXISTENT_NAME);
+
+        let err = get_nonexistent_object(&store, Some(location))
+            .await
+            .unwrap_err();
+        // Anything but `NotFound` is a test failure
+        let (path, source) = match err {
+            ObjectStoreError::NotFound { path, source } => (path, source),
+            other => panic!("unexpected error type: {:?}", other),
+        };
+        let source_variant = source.downcast_ref::<std::io::Error>();
+        assert!(
+            matches!(source_variant, Some(std::io::Error { .. }),),
+            "got: {:?}",
+            source_variant
+        );
+        assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
+    }
+
+    #[tokio::test]
+    async fn root() {
+        let store = LocalFileSystem::new();
+
+        // Express the absolute location of Cargo.toml as an object store path
+        let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
+        let as_url = Url::from_directory_path(&canonical).unwrap();
+        let location = Path::parse(as_url.path()).unwrap();
+
+        // Needed as on Windows canonicalize returns extended length path syntax
+        // C:\Users\circleci -> \\?\C:\Users\circleci
+        let resolved = store.config.path_to_filesystem(&location).unwrap();
+        let resolved = resolved.canonicalize().unwrap();
+
+        assert_eq!(resolved, canonical);
+        // The prefix-less store must also be able to `head` the file through that path
+        store.head(&location).await.unwrap();
+    }
+
+    #[tokio::test]
+    #[cfg(target_os = "linux")]
+    // macOS has some magic in its root ('/.VolumeIcon.icns') which causes this test to fail
+    async fn test_list_root() {
+        let integration = LocalFileSystem::new();
+        let result = integration.list_with_delimiter(None).await;
+        if cfg!(target_family = "windows") { // NOTE(review): never taken while the test is gated to linux above
+            let r = result.unwrap_err().to_string();
+            assert!(
+                r.contains("Unable to convert URL \"file:///\" to filesystem path"),
+                "{}",
+                r
+            );
+        } else {
+            result.unwrap();
+        }
+    }
+
+    #[tokio::test]
+    async fn invalid_path() {
+        let root = TempDir::new().unwrap();
+        let root = root.path().join("🙀");
+        std::fs::create_dir(root.clone()).unwrap();
+
+        // Segments that are invalid as object store paths are allowed above the root of the store
+        let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
+
+        let directory = Path::from("directory");
+        let object = directory.child("child.txt");
+        let data = Bytes::from("arbitrary");
+        integration.put(&object, data.clone()).await.unwrap();
+        integration.head(&object).await.unwrap();
+        let result = integration.get(&object).await.unwrap();
+        assert_eq!(result.bytes().await.unwrap(), data);
+
+        flatten_list_stream(&integration, None).await.unwrap();
+        flatten_list_stream(&integration, Some(&directory))
+            .await
+            .unwrap();
+
+        let result = integration
+            .list_with_delimiter(Some(&directory))
+            .await
+            .unwrap();
+        assert_eq!(result.objects.len(), 1);
+        assert!(result.common_prefixes.is_empty());
+        assert_eq!(result.objects[0].location, object);
+        // Create a file directly in the store root whose name is rejected as a path segment
+        let illegal = root.join("💀");
+        std::fs::write(illegal, "foo").unwrap();
+
+        // Can list directory that doesn't contain illegal path
+        flatten_list_stream(&integration, Some(&directory))
+            .await
+            .unwrap();
+
+        // Cannot list the root, which now contains the illegal file
+        let err = flatten_list_stream(&integration, None)
+            .await
+            .unwrap_err()
+            .to_string();
+
+        assert!(
+            err.contains("Invalid path segment - got \"💀\" expected: \"%F0%9F%92%80\""),
+            "{}",
+            err
+        );
+    }
+}
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
new file mode 100644
index 0000000..ffd8e3a
--- /dev/null
+++ b/object_store/src/memory.rs
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An in-memory object store implementation
+use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::Utc;
+use futures::{stream::BoxStream, StreamExt};
+use parking_lot::RwLock;
+use snafu::{ensure, OptionExt, Snafu};
+use std::collections::BTreeMap;
+use std::collections::BTreeSet;
+use std::ops::Range;
+
+/// Errors raised by the in-memory store; converted to `super::Error` via `From`
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+    #[snafu(display("No data in memory found. Location: {path}"))]
+    NoDataInMemory { path: String }, // surfaced as `super::Error::NotFound`
+
+    #[snafu(display("Out of range"))]
+    OutOfRange, // raised by `get_range` when the range ends past the stored data
+
+    #[snafu(display("Bad range"))]
+    BadRange, // raised by `get_range` when the range starts after its end
+
+    #[snafu(display("Object already exists at that location: {path}"))]
+    AlreadyExists { path: String }, // surfaced as `super::Error::AlreadyExists`
+}
+
+impl From<Error> for super::Error {
+    /// `NoDataInMemory` becomes `NotFound`, `AlreadyExists` stays, the rest is `Generic`
+    fn from(err: Error) -> Self {
+        match err {
+            Error::NoDataInMemory { ref path } => {
+                let path = path.clone();
+                Self::NotFound { path, source: err.into() }
+            }
+            Error::AlreadyExists { ref path } => {
+                let path = path.clone();
+                Self::AlreadyExists { path, source: err.into() }
+            }
+            Error::OutOfRange | Error::BadRange => {
+                Self::Generic { store: "InMemory", source: Box::new(err) }
+            }
+        }
+    }
+}
+
+/// In-memory storage suitable for testing or for opting out of using a cloud
+/// storage provider.
+#[derive(Debug, Default)]
+pub struct InMemory {
+    storage: RwLock<BTreeMap<Path, Bytes>>, // object data keyed by location, in `Path` order
+}
+
+impl std::fmt::Display for InMemory { // always rendered as the fixed text `InMemory`
+    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        out.write_str("InMemory")
+    }
+}
+
+#[async_trait]
+impl ObjectStore for InMemory {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let _replaced = self.storage.write().insert(location.to_owned(), bytes); // overwrites
+        Ok(())
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        // The whole object is returned as a stream made of a single chunk
+        let payload = self.get_bytes(location).await?;
+        let only_chunk = async move { Ok(payload) };
+        let stream = futures::stream::once(only_chunk).boxed();
+        Ok(GetResult::Stream(stream))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let stored = self.get_bytes(location).await?;
+        let (start, end) = (range.start, range.end);
+        ensure!(end <= stored.len(), OutOfRangeSnafu); // the end must lie within the data
+        ensure!(start <= end, BadRangeSnafu); // and the range must not be inverted
+        Ok(stored.slice(range))
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let now = Utc::now(); // reported as `last_modified`
+        let size = self.get_bytes(location).await?.len();
+        Ok(ObjectMeta {
+            location: location.to_owned(),
+            last_modified: now,
+            size,
+        })
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let _removed = self.storage.write().remove(location); // an absent location is not an error
+        Ok(())
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let last_modified = Utc::now();
+
+        // Snapshot the matching entries while holding the read lock, so that the
+        // returned stream borrows neither the lock guard nor `prefix`
+        let mut snapshot = Vec::new();
+        for (key, value) in self.storage.read().iter() {
+            if prefix.map_or(true, |p| key.prefix_matches(p)) {
+                snapshot.push(Ok(ObjectMeta {
+                    location: key.clone(),
+                    last_modified,
+                    size: value.len(),
+                }));
+            }
+        }
+
+        Ok(futures::stream::iter(snapshot).boxed())
+    }
+
+    /// The memory implementation returns all results, as opposed to the cloud
+    /// versions which limit their results to 1k or more because of API
+    /// limitations.
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        let root = Path::default();
+        let prefix = prefix.unwrap_or(&root);
+
+        let mut common_prefixes = BTreeSet::new();
+        let last_modified = Utc::now();
+        // Only objects in this base level should be returned in the
+        // response. Otherwise, we just collect the common prefixes.
+        let mut objects = vec![];
+        for (k, v) in self.storage.read().range((prefix)..) {
+            // Stop at the first key not starting with `prefix`; NOTE(review): assumes `Path` sorts like its string form
+            if !k.as_ref().starts_with(prefix.as_ref()) {
+                break;
+            }
+            // e.g. `a.txt` sorts between `a` and `a/b` but is not below `a`: skip it, don't stop
+            let mut parts = match k.prefix_match(prefix) {
+                Some(parts) => parts,
+                None => continue,
+            };
+            // Pop first element
+            let common_prefix = match parts.next() {
+                Some(p) => p,
+                None => continue,
+            };
+            if parts.next().is_some() {
+                common_prefixes.insert(prefix.child(common_prefix));
+            } else {
+                objects.push(ObjectMeta {
+                    location: k.clone(),
+                    last_modified,
+                    size: v.len(),
+                });
+            }
+        }
+        Ok(ListResult {
+            objects,
+            common_prefixes: common_prefixes.into_iter().collect(),
+        })
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let payload = self.get_bytes(from).await?; // fails if nothing is stored at `from`
+        let _replaced = self.storage.write().insert(to.to_owned(), payload);
+        Ok(())
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let payload = self.get_bytes(from).await?;
+        // The existence check and the insert both happen while the same write
+        // guard is held
+        let mut objects = self.storage.write();
+        if !objects.contains_key(to) {
+            objects.insert(to.clone(), payload);
+            return Ok(());
+        }
+        let path = to.to_string();
+        Err(Error::AlreadyExists { path }.into())
+    }
+}
+
+impl InMemory {
+    /// Creates an empty in-memory store
+    pub fn new() -> Self {
+        InMemory::default()
+    }
+
+    /// Returns a new store holding a copy of the current contents
+    pub async fn clone(&self) -> Self {
+        let snapshot = BTreeMap::clone(&self.storage.read());
+        let storage = RwLock::new(snapshot);
+        Self { storage }
+    }
+
+    /// Looks up the data stored at `location`
+    ///
+    /// # Errors
+    ///
+    /// Fails with `NoDataInMemory` if nothing is stored at `location`
+    async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
+        let path = location.to_string();
+        let found = self.storage.read().get(location).cloned();
+        match found {
+            Some(bytes) => Ok(bytes),
+            None => Err(Error::NoDataInMemory { path }.into()),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::{
+        tests::{
+            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
+            list_with_delimiter, put_get_delete_list, rename_and_copy,
+        },
+        Error as ObjectStoreError, ObjectStore,
+    };
+
+    #[tokio::test]
+    async fn in_memory_test() {
+        // Runs the shared ObjectStore test-suite against a fresh in-memory store
+        let store = InMemory::new();
+        put_get_delete_list(&store).await.unwrap();
+        list_uses_directories_correctly(&store).await.unwrap();
+        list_with_delimiter(&store).await.unwrap();
+        rename_and_copy(&store).await.unwrap();
+        copy_if_not_exists(&store).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn unknown_length() {
+        let store = InMemory::new();
+
+        let location = Path::from("some_file");
+
+        // Write an object ...
+        let expected_data = Bytes::from("arbitrary data");
+        store
+            .put(&location, expected_data.clone())
+            .await
+            .unwrap();
+
+        // ... and make sure that reading it back in full returns the very same bytes
+        let fetched = store.get(&location).await.unwrap();
+        let read_data = fetched.bytes().await.unwrap();
+
+        assert_eq!(&*read_data, expected_data);
+    }
+
+
+    const NON_EXISTENT_NAME: &str = "nonexistentname"; // location requested by `nonexistent_location`
+
+    #[tokio::test]
+    async fn nonexistent_location() {
+        let store = InMemory::new();
+        let location = Path::from(NON_EXISTENT_NAME);
+
+        let err = get_nonexistent_object(&store, Some(location))
+            .await
+            .unwrap_err();
+        // Anything but `NotFound` is a test failure
+        let (path, source) = match err {
+            ObjectStoreError::NotFound { path, source } => (path, source),
+            other => panic!("unexpected error type: {:?}", other),
+        };
+        let source_variant = source.downcast_ref::<Error>();
+        assert!(
+            matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
+            "got: {:?}",
+            source_variant
+        );
+        assert_eq!(path, NON_EXISTENT_NAME);
+    }
+}
diff --git a/object_store/src/oauth.rs b/object_store/src/oauth.rs
new file mode 100644
index 0000000..273e37b
--- /dev/null
+++ b/object_store/src/oauth.rs
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::token::TemporaryToken;
+use reqwest::{Client, Method};
+use ring::signature::RsaKeyPair;
+use snafu::{ResultExt, Snafu};
+use std::time::{Duration, Instant};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("No RSA key found in pem file"))]
+    MissingKey, // the first item of the PEM is absent, or is not a PKCS8 / RSA key
+    /// The key material was rejected by `ring`; `context(false)` lets `?` convert it
+    #[snafu(display("Invalid RSA key: {}", source), context(false))]
+    InvalidKey { source: ring::error::KeyRejected },
+    /// Signing the JWT with the RSA key pair failed
+    #[snafu(display("Error signing jwt: {}", source))]
+    Sign { source: ring::error::Unspecified },
+    /// Serializing the JWT header or claims to JSON failed
+    #[snafu(display("Error encoding jwt payload: {}", source))]
+    Encode { source: serde_json::Error },
+    /// NOTE(review): not constructed anywhere in this file's visible code
+    #[snafu(display("Unsupported key encoding: {}", encoding))]
+    UnsupportedKey { encoding: String },
+    /// Sending the token request, its HTTP status, or decoding its body failed
+    #[snafu(display("Error performing token request: {}", source))]
+    TokenRequest { source: reqwest::Error },
+}
+/// Result type whose error defaults to this module's [`Error`]
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+/// Header of a JWT, serialized to JSON with every `None` field omitted
+#[derive(Debug, Default, serde::Serialize)]
+pub struct JwtHeader {
+    /// The type of JWS: it can only be "JWT" here
+    ///
+    /// Defined in [RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub typ: Option<String>,
+    /// The algorithm used
+    ///
+    /// Defined in [RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1).
+    pub alg: String,
+    /// Content type
+    ///
+    /// Defined in [RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub cty: Option<String>,
+    /// JSON Key URL
+    ///
+    /// Defined in [RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub jku: Option<String>,
+    /// Key ID
+    ///
+    /// Defined in [RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub kid: Option<String>,
+    /// X.509 URL
+    ///
+    /// Defined in [RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub x5u: Option<String>,
+    /// X.509 certificate thumbprint
+    ///
+    /// Defined in [RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7).
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub x5t: Option<String>,
+}
+
+#[derive(serde::Serialize)]
+struct TokenClaims<'a> {
+    iss: &'a str, // issuer, taken from `OAuthProvider::issuer`
+    scope: &'a str, // requested scope, taken from `OAuthProvider::scope`
+    aud: &'a str, // audience, taken from `OAuthProvider::audience`
+    exp: u64, // expiry in seconds since the unix epoch (`iat` + 3600)
+    iat: u64, // issue time in seconds since the unix epoch
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct TokenResponse {
+    access_token: String, // becomes `TemporaryToken::token`
+    expires_in: u64, // lifetime in seconds, added to `Instant::now()` on receipt
+}
+
+/// Encapsulates the logic to perform an OAuth token challenge
+#[derive(Debug)]
+pub struct OAuthProvider {
+    issuer: String, // sent as the `iss` claim
+    scope: String, // sent as the `scope` claim
+    audience: String, // sent as the `aud` claim, and the URL the request is POSTed to
+    key_pair: RsaKeyPair, // signs the JWT assertion
+    jwt_header: String, // pre-encoded JWT header, computed once in `new`
+    random: ring::rand::SystemRandom, // randomness source handed to `RsaKeyPair::sign`
+}
+
+impl OAuthProvider {
+    /// Create a new [`OAuthProvider`] from a PEM encoded RSA private key
+    pub fn new(
+        issuer: String,
+        private_key_pem: String,
+        scope: String,
+        audience: String,
+    ) -> Result<Self> {
+        let key_pair = decode_first_rsa_key(private_key_pem)?;
+        let jwt_header = b64_encode_obj(&JwtHeader {
+            alg: "RS256".to_string(),
+            ..Default::default()
+        })?;
+
+        Ok(Self {
+            issuer,
+            key_pair,
+            scope,
+            audience,
+            jwt_header,
+            random: ring::rand::SystemRandom::new(),
+        })
+    }
+
+    /// Fetch a fresh token by POSTing a freshly signed JWT assertion to `audience`
+    pub async fn fetch_token(&self, client: &Client) -> Result<TemporaryToken<String>> {
+        let now = seconds_since_epoch();
+        let exp = now + 3600;
+        // The assertion is issued `now` and expires 3600 seconds (one hour) later
+        let claims = TokenClaims {
+            iss: &self.issuer,
+            scope: &self.scope,
+            aud: &self.audience,
+            exp,
+            iat: now,
+        };
+        // The signing input is `<encoded header>.<encoded claims>`
+        let claim_str = b64_encode_obj(&claims)?;
+        let message = [self.jwt_header.as_ref(), claim_str.as_ref()].join(".");
+        let mut sig_bytes = vec![0; self.key_pair.public_modulus_len()];
+        self.key_pair
+            .sign(
+                &ring::signature::RSA_PKCS1_SHA256,
+                &self.random,
+                message.as_bytes(),
+                &mut sig_bytes,
+            )
+            .context(SignSnafu)?;
+        // The complete JWT is `<signing input>.<encoded signature>`
+        let signature = base64::encode_config(&sig_bytes, base64::URL_SAFE_NO_PAD);
+        let jwt = [message, signature].join(".");
+        // Form fields of the JWT bearer grant request
+        let body = [
+            ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
+            ("assertion", &jwt),
+        ];
+        // The request goes to the same URL that was used as the `aud` claim
+        let response: TokenResponse = client
+            .request(Method::POST, &self.audience)
+            .form(&body)
+            .send()
+            .await
+            .context(TokenRequestSnafu)?
+            .error_for_status()
+            .context(TokenRequestSnafu)?
+            .json()
+            .await
+            .context(TokenRequestSnafu)?;
+        // `expires_in` is a relative number of seconds, so anchor it to the current instant
+        let token = TemporaryToken {
+            token: response.access_token,
+            expiry: Instant::now() + Duration::from_secs(response.expires_in),
+        };
+
+        Ok(token)
+    }
+}
+
+/// Returns the number of whole seconds elapsed since the unix epoch
+fn seconds_since_epoch() -> u64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .expect("system clock is set before the unix epoch")
+        .as_secs()
+}
+
+/// Decodes the first item of the PEM as an RSA key pair, returning
+/// [`Error::MissingKey`] if that item is absent, malformed, or not an RSA private key
+fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
+    use rustls_pemfile::Item;
+
+    // `Cursor` implements `BufRead` itself, so no additional `BufReader` is needed
+    let mut reader = std::io::Cursor::new(private_key_pem);
+    // Malformed PEM (e.g. invalid base64) is an `Err`; report it instead of panicking
+    match rustls_pemfile::read_one(&mut reader) {
+        Ok(Some(Item::PKCS8Key(key))) => Ok(RsaKeyPair::from_pkcs8(&key)?),
+        Ok(Some(Item::RSAKey(key))) => Ok(RsaKeyPair::from_der(&key)?),
+        _ => Err(Error::MissingKey),
+    }
+}
+/// Encodes the JSON form of `obj` as URL-safe base64 without padding
+fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
+    let config = base64::URL_SAFE_NO_PAD;
+    Ok(base64::encode_config(serde_json::to_string(obj).context(EncodeSnafu)?, config))
+}
diff --git a/object_store/src/path/mod.rs b/object_store/src/path/mod.rs
new file mode 100644
index 0000000..23488ef
--- /dev/null
+++ b/object_store/src/path/mod.rs
@@ -0,0 +1,531 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Path abstraction for Object Storage
+
+use itertools::Itertools;
+use percent_encoding::percent_decode;
+use snafu::{ensure, ResultExt, Snafu};
+use std::fmt::Formatter;
+use url::Url;
+
+/// The delimiter to separate object namespaces, creating a directory structure.
+pub const DELIMITER: &str = "/";
+
+/// The first byte of [`DELIMITER`], for APIs that operate on bytes rather than `str`
+pub const DELIMITER_BYTE: u8 = DELIMITER.as_bytes()[0];
+
+mod parts;
+
+pub use parts::{InvalidPart, PathPart};
+
+/// Error returned by [`Path::parse`] and by the filesystem path conversions
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[snafu(display("Path \"{}\" contained empty path segment", path))]
+    EmptySegment { path: String }, // raised by `Path::parse`
+    /// A segment of the path was rejected by [`PathPart::parse`]
+    #[snafu(display("Error parsing Path \"{}\": {}", path, source))]
+    BadSegment { path: String, source: InvalidPart },
+    /// The filesystem path could not be canonicalized, e.g. it does not exist
+    #[snafu(display("Failed to canonicalize path \"{}\": {}", path.display(), source))]
+    Canonicalize {
+        path: std::path::PathBuf,
+        source: std::io::Error,
+    },
+    /// The canonicalized filesystem path could not be expressed as a [`Url`]
+    #[snafu(display("Unable to convert path \"{}\" to URL", path.display()))]
+    InvalidPath { path: std::path::PathBuf },
+    /// Percent-decoding the URL path did not yield valid UTF-8
+    #[snafu(display("Path \"{}\" contained non-unicode characters: {}", path, source))]
+    NonUnicode {
+        path: String,
+        source: std::str::Utf8Error,
+    },
+    /// The URL path of the filesystem path does not start with that of the base URL
+    #[snafu(display("Path {} does not start with prefix {}", path, prefix))]
+    PrefixMismatch { path: String, prefix: String },
+}
+
+/// A parsed path representation that can be safely written to object storage
+///
+/// # Path Safety
+///
+/// In theory object stores support any UTF-8 character sequence, however, certain character
+/// sequences cause compatibility problems with some applications and protocols. As such the
+/// naming guidelines for [S3], [GCS] and [Azure Blob Storage] all recommend sticking to a
+/// limited character subset.
+///
+/// [S3]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
+/// [GCS]: https://cloud.google.com/storage/docs/naming-objects
+/// [Azure Blob Storage]: https://docs.microsoft.com/en-us/rest/api/storageservices/Naming-and-Referencing-Containers--Blobs--and-Metadata#blob-names
+///
+/// This presents libraries with two options for consistent path handling:
+///
+/// 1. Allow constructing unsafe paths, allowing for both reading and writing of data to paths
+/// that may not be consistently understood or supported
+/// 2. Disallow constructing unsafe paths, ensuring data written can be consistently handled by
+/// all other systems, but preventing interaction with objects at unsafe paths
+///
+/// This library takes the second approach, in particular:
+///
+/// * Paths are delimited by `/`
+/// * Paths do not start with a `/`
+/// * Empty segments are dropped on encode (`//` is treated as `/`) and rejected on parse
+/// * Relative path segments, i.e. `.` and `..` are percent encoded
+/// * Unsafe characters are percent encoded, as described by [RFC 1738]
+/// * All paths are relative to the root of the object store
+///
+/// In order to provide these guarantees there are two ways to safely construct a [`Path`]
+///
+/// # Encode
+///
+/// A string containing potentially illegal path segments can be encoded to a [`Path`]
+/// using [`Path::from`] or [`Path::from_iter`].
+///
+/// ```
+/// # use object_store::path::Path;
+/// assert_eq!(Path::from("foo/bar").as_ref(), "foo/bar");
+/// assert_eq!(Path::from("foo//bar").as_ref(), "foo/bar");
+/// assert_eq!(Path::from("foo/../bar").as_ref(), "foo/%2E%2E/bar");
+/// assert_eq!(Path::from_iter(["foo", "foo/bar"]).as_ref(), "foo/foo%2Fbar");
+/// ```
+///
+/// Note: if provided with an already percent encoded string, this will encode it again
+///
+/// ```
+/// # use object_store::path::Path;
+/// assert_eq!(Path::from("foo/foo%2Fbar").as_ref(), "foo/foo%252Fbar");
+/// ```
+///
+/// # Parse
+///
+/// Alternatively a [`Path`] can be created from an existing string, returning an
+/// error if it is invalid. Unlike the encoding methods, this will permit
+/// valid percent encoded sequences.
+///
+/// ```
+/// # use object_store::path::Path;
+///
+/// assert_eq!(Path::parse("/foo/foo%2Fbar").unwrap().as_ref(), "foo/foo%2Fbar");
+/// Path::parse("..").unwrap_err();
+/// Path::parse("/foo//").unwrap_err();
+/// Path::parse("😀").unwrap_err();
+/// Path::parse("%Q").unwrap_err();
+/// ```
+///
+/// [RFC 1738]: https://www.ietf.org/rfc/rfc1738.txt
+#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Ord, PartialOrd)]
+pub struct Path {
+    /// The encoded path: segments joined by `/`, no leading or trailing delimiters
+    raw: String,
+}
+
+impl Path {
+    /// Validate `path` and wrap it in a [`Path`] without re-encoding it,
+    /// returning an [`Error`] if it breaks the rules documented on [`Path`]
+    ///
+    /// Note: at most one leading `/` and one trailing `/` are stripped first
+    pub fn parse(path: impl AsRef<str>) -> Result<Self, Error> {
+        let path = path.as_ref();
+
+        // A lone `/`, like the empty string, denotes the root
+        let unprefixed = path.strip_prefix(DELIMITER).unwrap_or(path);
+        if unprefixed.is_empty() {
+            return Ok(Self::default());
+        }
+        let trimmed = unprefixed.strip_suffix(DELIMITER).unwrap_or(unprefixed);
+
+        // Every remaining segment must be non-empty and already in encoded form
+        for part in trimmed.split(DELIMITER) {
+            ensure!(!part.is_empty(), EmptySegmentSnafu { path });
+            PathPart::parse(part).context(BadSegmentSnafu { path })?;
+        }
+
+        let raw = trimmed.to_owned();
+        Ok(Self { raw })
+    }
+
+    /// Convert a filesystem path to a [`Path`] relative to the filesystem root
+    ///
+    /// This will return an error if the path cannot be canonicalized (e.g. it does not
+    /// exist), or contains illegal character sequences as defined by [`Path::parse`]
+    pub fn from_filesystem_path(
+        path: impl AsRef<std::path::Path>,
+    ) -> Result<Self, Error> {
+        Self::from_filesystem_path_with_base(path, None)
+    }
+
+    /// Convert a filesystem path to a [`Path`] relative to the provided base
+    ///
+    /// Fails if `path` cannot be canonicalized on the local filesystem, if `base` is
+    /// given and its URL path is not a prefix of that of `path`, or if the remainder
+    /// is not accepted by [`Path::parse`]
+    pub(crate) fn from_filesystem_path_with_base(
+        path: impl AsRef<std::path::Path>,
+        base: Option<&Url>,
+    ) -> Result<Self, Error> {
+        let url = filesystem_path_to_url(path)?;
+        let absolute = url.path();
+
+        let relative = match base {
+            None => absolute,
+            Some(prefix) => match absolute.strip_prefix(prefix.path()) {
+                Some(remainder) => remainder,
+                None => {
+                    let (path, prefix) = (absolute.to_string(), prefix.to_string());
+                    return Err(Error::PrefixMismatch { path, prefix });
+                }
+            },
+        };
+
+        // Reverse any percent encoding performed by conversion to URL
+        let raw = percent_decode(relative.as_bytes());
+        Self::parse(raw.decode_utf8().context(NonUnicodeSnafu { path: relative })?)
+    }
+
+    /// Iterates over the `/` delimited [`PathPart`]s of this [`Path`], if any
+    pub fn parts(&self) -> impl Iterator<Item = PathPart<'_>> {
+        use itertools::Either;
+        if self.raw.is_empty() {
+            // The root path has no parts, rather than a single empty one
+            Either::Left(std::iter::empty())
+        } else {
+            let segments = self.raw.split(DELIMITER);
+            Either::Right(segments.map(|s| PathPart { raw: s.into() }))
+        }
+    }
+
+    /// Strips `prefix` from the front of this [`Path`], comparing whole parts
+    ///
+    /// Returns the remaining [`PathPart`]s, or `None` if the prefix does not match
+    pub fn prefix_match(
+        &self,
+        prefix: &Self,
+    ) -> Option<impl Iterator<Item = PathPart<'_>> + '_> {
+        use itertools::{Diff, Either};
+
+        let outcome = itertools::diff_with(self.parts(), prefix.parts(), |a, b| a == b);
+        match outcome {
+            // Identical paths: a match with nothing left over
+            None => Some(Either::Left(std::iter::empty())),
+            // `prefix` ran out first: a match, hand back the unconsumed parts
+            Some(Diff::Shorter(_, remaining)) => Some(Either::Right(remaining)),
+            // A part differs
+            Some(Diff::FirstMismatch(..)) => None,
+            // `prefix` has more parts than this path
+            Some(Diff::Longer(..)) => None,
+        }
+    }
+
+
+    /// Returns true if this [`Path`] starts with every whole part of `prefix`
+    pub fn prefix_matches(&self, prefix: &Self) -> bool {
+        self.prefix_match(prefix).is_some()
+    }
+
+    /// Creates a new child of this [`Path`] by appending `child` as one [`PathPart`]
+    pub fn child<'a>(&self, child: impl Into<PathPart<'a>>) -> Self {
+        let child = child.into();
+        let raw = match self.raw.is_empty() {
+            true => child.raw.into_owned(),
+            false => format!("{}{}{}", self.raw, DELIMITER, child.raw),
+        };
+        Self { raw }
+    }
+}
+
+impl AsRef<str> for Path {
+    fn as_ref(&self) -> &str {
+        self.raw.as_str() // the encoded form held by this path
+    }
+}
+
+impl From<&str> for Path {
+    fn from(path: &str) -> Self {
+        path.split(DELIMITER).collect() // goes through `FromIterator` for `Path`
+    }
+}
+
+impl From<String> for Path {
+    fn from(path: String) -> Self {
+        Self::from(path.as_str()) // same conversion as `From<&str>`
+    }
+}
+
+impl From<Path> for String {
+    fn from(Path { raw }: Path) -> Self {
+        raw // hand over the encoded string as is
+    }
+}
+
+impl std::fmt::Display for Path {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.pad(self.raw.as_str()) // equivalent to delegating to `String`'s `Display`
+    }
+}
+
+impl<'a, I> FromIterator<I> for Path
+where
+    I: Into<PathPart<'a>>,
+{
+    /// Builds a [`Path`] by converting every item into a [`PathPart`] and joining the
+    /// non-empty results with [`DELIMITER`]
+    fn from_iter<T: IntoIterator<Item = I>>(iter: T) -> Self {
+        let encoded = iter.into_iter().map(|item| -> PathPart<'a> { item.into() });
+        let kept = encoded.filter(|part| !part.raw.is_empty());
+        let raw = kept.map(|part| part.raw).join(DELIMITER);
+
+        Self { raw }
+    }
+}
+
+/// Given a filesystem path, canonicalize it and convert the result to a [`Url`],
+/// returning an error if the file doesn't exist on the local filesystem
+pub(crate) fn filesystem_path_to_url(
+    path: impl AsRef<std::path::Path>,
+) -> Result<Url, Error> {
+    let requested = path.as_ref();
+    let context = CanonicalizeSnafu { path: requested };
+    let canonical = requested.canonicalize().context(context)?;
+    let url = if canonical.is_dir() {
+        Url::from_directory_path(&canonical)
+    } else {
+        Url::from_file_path(&canonical)
+    };
+    url.map_err(|_| Error::InvalidPath { path: canonical })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn cloud_prefix_with_trailing_delimiter() {
+        // Use case: files exist in object storage named `foo/bar.json` and
+        // `foo_test.json`. A search for the prefix `foo/` should return
+        // `foo/bar.json` but not `foo_test.json`.
+        let prefix = Path::from_iter(["test"]);
+        assert_eq!(prefix.as_ref(), "test");
+    }
+
+    #[test]
+    fn push_encodes() {
+        let location = Path::from_iter(["foo/bar", "baz%2Ftest"]);
+        assert_eq!(location.as_ref(), "foo%2Fbar/baz%252Ftest");
+    }
+
+    #[test]
+    fn test_parse() {
+        assert_eq!(Path::parse("/").unwrap().as_ref(), "");
+        assert_eq!(Path::parse("").unwrap().as_ref(), "");
+        // One `/` is stripped from each end, which leaves a single empty segment
+        let err = Path::parse("//").unwrap_err();
+        assert!(matches!(err, Error::EmptySegment { .. }));
+
+        assert_eq!(Path::parse("/foo/bar/").unwrap().as_ref(), "foo/bar");
+        assert_eq!(Path::parse("foo/bar/").unwrap().as_ref(), "foo/bar");
+        assert_eq!(Path::parse("foo/bar").unwrap().as_ref(), "foo/bar");
+        // Empty segments in the middle of a path are rejected, not collapsed
+        let err = Path::parse("foo///bar").unwrap_err();
+        assert!(matches!(err, Error::EmptySegment { .. }));
+    }
+
+    #[test]
+    fn convert_raw_before_partial_eq() {
+        // dir and file_name
+        let cloud = Path::from("test_dir/test_file.json");
+        let built = Path::from_iter(["test_dir", "test_file.json"]);
+
+        assert_eq!(built, cloud);
+
+        // dir and file_name w/o dot
+        let cloud = Path::from("test_dir/test_file");
+        let built = Path::from_iter(["test_dir", "test_file"]);
+
+        assert_eq!(built, cloud);
+
+        // dir, no file
+        let cloud = Path::from("test_dir/");
+        let built = Path::from_iter(["test_dir"]);
+        assert_eq!(built, cloud);
+
+        // file_name, no dir
+        let cloud = Path::from("test_file.json");
+        let built = Path::from_iter(["test_file.json"]);
+        assert_eq!(built, cloud);
+
+        // empty
+        let cloud = Path::from("");
+        let built = Path::from_iter(["", ""]);
+
+        assert_eq!(built, cloud);
+    }
+
+    #[test]
+    fn parts_after_prefix_behavior() {
+        let existing_path = Path::from("apple/bear/cow/dog/egg.json");
+
+        // Prefix with one directory
+        let prefix = Path::from("apple");
+        let expected_parts: Vec<PathPart<'_>> = vec!["bear", "cow", "dog", "egg.json"]
+            .into_iter()
+            .map(Into::into)
+            .collect();
+        let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
+        assert_eq!(parts, expected_parts);
+
+        // Prefix with two directories
+        let prefix = Path::from("apple/bear");
+        let expected_parts: Vec<PathPart<'_>> = vec!["cow", "dog", "egg.json"]
+            .into_iter()
+            .map(Into::into)
+            .collect();
+        let parts: Vec<_> = existing_path.prefix_match(&prefix).unwrap().collect();
+        assert_eq!(parts, expected_parts);
+
+        // Not a prefix
+        let prefix = Path::from("cow");
+        assert!(existing_path.prefix_match(&prefix).is_none());
+
+        // Prefix with a partial directory
+        let prefix = Path::from("ap");
+        assert!(existing_path.prefix_match(&prefix).is_none());
+
+        // Prefix matches but there aren't any parts after it
+        let existing_path = Path::from("apple/bear/cow/dog");
+
+        let prefix = existing_path.clone();
+        assert_eq!(existing_path.prefix_match(&prefix).unwrap().count(), 0);
+    }
+
+    #[test]
+    fn prefix_matches() {
+        let haystack = Path::from_iter(["foo/bar", "baz%2Ftest", "something"]);
+        let needle = haystack.clone();
+        // self starts with self
+        assert!(
+            haystack.prefix_matches(&haystack),
+            "{:?} should have started with {:?}",
+            haystack,
+            haystack
+        );
+
+        // a longer prefix doesn't match
+        let needle = needle.child("longer now");
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} shouldn't have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // one dir prefix matches
+        let needle = Path::from_iter(["foo/bar"]);
+        assert!(
+            haystack.prefix_matches(&needle),
+            "{:?} should have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // two dir prefix matches
+        let needle = needle.child("baz%2Ftest");
+        assert!(
+            haystack.prefix_matches(&needle),
+            "{:?} should have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // partial dir prefix doesn't match
+        let needle = Path::from_iter(["f"]);
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // one dir and one partial dir doesn't match
+        let needle = Path::from_iter(["foo/bar", "baz"]);
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // empty prefix matches
+        let needle = Path::from("");
+        assert!(
+            haystack.prefix_matches(&needle),
+            "{:?} should have started with {:?}",
+            haystack,
+            needle
+        );
+    }
+
+    #[test]
+    fn prefix_matches_with_file_name() {
+        let haystack =
+            Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo.segment"]);
+
+        // All directories match and file name is a prefix
+        let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "foo"]);
+
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // All directories match but file name is not a prefix
+        let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "something", "e"]);
+
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // Not all directories match; file name is a prefix of the next directory; this
+        // does not match
+        let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "s"]);
+
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+
+        // Not all directories match; file name is NOT a prefix of the next directory;
+        // no match
+        let needle = Path::from_iter(["foo/bar", "baz%2Ftest", "p"]);
+
+        assert!(
+            !haystack.prefix_matches(&needle),
+            "{:?} should not have started with {:?}",
+            haystack,
+            needle
+        );
+    }
+}
diff --git a/object_store/src/path/parts.rs b/object_store/src/path/parts.rs
new file mode 100644
index 0000000..e73b184
--- /dev/null
+++ b/object_store/src/path/parts.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS};
+use std::borrow::Cow;
+
+use crate::path::DELIMITER_BYTE;
+use snafu::Snafu;
+
+/// Error returned by [`PathPart::parse`]
+#[derive(Debug, Snafu)]
+#[snafu(display("Invalid path segment - got \"{}\" expected: \"{}\"", actual, expected))]
+#[allow(missing_copy_implementations)]
+pub struct InvalidPart {
+    actual: String, // the segment exactly as it was passed to `parse`
+    expected: String, // the encoding `PathPart::from` gives the decoded segment
+}
+
+/// The PathPart type exists to validate the directory/file names that form part
+/// of a path.
+///
+/// A PathPart instance is guaranteed to contain no illegal characters (e.g. `/`)
+/// as it can only be constructed through the `from` impls or [`PathPart::parse`].
+#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default, Hash)]
+pub struct PathPart<'a> {
+    pub(super) raw: Cow<'a, str>,
+}
+
+impl<'a> PathPart<'a> {
+    /// Accepts `segment` as a [`PathPart`] only if it is already in encoded form,
+    /// i.e. decoding and then re-encoding it gives back the same string
+    pub fn parse(segment: &'a str) -> Result<Self, InvalidPart> {
+        let decoded: Cow<'a, [u8]> = percent_decode(segment.as_bytes()).into();
+        let canonical = PathPart::from(decoded.as_ref());
+
+        match segment == canonical.as_ref() {
+            // Borrow the caller's string rather than keeping the re-encoded copy
+            true => Ok(Self { raw: Cow::Borrowed(segment) }),
+            false => Err(InvalidPart {
+                actual: segment.to_string(),
+                expected: canonical.raw.to_string(),
+            }),
+        }
+    }
+}
+
+/// Bytes that are percent-encoded when a byte string is converted into a [`PathPart`].
+const INVALID: &AsciiSet = &CONTROLS
+    // The delimiter we are reserving for internal hierarchy
+    .add(DELIMITER_BYTE)
+    // Characters AWS recommends avoiding for object keys
+    // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
+    .add(b'\\')
+    .add(b'{')
+    .add(b'^')
+    .add(b'}')
+    .add(b'%')
+    .add(b'`')
+    .add(b']')
+    .add(b'"') // " <-- my editor is confused about double quotes within single quotes
+    .add(b'>')
+    .add(b'[')
+    .add(b'~')
+    .add(b'<')
+    .add(b'#')
+    .add(b'|')
+    // Characters Google Cloud Storage recommends avoiding for object names
+    // https://cloud.google.com/storage/docs/naming-objects
+    .add(b'\r')
+    .add(b'\n')
+    .add(b'*')
+    .add(b'?');
+
+impl<'a> From<&'a [u8]> for PathPart<'a> {
+    fn from(v: &'a [u8]) -> Self {
+        // `.` is not encoded in general, but a part equal to `.` or `..` is rewritten
+        // so that it cannot be used for file system traversal.
+        let raw: Cow<'a, str> = match v {
+            [b'.'] => Cow::Borrowed("%2E"),
+            [b'.', b'.'] => Cow::Borrowed("%2E%2E"),
+            bytes => Cow::from(percent_encode(bytes, INVALID)),
+        };
+        Self { raw }
+    }
+}
+
+impl<'a> From<&'a str> for PathPart<'a> {
+    fn from(v: &'a str) -> Self {
+        v.as_bytes().into() // defer to the byte-slice conversion
+    }
+}
+
+impl From<String> for PathPart<'static> {
+    fn from(s: String) -> Self {
+        // Encode through the borrowed impl, then detach the result from `s`
+        let raw = Cow::Owned(PathPart::from(s.as_str()).raw.into_owned());
+        Self { raw }
+    }
+}
+
+impl<'a> AsRef<str> for PathPart<'a> {
+    fn as_ref(&self) -> &str {
+        &self.raw // deref of the `Cow`, yielding the encoded segment
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn path_part_delimiter_gets_encoded() {
+        // The delimiter is reserved, so it is escaped inside a single part
+        assert_eq!(PathPart::from("foo/bar").raw, "foo%2Fbar");
+    }
+
+    #[test]
+    fn path_part_given_already_encoded_string() {
+        // `%` is itself escaped, so encoded input gets encoded a second time
+        assert_eq!(PathPart::from("foo%2Fbar").raw, "foo%252Fbar");
+    }
+
+    #[test]
+    fn path_part_cant_be_one_dot() {
+        // A lone `.` is rewritten to its escaped form
+        assert_eq!(PathPart::from(".").raw, "%2E");
+    }
+
+    #[test]
+    fn path_part_cant_be_two_dots() {
+        // A lone `..` is rewritten to its escaped form
+        assert_eq!(PathPart::from("..").raw, "%2E%2E");
+    }
+}
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
new file mode 100644
index 0000000..7a54a06
--- /dev/null
+++ b/object_store/src/throttle.rs
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A throttling object store wrapper
+use parking_lot::Mutex;
+use std::ops::Range;
+use std::{convert::TryInto, sync::Arc};
+
+use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::{stream::BoxStream, StreamExt};
+use std::time::Duration;
+
+/// Configuration settings for throttled store
+#[derive(Debug, Default, Clone, Copy)]
+pub struct ThrottleConfig {
+    /// Sleep duration for every call to [`delete`](ThrottledStore::delete).
+    ///
+    /// Sleeping is done before the underlying store is called and independently of the success of
+    /// the operation.
+    pub wait_delete_per_call: Duration,
+
+    /// Sleep duration for every byte received during [`get`](ThrottledStore::get).
+    ///
+    /// Sleeping is performed after the underlying store returned and only for successful gets. The
+    /// sleep duration is additive to [`wait_get_per_call`](Self::wait_get_per_call).
+    ///
+    /// Note that the per-byte sleep only happens as the user consumes the output bytes. Should
+    /// there be an intermediate failure (i.e. after partly consuming the output bytes), the
+    /// resulting sleep time will be partial as well.
+    pub wait_get_per_byte: Duration,
+
+    /// Sleep duration for every call to [`get`](ThrottledStore::get).
+    ///
+    /// Sleeping is done before the underlying store is called and independently of the success of
+    /// the operation. The sleep duration is additive to
+    /// [`wait_get_per_byte`](Self::wait_get_per_byte).
+    pub wait_get_per_call: Duration,
+
+    /// Sleep duration for every call to [`list`](ThrottledStore::list).
+    ///
+    /// Sleeping is done before the underlying store is called and independently of the success of
+    /// the operation. The sleep duration is additive to
+    /// [`wait_list_per_entry`](Self::wait_list_per_entry).
+    pub wait_list_per_call: Duration,
+
+    /// Sleep duration for every entry received during [`list`](ThrottledStore::list).
+    ///
+    /// Sleeping is performed after the underlying store returned and only for successful lists.
+    /// The sleep duration is additive to [`wait_list_per_call`](Self::wait_list_per_call).
+    ///
+    /// Note that the per-entry sleep only happens as the user consumes the output entries. Should
+    /// there be an intermediate failure (i.e. after partly consuming the output entries), the
+    /// resulting sleep time will be partial as well.
+    pub wait_list_per_entry: Duration,
+
+    /// Sleep duration for every call to
+    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
+    ///
+    /// Sleeping is done before the underlying store is called and independently of the success of
+    /// the operation. The sleep duration is additive to
+    /// [`wait_list_with_delimiter_per_entry`](Self::wait_list_with_delimiter_per_entry).
+    pub wait_list_with_delimiter_per_call: Duration,
+
+    /// Sleep duration for every entry received during
+    /// [`list_with_delimiter`](ThrottledStore::list_with_delimiter).
+    ///
+    /// Sleeping is performed after the underlying store returned and only for successful lists.
+    /// The sleep duration is additive to
+    /// [`wait_list_with_delimiter_per_call`](Self::wait_list_with_delimiter_per_call).
+    pub wait_list_with_delimiter_per_entry: Duration,
+
+    /// Sleep duration for every call to [`put`](ThrottledStore::put).
+    ///
+    /// Sleeping is done before the underlying store is called and independently of the success of
+    /// the operation.
+    pub wait_put_per_call: Duration,
+}
+
+/// Awaits `tokio::time::sleep` for `duration`, skipping the timer entirely when it is zero.
+async fn sleep(duration: Duration) {
+    if duration > Duration::ZERO {
+        tokio::time::sleep(duration).await;
+    }
+}
+
+/// Store wrapper that delays calls to an inner store by the waits in a [`ThrottleConfig`].
+///
+/// This can be used for performance testing.
+///
+/// **Note that the behavior of the wrapper is deterministic and might not reflect real-world
+/// conditions!**
+#[derive(Debug)]
+pub struct ThrottledStore<T: ObjectStore> {
+    inner: T, // the wrapped store that every call is forwarded to
+    config: Arc<Mutex<ThrottleConfig>>, // current waits, adjustable via `config_mut`
+}
+
+impl<T: ObjectStore> ThrottledStore<T> {
+    /// Create a new wrapper around `inner` that sleeps according to `config`.
+    ///
+    /// Pass [`ThrottleConfig::default`] to start with zero waiting times.
+    pub fn new(inner: T, config: ThrottleConfig) -> Self {
+        let config = Arc::new(Mutex::new(config));
+        Self { inner, config }
+    }
+
+    /// Mutate the config in place. `f` runs exactly once while the config lock is held, so
+    /// it should be short and must not call back into this store's config accessors.
+    pub fn config_mut<F>(&self, f: F)
+    where
+        F: FnOnce(&mut ThrottleConfig),
+    {
+        f(&mut self.config.lock())
+    }
+
+    /// Return copy of current config.
+    pub fn config(&self) -> ThrottleConfig {
+        *self.config.lock()
+    }
+}
+
+impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("ThrottledStore({})", self.inner))
+    }
+}
+
+#[async_trait]
+impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        sleep(self.config().wait_put_per_call).await;
+
+        self.inner.put(location, bytes).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        sleep(self.config().wait_get_per_call).await;
+
+        // need to copy to avoid moving / referencing `self`
+        let wait_get_per_byte = self.config().wait_get_per_byte;
+
+        self.inner.get(location).await.map(|result| {
+            let s = match result {
+                GetResult::Stream(s) => s,
+                GetResult::File(_, _) => unimplemented!(),
+            };
+
+            GetResult::Stream(
+                s.then(move |bytes_result| async move {
+                    match bytes_result {
+                        Ok(bytes) => {
+                            let bytes_len: u32 = usize_to_u32_saturate(bytes.len());
+                            sleep(wait_get_per_byte * bytes_len).await;
+                            Ok(bytes)
+                        }
+                        Err(err) => Err(err),
+                    }
+                })
+                .boxed(),
+            )
+        })
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        let config = self.config();
+
+        // Throttle like `get`: per-call wait plus per-byte wait for the requested length.
+        // An inverted range counts as empty; an oversized length saturates at `u32::MAX`.
+        let n_bytes = usize_to_u32_saturate(range.end.saturating_sub(range.start));
+        sleep(config.wait_get_per_call + config.wait_get_per_byte * n_bytes).await;
+
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        sleep(self.config().wait_put_per_call).await;
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        sleep(self.config().wait_delete_per_call).await;
+
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        sleep(self.config().wait_list_per_call).await;
+
+        // need to copy to avoid moving / referencing `self`
+        let wait_list_per_entry = self.config().wait_list_per_entry;
+
+        self.inner.list(prefix).await.map(|stream| {
+            stream
+                .then(move |result| async move {
+                    match result {
+                        Ok(entry) => {
+                            sleep(wait_list_per_entry).await;
+                            Ok(entry)
+                        }
+                        Err(err) => Err(err),
+                    }
+                })
+                .boxed()
+        })
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        sleep(self.config().wait_list_with_delimiter_per_call).await;
+
+        match self.inner.list_with_delimiter(prefix).await {
+            Ok(list_result) => {
+                let entries_len = usize_to_u32_saturate(list_result.objects.len());
+                sleep(self.config().wait_list_with_delimiter_per_entry * entries_len)
+                    .await;
+                Ok(list_result)
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        sleep(self.config().wait_put_per_call).await;
+
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        sleep(self.config().wait_put_per_call).await;
+
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+/// Converts `x` to `u32`, clamping values that do not fit to `u32::MAX`.
+fn usize_to_u32_saturate(x: usize) -> u32 {
+    x.try_into().map_or(u32::MAX, |fits: u32| fits)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        memory::InMemory,
+        tests::{
+            copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
+            put_get_delete_list, rename_and_copy,
+        },
+    };
+    use bytes::Bytes;
+    use futures::TryStreamExt;
+    use tokio::time::Duration;
+    use tokio::time::Instant;
+
+    const WAIT_TIME: Duration = Duration::from_millis(100); // unit for all asserted bounds
+    const ZERO: Duration = Duration::from_millis(0); // Duration::default isn't constant
+
+    macro_rules! assert_bounds {
+        ($d:expr, $lower:expr) => {
+            assert_bounds!($d, $lower, $lower + 1); // default window: [lower, lower + 1)
+        };
+        ($d:expr, $lower:expr, $upper:expr) => {
+            let d = $d;
+            let lower = $lower * WAIT_TIME;
+            let upper = $upper * WAIT_TIME;
+            assert!(d >= lower, "{:?} must be >= than {:?}", d, lower);
+            assert!(d < upper, "{:?} must be < than {:?}", d, upper);
+        };
+    }
+
+    #[tokio::test]
+    async fn throttle_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        put_get_delete_list(&store).await.unwrap();
+        list_uses_directories_correctly(&store).await.unwrap();
+        list_with_delimiter(&store).await.unwrap();
+        rename_and_copy(&store).await.unwrap();
+        copy_if_not_exists(&store).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn delete_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_delete(&store, None).await, 0);
+        assert_bounds!(measure_delete(&store, Some(0)).await, 0);
+        assert_bounds!(measure_delete(&store, Some(10)).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_delete_per_call = WAIT_TIME);
+        assert_bounds!(measure_delete(&store, None).await, 1);
+        assert_bounds!(measure_delete(&store, Some(0)).await, 1);
+        assert_bounds!(measure_delete(&store, Some(10)).await, 1);
+    }
+
+    #[tokio::test]
+    // Timing-sensitive: the macOS GitHub runner is too slow to finish within WAIT_TIME*2
+    #[cfg(target_os = "linux")]
+    async fn get_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_get(&store, None).await, 0);
+        assert_bounds!(measure_get(&store, Some(0)).await, 0);
+        assert_bounds!(measure_get(&store, Some(10)).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_get_per_call = WAIT_TIME);
+        assert_bounds!(measure_get(&store, None).await, 1);
+        assert_bounds!(measure_get(&store, Some(0)).await, 1);
+        assert_bounds!(measure_get(&store, Some(10)).await, 1);
+
+        store.config_mut(|cfg| {
+            cfg.wait_get_per_call = ZERO;
+            cfg.wait_get_per_byte = WAIT_TIME;
+        });
+        assert_bounds!(measure_get(&store, Some(2)).await, 2);
+
+        store.config_mut(|cfg| {
+            cfg.wait_get_per_call = WAIT_TIME;
+            cfg.wait_get_per_byte = WAIT_TIME;
+        });
+        assert_bounds!(measure_get(&store, Some(2)).await, 3);
+    }
+
+    #[tokio::test]
+    // Timing-sensitive: the macOS GitHub runner is too slow to finish within WAIT_TIME*2
+    #[cfg(target_os = "linux")]
+    async fn list_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_list(&store, 0).await, 0);
+        assert_bounds!(measure_list(&store, 10).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_list_per_call = WAIT_TIME);
+        assert_bounds!(measure_list(&store, 0).await, 1);
+        assert_bounds!(measure_list(&store, 10).await, 1);
+
+        store.config_mut(|cfg| {
+            cfg.wait_list_per_call = ZERO;
+            cfg.wait_list_per_entry = WAIT_TIME;
+        });
+        assert_bounds!(measure_list(&store, 2).await, 2);
+
+        store.config_mut(|cfg| {
+            cfg.wait_list_per_call = WAIT_TIME;
+            cfg.wait_list_per_entry = WAIT_TIME;
+        });
+        assert_bounds!(measure_list(&store, 2).await, 3);
+    }
+
+    #[tokio::test]
+    // Timing-sensitive: the macOS GitHub runner is too slow to finish within WAIT_TIME*2
+    #[cfg(target_os = "linux")]
+    async fn list_with_delimiter_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 0);
+        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_list_with_delimiter_per_call = WAIT_TIME);
+        assert_bounds!(measure_list_with_delimiter(&store, 0).await, 1);
+        assert_bounds!(measure_list_with_delimiter(&store, 10).await, 1);
+
+        store.config_mut(|cfg| {
+            cfg.wait_list_with_delimiter_per_call = ZERO;
+            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
+        });
+        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 2);
+
+        store.config_mut(|cfg| {
+            cfg.wait_list_with_delimiter_per_call = WAIT_TIME;
+            cfg.wait_list_with_delimiter_per_entry = WAIT_TIME;
+        });
+        assert_bounds!(measure_list_with_delimiter(&store, 2).await, 3);
+    }
+
+    #[tokio::test]
+    async fn put_test() {
+        let inner = InMemory::new();
+        let store = ThrottledStore::new(inner, ThrottleConfig::default());
+
+        assert_bounds!(measure_put(&store, 0).await, 0);
+        assert_bounds!(measure_put(&store, 10).await, 0);
+
+        store.config_mut(|cfg| cfg.wait_put_per_call = WAIT_TIME);
+        assert_bounds!(measure_put(&store, 0).await, 1);
+        assert_bounds!(measure_put(&store, 10).await, 1);
+
+        store.config_mut(|cfg| cfg.wait_put_per_call = ZERO);
+        assert_bounds!(measure_put(&store, 0).await, 0);
+    }
+
+    async fn place_test_object(
+        store: &ThrottledStore<InMemory>,
+        n_bytes: Option<usize>,
+    ) -> Path {
+        let path = Path::from("foo");
+
+        if let Some(n_bytes) = n_bytes {
+            let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
+            let bytes = Bytes::from(data);
+            store.put(&path, bytes).await.unwrap();
+        } else {
+            // `None` means the object must not exist, so delete it
+            store.delete(&path).await.unwrap();
+        }
+
+        path
+    }
+
+    async fn place_test_objects(
+        store: &ThrottledStore<InMemory>,
+        n_entries: usize,
+    ) -> Path {
+        let prefix = Path::from("foo");
+
+        // delete every entry currently listed under the prefix
+        let entries: Vec<_> = store
+            .list(Some(&prefix))
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        for entry in entries {
+            store.delete(&entry.location).await.unwrap();
+        }
+
+        // create `n_entries` fresh children named after their index
+        for i in 0..n_entries {
+            let path = prefix.child(i.to_string().as_str());
+
+            let data = Bytes::from("bar");
+            store.put(&path, data).await.unwrap();
+        }
+
+        prefix
+    }
+
+    async fn measure_delete(
+        store: &ThrottledStore<InMemory>,
+        n_bytes: Option<usize>,
+    ) -> Duration {
+        let path = place_test_object(store, n_bytes).await;
+
+        let t0 = Instant::now();
+        store.delete(&path).await.unwrap();
+
+        t0.elapsed()
+    }
+
+    async fn measure_get(
+        store: &ThrottledStore<InMemory>,
+        n_bytes: Option<usize>,
+    ) -> Duration {
+        let path = place_test_object(store, n_bytes).await;
+
+        let t0 = Instant::now();
+        let res = store.get(&path).await;
+        if n_bytes.is_some() {
+            // the per-byte sleep only runs while the returned stream is consumed
+            let s = match res.unwrap() {
+                GetResult::Stream(s) => s,
+                GetResult::File(_, _) => unimplemented!(),
+            };
+
+            s.map_ok(|b| bytes::BytesMut::from(&b[..]))
+                .try_concat()
+                .await
+                .unwrap();
+        } else {
+            assert!(res.is_err());
+        }
+
+        t0.elapsed()
+    }
+
+    async fn measure_list(
+        store: &ThrottledStore<InMemory>,
+        n_entries: usize,
+    ) -> Duration {
+        let prefix = place_test_objects(store, n_entries).await;
+
+        let t0 = Instant::now();
+        store
+            .list(Some(&prefix))
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        t0.elapsed()
+    }
+
+    async fn measure_list_with_delimiter(
+        store: &ThrottledStore<InMemory>,
+        n_entries: usize,
+    ) -> Duration {
+        let prefix = place_test_objects(store, n_entries).await;
+
+        let t0 = Instant::now();
+        store.list_with_delimiter(Some(&prefix)).await.unwrap();
+
+        t0.elapsed()
+    }
+
+    async fn measure_put(store: &ThrottledStore<InMemory>, n_bytes: usize) -> Duration {
+        let data: Vec<_> = std::iter::repeat(1u8).take(n_bytes).collect();
+        let bytes = Bytes::from(data);
+
+        let t0 = Instant::now();
+        store.put(&Path::from("foo"), bytes).await.unwrap();
+
+        t0.elapsed()
+    }
+}
diff --git a/object_store/src/token.rs b/object_store/src/token.rs
new file mode 100644
index 0000000..a56a294
--- /dev/null
+++ b/object_store/src/token.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::future::Future;
+use std::time::Instant;
+use tokio::sync::Mutex;
+
+/// A temporary authentication token of type `T` paired with the [`Instant`] it expires at
+#[derive(Debug, Clone)]
+pub struct TemporaryToken<T> {
+    /// The temporary credential
+    pub token: T,
+    /// The instant at which this credential is no longer valid
+    pub expiry: Instant,
+}
+
+/// Provides [`TokenCache::get_or_insert_with`] which can be used to cache a
+/// [`TemporaryToken`] based on its expiry
+#[derive(Debug)]
+pub struct TokenCache<T> {
+    cache: Mutex<Option<TemporaryToken<T>>>,
+}
+
+// Implemented by hand: `#[derive(Default)]` would needlessly require `T: Default`.
+impl<T> Default for TokenCache<T> {
+    fn default() -> Self {
+        Self { cache: Mutex::new(None) }
+    }
+}
+
+impl<T: Clone + Send> TokenCache<T> {
+    /// Returns the cached token if it stays valid for more than 300 seconds, otherwise
+    /// awaits `f`, caches the [`TemporaryToken`] it yields and returns its token.
+    pub async fn get_or_insert_with<F, Fut, E>(&self, f: F) -> Result<T, E>
+    where
+        F: FnOnce() -> Fut + Send,
+        Fut: Future<Output = Result<TemporaryToken<T>, E>> + Send,
+    {
+        let mut locked = self.cache.lock().await;
+        // Read the clock only once the lock is held: waiting for it may take a while.
+        let now = Instant::now();
+        if let Some(cached) = locked.as_ref() {
+            let remaining = cached.expiry.checked_duration_since(now).unwrap_or_default();
+            if remaining.as_secs() > 300 {
+                return Ok(cached.token.clone());
+            }
+        }
+        Ok(locked.insert(f().await?).token.clone())
+    }
+}
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
new file mode 100644
index 0000000..4f3ed86
--- /dev/null
+++ b/object_store/src/util.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common logic for interacting with remote object stores
+use super::Result;
+use bytes::Bytes;
+use futures::{stream::StreamExt, Stream};
+
+/// Returns `prefix` followed by the path delimiter, or `None` if it is absent or empty
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
+    let raw: &str = prefix?.as_ref();
+    let non_empty = !raw.is_empty();
+    non_empty.then(|| format!("{}{}", raw, crate::path::DELIMITER))
+}
+
+/// HTTP range header for `range` per <https://httpwg.org/specs/rfc7233.html#header.range>
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub fn format_http_range(range: std::ops::Range<usize>) -> String {
+    let last = range.end.saturating_sub(1); // header ranges are inclusive
+    format!("bytes={}-{}", range.start, last)
+}
+
+/// Concatenates every chunk of `stream` into one [`Bytes`]. A stream with at most one chunk
+/// is returned without copying; `size_hint` pre-sizes the buffer used otherwise.
+pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
+where
+    S: Stream<Item = Result<Bytes>> + Send + Unpin,
+{
+    let head = stream.next().await.transpose()?.unwrap_or_default();
+
+    // A second chunk is the only thing that forces a copy
+    let next = match stream.next().await.transpose()? {
+        Some(next) => next,
+        None => return Ok(head),
+    };
+
+    let capacity = size_hint.unwrap_or_else(|| head.len() + next.len());
+    let mut joined = Vec::with_capacity(capacity);
+    joined.extend_from_slice(&head);
+    joined.extend_from_slice(&next);
+    while let Some(chunk) = stream.next().await {
+        joined.extend_from_slice(&chunk?);
+    }
+
+    Ok(joined.into())
+}
+
+/// Runs `f` on the current tokio runtime's blocking pool, or inline when there is none
+pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    if let Ok(handle) = tokio::runtime::Handle::try_current() {
+        return handle.spawn_blocking(f).await?;
+    }
+    f()
+}