blob: 0e38b98408fb4a1698afd46e585e52960858924c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Integration tests for custom object store implementations
//!
//! NB: These tests will delete everything present in the provided [`DynObjectStore`].
//!
//! These tests are not a stable part of the public API and breaking changes may be made
//! in patch releases.
//!
//! They are intended solely for testing purposes.
use crate::list::{PaginatedListOptions, PaginatedListStore};
use crate::multipart::MultipartStore;
use crate::path::Path;
use crate::{
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
};
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use rand::{rng, Rng};
use std::collections::HashSet;
/// Lists the objects under `prefix` (everything when `prefix` is `None`) and returns
/// only their locations, in the order the listing stream yields them.
///
/// Returns the first error produced by the listing stream, if any.
pub(crate) async fn flatten_list_stream(
    storage: &DynObjectStore,
    prefix: Option<&Path>,
) -> crate::Result<Vec<Path>> {
    let locations = storage.list(prefix).map_ok(|entry| entry.location);
    let collected: Vec<Path> = locations.try_collect().await?;
    Ok(collected)
}
/// Tests basic read/write and listing operations
///
/// Empties the store first, then covers: put/get/head/delete of a single object, prefix
/// and delimiter listing, range reads, copy and rename, paths containing encoded
/// delimiters, non-ASCII characters, percent-encoded sequences and spaces,
/// `list_with_offset`, bulk deletion via `delete_stream`, and a zero-length object.
///
/// Panics on the first expectation that does not hold.
pub async fn put_get_delete_list(storage: &DynObjectStore) {
// Start from an empty store and verify that it really is empty
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(
content_list.is_empty(),
"Expected list to be empty; found: {content_list:?}"
);
// Write a single object that the listing checks below operate on
let location = Path::from("test_dir/test_file.json");
let data = Bytes::from("arbitrary data");
storage.put(&location, data.clone().into()).await.unwrap();
let root = Path::from("/");
// List everything
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// Should behave the same as no prefix
let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List with delimiter
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// Should behave the same as no prefix
let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
assert!(result.objects.is_empty());
assert_eq!(result.common_prefixes.len(), 1);
assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
// Should return not found
let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Should return not found
let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// List everything starting with a prefix that should return results
let prefix = Path::from("test_dir");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location.clone()]);
// List everything starting with a prefix that shouldn't return results
let prefix = Path::from("something");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert!(content_list.is_empty());
// A full read returns exactly the bytes that were written
let read_data = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(&*read_data, data);
// Test range request
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;
let bytes = range_result.unwrap();
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));
// Bounded range that lies entirely inside the object
let opts = GetOptions {
range: Some(GetRange::Bounded(2..5)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
// Data is `"arbitrary data"`, length 14 bytes
assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
assert_eq!(result.range, 2..5);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bit".as_ref());
// Range that starts beyond the end of the object
let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location, out_of_range).await;
// Should be a non-fatal error
out_of_range_result.unwrap_err();
// Bounded range overrunning the end is expected to be truncated to the object size
let opts = GetOptions {
range: Some(GetRange::Bounded(2..100)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 2..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bitrary data".as_ref());
// Suffix range: a store may either serve it or answer `NotSupported`
let opts = GetOptions {
range: Some(GetRange::Suffix(2)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 12..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"ta".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}
// Suffix longer than the object is expected to yield the whole object
let opts = GetOptions {
range: Some(GetRange::Suffix(100)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 0..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"arbitrary data".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}
// Offset range reads from the offset to the end of the object
let opts = GetOptions {
range: Some(GetRange::Offset(3)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 3..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"itrary data".as_ref());
// Offset beyond the end of the object must be rejected
let opts = GetOptions {
range: Some(GetRange::Offset(100)),
..Default::default()
};
storage.get_opts(&location, opts).await.unwrap_err();
// Several ranges in one call, including ranges that overlap each other
let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));
}
let head = storage.head(&location).await.unwrap();
assert_eq!(head.size, data.len() as u64);
// After deletion the object must be gone from listings, `get` and `head`
storage.delete(&location).await.unwrap();
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
let err = storage.get(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
// Test handling of paths containing an encoded delimiter
let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
storage
.put(&file_with_delimiter, "arbitrary".into())
.await
.unwrap();
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![file_with_delimiter.clone()]);
// The path segment is `b/c`, so the prefix `a/b` must not match anything
let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
.await
.unwrap();
assert!(files.is_empty());
let files = storage
.list_with_delimiter(Some(&Path::from("a/b")))
.await
.unwrap();
assert!(files.common_prefixes.is_empty());
assert!(files.objects.is_empty());
let files = storage
.list_with_delimiter(Some(&Path::from("a")))
.await
.unwrap();
assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
assert!(files.objects.is_empty());
let files = storage
.list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
.await
.unwrap();
assert!(files.common_prefixes.is_empty());
assert_eq!(files.objects.len(), 1);
assert_eq!(files.objects[0].location, file_with_delimiter);
storage.delete(&file_with_delimiter).await.unwrap();
// Test handling of paths containing non-ASCII characters, e.g. emoji
let emoji_prefix = Path::from("🙀");
let emoji_file = Path::from("🙀/😀.parquet");
storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
storage.head(&emoji_file).await.unwrap();
storage
.get(&emoji_file)
.await
.unwrap()
.bytes()
.await
.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
.await
.unwrap();
assert_eq!(files, vec![emoji_file.clone()]);
// Copy to a top-level destination; listings are sorted before comparison
let dst = Path::from("foo.parquet");
storage.copy(&emoji_file, &dst).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
// Copy to a nested destination
let dst2 = Path::from("new/nested/foo.parquet");
storage.copy(&emoji_file, &dst2).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
// Rename to a nested destination; the rename source must no longer exist
let dst3 = Path::from("new/nested2/bar.parquet");
storage.rename(&dst, &dst3).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);
let err = storage.head(&dst).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));
storage.delete(&emoji_file).await.unwrap();
storage.delete(&dst3).await.unwrap();
storage.delete(&dst2).await.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
.await
.unwrap();
assert!(files.is_empty());
// Test handling of paths containing percent-encoded sequences
// "HELLO" percent encoded
let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
let path = hello_prefix.child("foo.parquet");
storage.put(&path, vec![0, 1].into()).await.unwrap();
let files = flatten_list_stream(storage, Some(&hello_prefix))
.await
.unwrap();
assert_eq!(files, vec![path.clone()]);
// Cannot list by decoded representation
let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
.await
.unwrap();
assert!(files.is_empty());
// Cannot access by decoded representation
let err = storage
.head(&Path::from("HELLO/foo.parquet"))
.await
.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
storage.delete(&path).await.unwrap();
// Test handling of unicode paths
let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
storage.put(&path, "test".into()).await.unwrap();
let r = storage.get(&path).await.unwrap();
assert_eq!(r.bytes().await.unwrap(), "test");
let dir = Path::parse("🇦🇺").unwrap();
let r = storage.list_with_delimiter(None).await.unwrap();
assert!(r.common_prefixes.contains(&dir));
let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location, path);
storage.delete(&path).await.unwrap();
// Can also write non-percent encoded sequences
let path = Path::parse("%Q.parquet").unwrap();
storage.put(&path, vec![0, 1].into()).await.unwrap();
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(files, vec![path.clone()]);
storage.delete(&path).await.unwrap();
// Paths containing spaces, both in the directory and in the file name
let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
storage.put(&path, vec![0, 1].into()).await.unwrap();
storage.head(&path).await.unwrap();
let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
.await
.unwrap();
assert_eq!(files, vec![path.clone()]);
storage.delete(&path).await.unwrap();
// Everything written so far has been deleted again
let files = flatten_list_stream(storage, None).await.unwrap();
assert!(files.is_empty(), "{files:?}");
// Test list order
// NOTE(review): `expected` below is built by filtering `files` without re-sorting and is
// compared against a sorted `actual`, so this literal presumably has to stay in `Path`
// sort order — confirm before reordering or adding entries.
let files = vec![
Path::from("a a/b.file"),
Path::parse("a%2Fa.file").unwrap(),
Path::from("a/😀.file"),
Path::from("a/a file"),
Path::parse("a/a%2F.file").unwrap(),
Path::from("a/a.file"),
Path::from("a/a/b.file"),
Path::from("a/b.file"),
Path::from("aa/a.file"),
Path::from("ab/a.file"),
];
for file in &files {
storage.put(file, "foo".into()).await.unwrap();
}
// Each case is `(optional prefix, offset)` passed to `list_with_offset`
let cases = [
(None, Path::from("a")),
(None, Path::from("a/a file")),
(None, Path::from("a/a/b.file")),
(None, Path::from("ab/a.file")),
(None, Path::from("a%2Fa.file")),
(None, Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("")),
(Some(Path::from("a")), Path::from("a")),
(Some(Path::from("a")), Path::from("a/😀")),
(Some(Path::from("a")), Path::from("a/😀.file")),
(Some(Path::from("a")), Path::from("a/b")),
(Some(Path::from("a")), Path::from("a/a/b.file")),
];
for (prefix, offset) in cases {
let s = storage.list_with_offset(prefix.as_ref(), &offset);
let mut actual: Vec<_> = s.map_ok(|x| x.location).try_collect().await.unwrap();
actual.sort_unstable();
// Expected result: every file matching the prefix that sorts strictly after the offset
let expected: Vec<_> = files
.iter()
.filter(|x| {
let prefix_match = prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
prefix_match && *x > &offset
})
.cloned()
.collect();
assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
}
// Test bulk delete
let paths = vec![
Path::from("a/a.file"),
Path::from("a/a/b.file"),
Path::from("aa/a.file"),
Path::from("does_not_exist"),
Path::from("I'm a < & weird path"),
Path::from("ab/a.file"),
Path::from("a/😀.file"),
];
// Index 4 is the only entry not already written by the list-order section above
storage.put(&paths[4], "foo".into()).await.unwrap();
let out_paths = storage
.delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
.collect::<Vec<_>>()
.await;
assert_eq!(out_paths.len(), paths.len());
// Indices into `paths` whose deletion is allowed to report an error (`does_not_exist`)
let expect_errors = [3];
for (i, input_path) in paths.iter().enumerate() {
// Regardless of the reported outcome, none of the paths may exist afterwards
let err = storage.head(input_path).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
if expect_errors.contains(&i) {
// Some object stores will report NotFound, but others (such as S3) will
// report success regardless.
match &out_paths[i] {
Err(Error::NotFound { path: out_path, .. }) => {
assert!(out_path.ends_with(&input_path.to_string()));
}
Ok(out_path) => {
assert_eq!(out_path, input_path);
}
_ => panic!("unexpected error"),
}
} else {
assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
}
}
// Remove whatever is left from the list-order section before the final check
delete_fixtures(storage).await;
// A zero-length object can be written, inspected, read back and deleted
let path = Path::from("empty");
storage.put(&path, PutPayload::default()).await.unwrap();
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.size, 0);
let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(data.len(), 0);
storage.delete(&path).await.unwrap();
}
/// Tests the ability to read and write [`Attributes`]
///
/// Writes the object `attributes` with a fixed attribute set twice, first with a single
/// `put_opts` request and then through `put_multipart_opts`, and checks after each write
/// that `get` reports exactly the attributes that were supplied.
///
/// A store answering either write with [`Error::NotImplemented`] is tolerated; any other
/// error panics.
pub async fn put_get_attributes(integration: &dyn ObjectStore) {
    let attributes = Attributes::from_iter([
        (Attribute::CacheControl, "max-age=604800"),
        (
            Attribute::ContentDisposition,
            r#"attachment; filename="test.html""#,
        ),
        (Attribute::ContentEncoding, "gzip"),
        (Attribute::ContentLanguage, "en-US"),
        (Attribute::ContentType, "text/html; charset=utf-8"),
        (Attribute::Metadata("test_key".into()), "test_value"),
    ]);
    let path = Path::from("attributes");

    // Single-request upload carrying the attributes
    let single_put = integration
        .put_opts(&path, "foo".into(), attributes.clone().into())
        .await;
    match single_put {
        Err(Error::NotImplemented) => {}
        Err(e) => panic!("{e}"),
        Ok(_) => {
            let fetched = integration.get(&path).await.unwrap();
            assert_eq!(fetched.attributes, attributes);
        }
    }

    // Multipart upload carrying the same attributes
    let multipart_put = integration
        .put_multipart_opts(&path, attributes.clone().into())
        .await;
    match multipart_put {
        Err(Error::NotImplemented) => {}
        Err(e) => panic!("{e}"),
        Ok(mut upload) => {
            upload.put_part("foo".into()).await.unwrap();
            upload.complete().await.unwrap();
            let fetched = integration.get(&path).await.unwrap();
            assert_eq!(fetched.attributes, attributes);
        }
    }
}
/// Tests conditional read requests
pub async fn get_opts(storage: &dyn ObjectStore) {
let path = Path::from("test");
storage.put(&path, "foo".into()).await.unwrap();
let meta = storage.head(&path).await.unwrap();
let options = GetOptions {
if_unmodified_since: Some(meta.last_modified),
..GetOptions::default()
};
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}
let options = GetOptions {
if_unmodified_since: Some(meta.last_modified + chrono::Duration::try_hours(10).unwrap()),
..GetOptions::default()
};
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}
let options = GetOptions {
if_unmodified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
..GetOptions::default()
};
match storage.get_opts(&path, options).await {
Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
d => panic!("{d:?}"),
}
let options = GetOptions {
if_modified_since: Some(meta.last_modified),
..GetOptions::default()
};
match storage.get_opts(&path, options).await {
Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
d => panic!("{d:?}"),
}
let options = GetOptions {
if_modified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
..GetOptions::default()
};
match storage.get_opts(&path, options).await {
Ok(_) | Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}
let tag = meta.e_tag.unwrap();
let options = GetOptions {
if_match: Some(tag.clone()),
..GetOptions::default()
};
storage.get_opts(&path, options).await.unwrap();
let options = GetOptions {
if_match: Some("invalid".to_string()),
..GetOptions::default()
};
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
let options = GetOptions {
if_none_match: Some(tag.clone()),
..GetOptions::default()
};
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::NotModified { .. }), "{err}");
let options = GetOptions {
if_none_match: Some("invalid".to_string()),
..GetOptions::default()
};
storage.get_opts(&path, options).await.unwrap();
let result = storage.put(&path, "test".into()).await.unwrap();
let new_tag = result.e_tag.unwrap();
assert_ne!(tag, new_tag);
let meta = storage.head(&path).await.unwrap();
assert_eq!(meta.e_tag.unwrap(), new_tag);
let options = GetOptions {
if_match: Some(new_tag),
..GetOptions::default()
};
storage.get_opts(&path, options).await.unwrap();
let options = GetOptions {
if_match: Some(tag),
..GetOptions::default()
};
let err = storage.get_opts(&path, options).await.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
if let Some(version) = meta.version {
storage.put(&path, "bar".into()).await.unwrap();
let options = GetOptions {
version: Some(version),
..GetOptions::default()
};
// Can retrieve previous version
let get_opts = storage.get_opts(&path, options).await.unwrap();
let old = get_opts.bytes().await.unwrap();
assert_eq!(old, b"test".as_slice());
// Current version contains the updated data
let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(&current, b"bar".as_slice());
}
}
/// Tests conditional writes
///
/// Empties the store, then checks `PutMode::Create` and `PutMode::Update` semantics.
///
/// `supports_update` states whether the store is expected to implement
/// `PutMode::Update`. When it is `false`, the test only verifies that an update attempt
/// fails with [`Error::NotImplemented`] and returns early. When it is `true`, the test
/// additionally runs several futures that concurrently increment a counter object using
/// conditional writes.
pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
delete_fixtures(storage).await;
let path = Path::from("put_opts");
// First create succeeds; `v1` identifies the version that was written
let v1 = storage
.put_opts(&path, "a".into(), PutMode::Create.into())
.await
.unwrap();
// A second create on the same path must be rejected and must not change the content
let err = storage
.put_opts(&path, "b".into(), PutMode::Create.into())
.await
.unwrap_err();
assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"a");
if !supports_update {
// Stores without update support must say so explicitly; nothing more to test
let err = storage
.put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into())
.await
.unwrap_err();
assert!(matches!(err, Error::NotImplemented), "{err}");
return;
}
// Update against the current version succeeds and yields a new version `v2`
let v2 = storage
.put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into())
.await
.unwrap();
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"c");
// Update against the now stale `v1` must fail the precondition
let err = storage
.put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
.await
.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
// Update against `v2` succeeds
storage
.put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into())
.await
.unwrap();
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
assert_eq!(b.as_ref(), b"e");
// Update not exists
let path = Path::from("I don't exist");
let err = storage
.put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
.await
.unwrap_err();
assert!(matches!(err, Error::Precondition { .. }), "{err}");
// Concurrent counter: each of NUM_WORKERS futures performs NUM_INCREMENTS successful
// read-modify-write increments on the object at `RACE`
const NUM_WORKERS: usize = 5;
const NUM_INCREMENTS: usize = 10;
let path = Path::from("RACE");
let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
.map(|_| async {
for _ in 0..NUM_INCREMENTS {
// Retry until this particular increment has been applied
loop {
match storage.get(&path).await {
Ok(r) => {
// Condition the write on the exact version that was just read
let mode = PutMode::Update(UpdateVersion {
e_tag: r.meta.e_tag.clone(),
version: r.meta.version.clone(),
});
let b = r.bytes().await.unwrap();
let v: usize = std::str::from_utf8(&b).unwrap().parse().unwrap();
let new = (v + 1).to_string();
match storage.put_opts(&path, new.into(), mode.into()).await {
Ok(_) => break,
// Another worker updated the object first: re-read and try again
Err(Error::Precondition { .. }) => continue,
Err(e) => return Err(e),
}
}
Err(Error::NotFound { .. }) => {
// Counter does not exist yet: try to create it with the value 1
let mode = PutMode::Create;
match storage.put_opts(&path, "1".into(), mode.into()).await {
Ok(_) => break,
// Another worker created it first: fall back to the update path
Err(Error::AlreadyExists { .. }) => continue,
Err(e) => return Err(e),
}
}
Err(e) => return Err(e),
}
}
}
Ok(())
})
.collect();
// Drive every worker to completion, panicking on the first worker error
while futures.next().await.transpose().unwrap().is_some() {}
// No increment may have been lost
let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
}
/// Returns a chunk of length `chunk_length`
///
/// The chunk is zero-filled, after which 1000 randomly chosen positions (possibly
/// repeating) are overwritten with random bytes. For `chunk_length == 0` an empty chunk
/// is returned.
fn get_chunk(chunk_length: usize) -> Bytes {
    let mut data = vec![0_u8; chunk_length];
    // `random_range` panics when handed an empty range, and an empty buffer has no
    // position to overwrite anyway, so skip the randomisation for zero-length chunks.
    if chunk_length > 0 {
        let mut rng = rng();
        // Set a random selection of bytes
        for _ in 0..1000 {
            data[rng.random_range(0..chunk_length)] = rng.random();
        }
    }
    data.into()
}
/// Returns `num_chunks` independently generated chunks, each of length `chunk_length`
fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
    std::iter::repeat_with(|| get_chunk(chunk_length))
        .take(num_chunks)
        .collect()
}
/// Tests the ability to perform multipart writes
pub async fn stream_get(storage: &DynObjectStore) {
let location = Path::from("test_dir/test_upload_file.txt");
// Can write to storage
let data = get_chunks(5 * 1024 * 1024, 3);
let bytes_expected = data.concat();
let mut upload = storage.put_multipart(&location).await.unwrap();
let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
futures::future::try_join_all(uploads).await.unwrap();
// Object should not yet exist in store
let meta_res = storage.head(&location).await;
assert!(meta_res.is_err());
assert!(matches!(
meta_res.unwrap_err(),
crate::Error::NotFound { .. }
));
let files = flatten_list_stream(storage, None).await.unwrap();
assert_eq!(&files, &[]);
let result = storage.list_with_delimiter(None).await.unwrap();
assert_eq!(&result.objects, &[]);
upload.complete().await.unwrap();
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
// Can overwrite some storage
// Sizes chosen to ensure we write three parts
let data = get_chunks(3_200_000, 7);
let bytes_expected = data.concat();
let upload = storage.put_multipart(&location).await.unwrap();
let mut writer = WriteMultipart::new(upload);
for chunk in &data {
writer.write(chunk)
}
writer.finish().await.unwrap();
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
let location = Path::from("test_dir/test_put_part.txt");
let upload = storage.put_multipart(&location).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.put(vec![0; 2].into());
write.put(vec![3; 4].into());
write.finish().await.unwrap();
let meta = storage.head(&location).await.unwrap();
assert_eq!(meta.size, 6);
let location = Path::from("test_dir/test_put_part_mixed.txt");
let upload = storage.put_multipart(&location).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.put(vec![0; 2].into());
write.write(&[1, 2, 3]);
write.put(vec![4, 5, 6, 7].into());
write.finish().await.unwrap();
let r = storage.get(&location).await.unwrap();
let r = r.bytes().await.unwrap();
assert_eq!(r.as_ref(), &[0, 0, 1, 2, 3, 4, 5, 6, 7]);
// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
let mut upload = storage.put_multipart(&location).await.unwrap();
upload.abort().await.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
get_res.unwrap_err(),
crate::Error::NotFound { .. }
));
// We can abort an in-progress write
let mut upload = storage.put_multipart(&location).await.unwrap();
upload
.put_part(data.first().unwrap().clone().into())
.await
.unwrap();
upload.abort().await.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(get_res.unwrap_err(), Error::NotFound { .. }));
}
/// Tests that directories are transparent
///
/// Writes `foo/x.json` and `foo.bar/y.json` into an emptied store and checks that the
/// prefix `foo` only matches the former, and that using a file path, or a partial file
/// name, as a prefix matches nothing.
pub async fn list_uses_directories_correctly(storage: &DynObjectStore) {
    delete_fixtures(storage).await;

    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(
        content_list.is_empty(),
        "Expected list to be empty; found: {content_list:?}"
    );

    let location1 = Path::from("foo/x.json");
    let location2 = Path::from("foo.bar/y.json");

    let data = PutPayload::from("arbitrary data");
    storage.put(&location1, data.clone()).await.unwrap();
    storage.put(&location2, data).await.unwrap();

    // `foo` is a whole path segment and therefore must not match `foo.bar`
    let prefix = Path::from("foo");
    let listed = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert_eq!(listed, vec![location1.clone()]);

    let delimited = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert_eq!(delimited.objects.len(), 1);
    assert_eq!(delimited.objects[0].location, location1);
    assert!(
        delimited.common_prefixes.is_empty(),
        "{:?}",
        delimited.common_prefixes
    );

    // Listing an existing path (file) should return an empty list:
    // https://github.com/apache/arrow-rs/issues/3712
    // The same holds for a prefix that is only part of a file name.
    for prefix in [location1.clone(), Path::from("foo/x")] {
        let listed = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
        assert!(listed.is_empty(), "{listed:?}");

        let delimited = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
        assert!(delimited.objects.is_empty(), "{:?}", delimited.objects);
        assert!(
            delimited.common_prefixes.is_empty(),
            "{:?}",
            delimited.common_prefixes
        );
    }
}
/// Tests listing with delimiter
///
/// Populates an emptied store with a small directory tree, checks the objects and common
/// prefixes reported for a directory prefix, a partial file name and a missing prefix,
/// then deletes every file again and verifies the store is empty.
pub async fn list_with_delimiter(storage: &DynObjectStore) {
    delete_fixtures(storage).await;

    // ==================== check: store is empty ====================
    let initial = flatten_list_stream(storage, None).await.unwrap();
    assert!(initial.is_empty());

    // ==================== do: create files ====================
    let data = Bytes::from("arbitrary data");

    let names = [
        "test_file",
        "mydb/wb/000/000/000.segment",
        "mydb/wb/000/000/001.segment",
        "mydb/wb/000/000/002.segment",
        "mydb/wb/001/001/000.segment",
        "mydb/wb/foo.json",
        "mydb/wbwbwb/111/222/333.segment",
        "mydb/data/whatevs",
    ];
    let files: Vec<_> = names.iter().map(|name| Path::from(*name)).collect();

    for file in &files {
        storage.put(file, data.clone().into()).await.unwrap();
    }

    // ==================== check: prefix-list `mydb/wb` (directory) ====================
    let prefix = Path::from("mydb/wb");
    let listing = storage.list_with_delimiter(Some(&prefix)).await.unwrap();

    let expected_prefixes = vec![Path::from("mydb/wb/000"), Path::from("mydb/wb/001")];
    assert_eq!(listing.common_prefixes, expected_prefixes);

    assert_eq!(listing.objects.len(), 1);
    let object = &listing.objects[0];
    assert_eq!(object.location, Path::from("mydb/wb/foo.json"));
    assert_eq!(object.size, data.len() as u64);

    // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
    let prefix = Path::from("mydb/wb/000/000/001");
    let listing = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert!(listing.common_prefixes.is_empty());
    assert_eq!(listing.objects.len(), 0);

    // ==================== check: prefix-list `not_there` (non-existing prefix) ====================
    let prefix = Path::from("not_there");
    let listing = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert!(listing.common_prefixes.is_empty());
    assert!(listing.objects.is_empty());

    // ==================== do: remove all files ====================
    for file in &files {
        storage.delete(file).await.unwrap();
    }

    // ==================== check: store is empty ====================
    let remaining = flatten_list_stream(storage, None).await.unwrap();
    assert!(remaining.is_empty());
}
/// Tests fetching a non-existent object returns a not found error
pub async fn get_nonexistent_object(
storage: &DynObjectStore,
location: Option<Path>,
) -> crate::Result<Bytes> {
let location = location.unwrap_or_else(|| Path::from("this_file_should_not_exist"));
let err = storage.head(&location).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));
storage.get(&location).await?.bytes().await
}
/// Tests copying
pub async fn rename_and_copy(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy() make both objects identical
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
storage.copy(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
// rename() copies contents and deletes original
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
storage.rename(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), Error::NotFound { .. }));
// Clean up
storage.delete(&path2).await.unwrap();
}
/// Tests copy if not exists
pub async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("not_exists_nested/test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
// copy_if_not_exists() errors if destination already exists
storage.put(&path1, contents1.clone().into()).await.unwrap();
storage.put(&path2, contents2.clone().into()).await.unwrap();
let result = storage.copy_if_not_exists(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
crate::Error::AlreadyExists { .. }
));
// copy_if_not_exists() copies contents and allows deleting original
storage.delete(&path2).await.unwrap();
storage.copy_if_not_exists(&path1, &path2).await.unwrap();
storage.delete(&path1).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
storage.delete(&path2).await.unwrap();
}
/// Tests copy and renaming behaviour of non-existent objects
pub async fn copy_rename_nonexistent_object(storage: &DynObjectStore) {
// Create empty source object
let path1 = Path::from("test1");
// Create destination object
let path2 = Path::from("test2");
storage.put(&path2, "hello".into()).await.unwrap();
// copy() errors if source does not exist
let result = storage.copy(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// rename() errors if source does not exist
let result = storage.rename(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// copy_if_not_exists() errors if source does not exist
let result = storage.copy_if_not_exists(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
// Clean up
storage.delete(&path2).await.unwrap();
}
/// Tests [`MultipartStore`]
///
/// Covers two scenarios, each verified through `storage.head`:
///
/// * a two-part upload, whose resulting object size must be twice `chunk_size`
/// * an upload completed with no parts, whose resulting object must be empty
pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) {
    // Two-part upload
    let two_part_path = Path::from("test_multipart");
    let chunk_size = 5 * 1024 * 1024;
    let chunks = get_chunks(chunk_size, 2);
    let two_part_id = multipart.create_multipart(&two_part_path).await.unwrap();

    // Each chunk is uploaded under its position in `chunks`
    let part_uploads = futures::stream::iter(chunks)
        .enumerate()
        .map(|(part_idx, chunk)| {
            multipart.put_part(&two_part_path, &two_part_id, part_idx, chunk.into())
        });
    // Drive up to two uploads at a time, stopping at the first error
    let uploaded_parts = part_uploads
        .buffered(2)
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    multipart
        .complete_multipart(&two_part_path, &two_part_id, uploaded_parts)
        .await
        .unwrap();

    let two_part_meta = storage.head(&two_part_path).await.unwrap();
    assert_eq!(two_part_meta.size, chunk_size as u64 * 2);

    // Empty case
    let empty_path = Path::from("test_empty_multipart");
    let empty_id = multipart.create_multipart(&empty_path).await.unwrap();
    multipart
        .complete_multipart(&empty_path, &empty_id, Vec::new())
        .await
        .unwrap();

    let empty_meta = storage.head(&empty_path).await.unwrap();
    assert_eq!(empty_meta.size, 0);
}
/// Deletes every object that `storage.list(None)` reports
///
/// Panics if listing or deleting any object fails.
async fn delete_fixtures(storage: &DynObjectStore) {
    // Stream of the location of each listed object
    let locations = storage
        .list(None)
        .map_ok(|object| object.location)
        .boxed();

    // Drain the deletion stream to completion; the collected results are not needed
    let _deleted: Vec<_> = storage
        .delete_stream(locations)
        .try_collect()
        .await
        .unwrap();
}
/// Tests a race condition where two multipart uploads write to the same path
///
/// Two uploads are opened against the same location and each is fed five parts,
/// with the `put_part` calls of the two writers interleaved. Writer 1 completes
/// first, then writer 2:
///
/// * if `last_writer_wins` is `true`, the second completion must succeed and the
///   stored object must start with writer 2's parts
/// * otherwise the second completion must fail with [`Error::Generic`] and the
///   stored object must start with writer 1's parts
pub async fn multipart_race_condition(storage: &dyn ObjectStore, last_writer_wins: bool) {
    // Total size of one part: 1 prefix byte + ':' + 5,300,000 further bytes
    const PAYLOAD_LEN: usize = 5_300_002;
    // Number of parts uploaded by each writer
    const PARTS_PER_WRITER: u8 = 5;
    // `(writer prefix, part)` pairs in the exact order the parts are uploaded
    const UPLOAD_ORDER: [(u8, u8); 10] = [
        (b'1', 0),
        (b'2', 0),
        (b'2', 1),
        (b'1', 1),
        (b'1', 2),
        (b'2', 2),
        (b'2', 3),
        (b'1', 3),
        (b'1', 4),
        (b'2', 4),
    ];

    /// Builds the payload for one part
    ///
    /// Layout: `prefix`, then `b':'`, then the raw `part` byte, then ASCII `'0'`
    /// filler up to 5,300,002 bytes in total.
    ///
    /// Stands in for the former `format!("{prefix}:{part:05300000}")`, which is no
    /// longer supported
    ///
    /// See: <https://github.com/apache/arrow-rs-object-store/issues/343>
    fn make_payload(prefix: u8, part: u8) -> Vec<u8> {
        let mut payload = vec![b'0'; PAYLOAD_LEN];
        payload[..3].copy_from_slice(&[prefix, b':', part]);
        payload
    }

    let path = Path::from("test_multipart_race_condition");
    let mut multipart_upload_1 = storage.put_multipart(&path).await.unwrap();
    let mut multipart_upload_2 = storage.put_multipart(&path).await.unwrap();

    // Upload parts interleaved, one at a time
    for &(writer, part) in UPLOAD_ORDER.iter() {
        let upload = match writer {
            b'1' => &mut multipart_upload_1,
            _ => &mut multipart_upload_2,
        };
        upload
            .put_part(Bytes::from(make_payload(writer, part)).into())
            .await
            .unwrap();
    }

    // Writer 1 always completes first
    multipart_upload_1.complete().await.unwrap();

    let second_completion = multipart_upload_2.complete().await;
    if last_writer_wins {
        second_completion.unwrap();
    } else {
        let err = second_completion.unwrap_err();
        assert!(matches!(err, crate::Error::Generic { .. }), "{err}");
    }

    let result_bytes = storage.get(&path).await.unwrap().bytes().await.unwrap();

    // Rebuild the full contents of whichever writer is expected to have won
    let expected_writer_prefix = match last_writer_wins {
        true => b'2',
        false => b'1',
    };
    let expected_writer_contents = (0..PARTS_PER_WRITER)
        .map(|part| make_payload(expected_writer_prefix, part))
        .collect::<Vec<_>>()
        .concat();

    assert!(result_bytes.starts_with(&expected_writer_contents));
}
/// Tests performing out of order multipart uploads
pub async fn multipart_out_of_order(storage: &dyn ObjectStore) {
let path = Path::from("test_multipart_out_of_order");
let mut multipart_upload = storage.put_multipart(&path).await.unwrap();
let part1 = std::iter::repeat(b'1')
.take(5 * 1024 * 1024)
.collect::<Bytes>();
let part2 = std::iter::repeat(b'2')
.take(5 * 1024 * 1024)
.collect::<Bytes>();
let part3 = std::iter::repeat(b'3')
.take(5 * 1024 * 1024)
.collect::<Bytes>();
let full = [part1.as_ref(), part2.as_ref(), part3.as_ref()].concat();
let fut1 = multipart_upload.put_part(part1.into());
let fut2 = multipart_upload.put_part(part2.into());
let fut3 = multipart_upload.put_part(part3.into());
// note order is 2,3,1 , different than the parts were created in
fut2.await.unwrap();
fut3.await.unwrap();
fut1.await.unwrap();
multipart_upload.complete().await.unwrap();
let result = storage.get(&path).await.unwrap();
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, full);
}
/// Tests [`PaginatedListStore`]
///
/// Deletes everything in `storage`, checks that an empty store lists nothing, then
/// writes six single-byte objects and exercises `list.list_paginated` with a page
/// size of 5 in the following configurations:
///
/// * no prefix: two pages (5 + 1) that together contain every object
/// * prefix `foo/`: a single page with the four objects below `foo/`
/// * partial prefix `fo`: two pages (5 + 1) that together contain every object
/// * prefix `foo/` with delimiter `/`: two objects plus the common prefix `foo/baz`
/// * partial prefix `fo` with delimiter `/`: no objects, three common prefixes
pub async fn list_paginated(storage: &dyn ObjectStore, list: &dyn PaginatedListStore) {
    delete_fixtures(storage).await;

    // An empty store yields no objects, no common prefixes and no further page
    let r = list.list_paginated(None, Default::default()).await.unwrap();
    assert_eq!(r.page_token, None);
    assert_eq!(r.result.objects, vec![]);
    assert_eq!(r.result.common_prefixes, vec![]);

    let p1 = Path::from("foo/bar");
    let p2 = Path::from("foo/bax");
    let p3 = Path::from("foo/baz/bar");
    let p4 = Path::from("foo/baz/banana");
    let p5 = Path::from("fob/banana");
    let p6 = Path::from("fongle/banana");

    let paths = HashSet::from_iter([&p1, &p2, &p3, &p4, &p5, &p6]);

    for path in &paths {
        storage.put(path, vec![1].into()).await.unwrap();
    }

    // Test basic listing
    let mut listed = HashSet::new();
    let mut opts = PaginatedListOptions {
        max_keys: Some(5),
        ..Default::default()
    };
    let ret = list.list_paginated(None, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 5);
    listed.extend(ret.result.objects.iter().map(|x| &x.location));

    // Continue from the token returned with the first page
    opts.page_token = Some(ret.page_token.unwrap());
    let ret = list.list_paginated(None, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 1);
    listed.extend(ret.result.objects.iter().map(|x| &x.location));

    assert_eq!(listed, paths);

    // List with prefix
    let prefix = Some("foo/");
    opts.page_token = None;
    let ret = list.list_paginated(prefix, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 4);
    assert!(ret.page_token.is_none());

    let actual = HashSet::from_iter(ret.result.objects.iter().map(|x| &x.location));
    assert_eq!(actual, HashSet::<&Path>::from_iter([&p1, &p2, &p3, &p4]));

    // List with partial prefix
    //
    // Start from an empty set: `listed` still holds every path from the basic
    // listing above, which would make the final comparison pass even if these
    // pages repeated or omitted an object.
    listed.clear();
    let prefix = Some("fo");
    opts.page_token = None;
    let ret = list.list_paginated(prefix, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 5);
    listed.extend(ret.result.objects.iter().map(|x| &x.location));

    opts.page_token = Some(ret.page_token.unwrap());
    let ret = list.list_paginated(prefix, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 1);
    listed.extend(ret.result.objects.iter().map(|x| &x.location));

    assert_eq!(listed, paths);

    // List with prefix and delimiter
    let prefix = Some("foo/");
    opts.page_token = None;
    opts.delimiter = Some("/".into());
    let ret = list.list_paginated(prefix, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 2);
    assert_eq!(ret.result.common_prefixes, vec![Path::from("foo/baz")]);
    assert!(ret.page_token.is_none());

    let actual = HashSet::from_iter(ret.result.objects.iter().map(|x| &x.location));
    assert_eq!(actual, HashSet::<&Path>::from_iter([&p1, &p2]));

    // List with partial prefix and delimiter
    let prefix = Some("fo");
    opts.page_token = None;
    opts.delimiter = Some("/".into());
    let ret = list.list_paginated(prefix, opts.clone()).await.unwrap();
    assert_eq!(ret.result.objects.len(), 0);
    // Common prefixes are compared as a set, so their order does not matter
    assert_eq!(
        HashSet::<Path>::from_iter(ret.result.common_prefixes),
        HashSet::from_iter([Path::from("foo"), Path::from("fob"), Path::from("fongle")])
    );
    assert!(ret.page_token.is_none());
}