blob: d42e6f231c04658d59504f4859f2952df885f958 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! An in-memory object store implementation
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::multipart::{MultipartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart,
};
use crate::{GetOptions, PutPayload};
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
    /// No object stored at `path`; converted to `super::Error::NotFound`.
    #[snafu(display("No data in memory found. Location: {path}"))]
    NoDataInMemory { path: String },

    /// A requested byte range was invalid for the object's length.
    #[snafu(display("Invalid range: {source}"))]
    Range { source: InvalidGetRange },

    /// A create-only write found an existing object; converted to
    /// `super::Error::AlreadyExists`.
    #[snafu(display("Object already exists at that location: {path}"))]
    AlreadyExists { path: String },

    /// A `PutMode::Update` was attempted without supplying an etag.
    #[snafu(display("ETag required for conditional update"))]
    MissingETag,

    /// A multipart operation referenced an upload id that is not tracked.
    #[snafu(display("MultipartUpload not found: {id}"))]
    UploadNotFound { id: String },

    /// `complete_multipart` found a part index that was never uploaded.
    #[snafu(display("Missing part at index: {part}"))]
    MissingPart { part: usize },
}
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {
Error::NoDataInMemory { ref path } => Self::NotFound {
path: path.into(),
source: source.into(),
},
Error::AlreadyExists { ref path } => Self::AlreadyExists {
path: path.into(),
source: source.into(),
},
_ => Self::Generic {
store: "InMemory",
source: Box::new(source),
},
}
}
}
/// In-memory storage suitable for testing or for opting out of using a cloud
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
    // Shared, lock-protected state; `Arc` so in-flight multipart uploads can
    // write back into the same map as the store that created them.
    storage: SharedStorage,
}
/// A single stored object: its payload plus the metadata reported by
/// `head`/`get`.
#[derive(Debug, Clone)]
struct Entry {
    // Object contents.
    data: Bytes,
    // Timestamp recorded when this entry was written.
    last_modified: DateTime<Utc>,
    // Monotonically increasing tag taken from `Storage::next_etag`.
    e_tag: usize,
}
impl Entry {
fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self {
Self {
data,
last_modified,
e_tag,
}
}
}
/// The mutable state guarded by the store's lock.
#[derive(Debug, Default, Clone)]
struct Storage {
    // Next etag to hand out; incremented on every successful write and also
    // used to mint multipart upload ids.
    next_etag: usize,
    // Object path -> stored entry; `BTreeMap` keeps paths ordered, which the
    // prefix-range scans in `list`/`list_with_delimiter` rely on.
    map: BTreeMap<Path, Entry>,
    // In-progress multipart uploads, keyed by the numeric upload id.
    uploads: HashMap<usize, PartStorage>,
}
/// Parts collected so far for one multipart upload; `None` marks a part
/// index that has not been uploaded yet.
#[derive(Debug, Default, Clone)]
struct PartStorage {
    parts: Vec<Option<Bytes>>,
}
/// Handle to the storage state shared between the store and its uploads.
type SharedStorage = Arc<RwLock<Storage>>;
impl Storage {
    /// Store `bytes` at `location`, overwriting any existing entry, and
    /// return the etag assigned to the new entry.
    fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
        let etag = self.next_etag;
        self.next_etag += 1;
        let entry = Entry::new(bytes, Utc::now(), etag);
        self.overwrite(location, entry);
        etag
    }

    /// Unconditionally replace whatever is stored at `location`.
    fn overwrite(&mut self, location: &Path, entry: Entry) {
        self.map.insert(location.clone(), entry);
    }

    /// Insert `entry` only if nothing exists at `location`; otherwise fail
    /// with `Error::AlreadyExists`.
    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
        use std::collections::btree_map;
        match self.map.entry(location.clone()) {
            btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
                path: location.to_string(),
            }
            .into()),
            btree_map::Entry::Vacant(v) => {
                v.insert(entry);
                Ok(())
            }
        }
    }

    /// Compare-and-swap: replace the entry at `location` only if the caller's
    /// etag (required in `v`) matches the stored one; otherwise fail with
    /// `Precondition`.
    fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> {
        match self.map.get_mut(location) {
            // Return Precondition instead of NotFound for consistency with stores
            None => Err(crate::Error::Precondition {
                path: location.to_string(),
                source: format!("Object at location {location} not found").into(),
            }),
            Some(e) => {
                let existing = e.e_tag.to_string();
                let expected = v.e_tag.context(MissingETagSnafu)?;
                if existing == expected {
                    *e = entry;
                    Ok(())
                } else {
                    Err(crate::Error::Precondition {
                        path: location.to_string(),
                        source: format!("{existing} does not match {expected}").into(),
                    })
                }
            }
        }
    }

    /// Look up the part buffer for a multipart upload id, failing with
    /// `UploadNotFound` when the id does not parse as a number or is unknown.
    fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
        let parts = id
            .parse()
            .ok()
            .and_then(|x| self.uploads.get_mut(&x))
            .context(UploadNotFoundSnafu { id })?;
        Ok(parts)
    }

    /// Remove and return the part buffer for a multipart upload id, failing
    /// with `UploadNotFound` when the id does not parse or is unknown.
    fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
        let parts = id
            .parse()
            .ok()
            .and_then(|x| self.uploads.remove(&x))
            .context(UploadNotFoundSnafu { id })?;
        Ok(parts)
    }
}
impl std::fmt::Display for InMemory {
    /// Identify this store by its fixed name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("InMemory")
    }
}
#[async_trait]
impl ObjectStore for InMemory {
    /// Write `payload` at `location`, honouring the conditional semantics of
    /// `opts.mode` (overwrite / create-only / etag-guarded update).
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let mut storage = self.storage.write();
        let etag = storage.next_etag;
        let entry = Entry::new(payload.into(), Utc::now(), etag);
        match opts.mode {
            PutMode::Overwrite => storage.overwrite(location, entry),
            PutMode::Create => storage.create(location, entry)?,
            PutMode::Update(v) => storage.update(location, v, entry)?,
        }
        // Only consume the etag once the (possibly fallible) write succeeded.
        storage.next_etag += 1;
        Ok(PutResult {
            e_tag: Some(etag.to_string()),
            version: None,
        })
    }

    /// Begin a streaming upload that buffers parts client-side and publishes
    /// them as one object when completed.
    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
        Ok(Box::new(InMemoryUpload {
            location: location.clone(),
            parts: vec![],
            storage: Arc::clone(&self.storage),
        }))
    }

    /// Fetch an object (or a sub-range of it), first checking the request's
    /// preconditions against the stored metadata.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let entry = self.entry(location).await?;
        let e_tag = entry.e_tag.to_string();
        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: entry.last_modified,
            size: entry.data.len(),
            e_tag: Some(e_tag),
            version: None,
        };
        options.check_preconditions(&meta)?;
        let (range, data) = match options.range {
            Some(range) => {
                let r = range.as_range(entry.data.len()).context(RangeSnafu)?;
                (r.clone(), entry.data.slice(r))
            }
            None => (0..entry.data.len(), entry.data),
        };
        // The payload is already in memory, so a single-chunk stream suffices.
        let stream = futures::stream::once(futures::future::ready(Ok(data)));
        Ok(GetResult {
            payload: GetResultPayload::Stream(stream.boxed()),
            meta,
            range,
        })
    }

    /// Return the requested byte ranges of the object; any out-of-bounds
    /// range fails the whole call with a `Range` error.
    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
        let entry = self.entry(location).await?;
        ranges
            .iter()
            .map(|range| {
                let r = GetRange::Bounded(range.clone())
                    .as_range(entry.data.len())
                    .context(RangeSnafu)?;
                Ok(entry.data.slice(r))
            })
            .collect()
    }

    /// Return the metadata for the object at `location` without its payload.
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let entry = self.entry(location).await?;
        Ok(ObjectMeta {
            location: location.clone(),
            last_modified: entry.last_modified,
            size: entry.data.len(),
            e_tag: Some(entry.e_tag.to_string()),
            version: None,
        })
    }

    /// Remove the object at `location`; deleting a missing object is not an
    /// error.
    async fn delete(&self, location: &Path) -> Result<()> {
        self.storage.write().map.remove(location);
        Ok(())
    }

    /// List all objects strictly below `prefix` (the exact-prefix object
    /// itself is excluded), snapshotting results eagerly under the read lock.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        let root = Path::default();
        let prefix = prefix.unwrap_or(&root);
        let storage = self.storage.read();
        let values: Vec<_> = storage
            .map
            .range((prefix)..)
            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
            .filter(|(key, _)| {
                // Don't return for exact prefix match
                key.prefix_match(prefix)
                    .map(|mut x| x.next().is_some())
                    .unwrap_or(false)
            })
            .map(|(key, value)| {
                Ok(ObjectMeta {
                    location: key.clone(),
                    last_modified: value.last_modified,
                    size: value.data.len(),
                    e_tag: Some(value.e_tag.to_string()),
                    version: None,
                })
            })
            .collect();
        futures::stream::iter(values).boxed()
    }

    /// The memory implementation returns all results, as opposed to the cloud
    /// versions which limit their results to 1k or more because of API
    /// limitations.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let root = Path::default();
        let prefix = prefix.unwrap_or(&root);
        let mut common_prefixes = BTreeSet::new();
        // Only objects in this base level should be returned in the
        // response. Otherwise, we just collect the common prefixes.
        let mut objects = vec![];
        for (k, v) in self.storage.read().map.range((prefix)..) {
            // BTreeMap iterates in sorted order, so the first key past the
            // prefix ends the scan.
            if !k.as_ref().starts_with(prefix.as_ref()) {
                break;
            }
            let mut parts = match k.prefix_match(prefix) {
                Some(parts) => parts,
                None => continue,
            };
            // Pop first element
            let common_prefix = match parts.next() {
                Some(p) => p,
                // Should only return children of the prefix
                None => continue,
            };
            if parts.next().is_some() {
                // More path segments remain: this key is nested deeper, so
                // only its first segment is reported, as a common prefix.
                common_prefixes.insert(prefix.child(common_prefix));
            } else {
                let object = ObjectMeta {
                    location: k.clone(),
                    last_modified: v.last_modified,
                    size: v.data.len(),
                    e_tag: Some(v.e_tag.to_string()),
                    version: None,
                };
                objects.push(object);
            }
        }
        Ok(ListResult {
            objects,
            common_prefixes: common_prefixes.into_iter().collect(),
        })
    }

    /// Copy the object at `from` to `to`, overwriting any existing object;
    /// the copy is assigned a fresh etag and timestamp.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        let entry = self.entry(from).await?;
        self.storage.write().insert(to, entry.data);
        Ok(())
    }

    /// Copy the object at `from` to `to`, failing with `AlreadyExists` if
    /// `to` is occupied.
    // NOTE(review): `from` is read under a separate (earlier) read lock, so a
    // concurrent write to `from` between the two lock acquisitions can be
    // missed — the existence check on `to` itself is atomic under the write
    // lock.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        let entry = self.entry(from).await?;
        let mut storage = self.storage.write();
        if storage.map.contains_key(to) {
            return Err(Error::AlreadyExists {
                path: to.to_string(),
            }
            .into());
        }
        storage.insert(to, entry.data);
        Ok(())
    }
}
#[async_trait]
impl MultipartStore for InMemory {
    /// Start a multipart upload, returning a fresh numeric id (drawn from the
    /// same counter as etags) as its `MultipartId`.
    async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
        let mut storage = self.storage.write();
        let etag = storage.next_etag;
        storage.next_etag += 1;
        storage.uploads.insert(etag, Default::default());
        Ok(etag.to_string())
    }

    /// Store (or replace) the part at `part_idx` for upload `id`. Parts may
    /// arrive in any order; skipped indices stay `None` until uploaded.
    async fn put_part(
        &self,
        _path: &Path,
        id: &MultipartId,
        part_idx: usize,
        payload: PutPayload,
    ) -> Result<PartId> {
        let mut storage = self.storage.write();
        let upload = storage.upload_mut(id)?;
        // Grow the part buffer only when needed. The previous `<=` comparison
        // was wrong on both sides: for `part_idx` below the current length it
        // shrank the Vec (Vec::resize truncates), silently discarding parts
        // already uploaded; for `part_idx` beyond the length it skipped the
        // resize and the indexed assignment below panicked.
        if part_idx >= upload.parts.len() {
            upload.parts.resize(part_idx + 1, None);
        }
        upload.parts[part_idx] = Some(payload.into());
        Ok(PartId {
            content_id: Default::default(),
        })
    }

    /// Stitch the uploaded parts together in index order into a single
    /// object at `path`, failing with `MissingPart` if any index was skipped.
    async fn complete_multipart(
        &self,
        path: &Path,
        id: &MultipartId,
        _parts: Vec<PartId>,
    ) -> Result<PutResult> {
        let mut storage = self.storage.write();
        let upload = storage.remove_upload(id)?;
        // First pass validates completeness and sizes the output buffer.
        let mut cap = 0;
        for (part, x) in upload.parts.iter().enumerate() {
            cap += x.as_ref().context(MissingPartSnafu { part })?.len();
        }
        let mut buf = Vec::with_capacity(cap);
        for x in &upload.parts {
            // Every slot was verified to be `Some` above.
            buf.extend_from_slice(x.as_ref().unwrap())
        }
        let etag = storage.insert(path, buf.into());
        Ok(PutResult {
            e_tag: Some(etag.to_string()),
            version: None,
        })
    }

    /// Drop all state for an in-progress upload.
    async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
        self.storage.write().remove_upload(id)?;
        Ok(())
    }
}
impl InMemory {
/// Create new in-memory storage.
pub fn new() -> Self {
Self::default()
}
/// Creates a fork of the store, with the current content copied into the
/// new store.
pub fn fork(&self) -> Self {
let storage = self.storage.read();
let storage = Arc::new(RwLock::new(storage.clone()));
Self { storage }
}
/// Creates a clone of the store
#[deprecated(note = "Use fork() instead")]
pub async fn clone(&self) -> Self {
self.fork()
}
async fn entry(&self, location: &Path) -> Result<Entry> {
let storage = self.storage.read();
let value = storage
.map
.get(location)
.cloned()
.context(NoDataInMemorySnafu {
path: location.to_string(),
})?;
Ok(value)
}
}
/// Client-side buffer backing `ObjectStore::put_multipart`: parts accumulate
/// here and are only written into the shared map on `complete`.
#[derive(Debug)]
struct InMemoryUpload {
    // Destination path for the completed object.
    location: Path,
    // Parts buffered so far, in the order they were put.
    parts: Vec<PutPayload>,
    // Shared handle to the owning store's state.
    storage: Arc<RwLock<Storage>>,
}
#[async_trait]
impl MultipartUpload for InMemoryUpload {
    /// Buffer the payload locally; nothing is visible in the store until
    /// `complete` runs.
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        self.parts.push(payload);
        Box::pin(futures::future::ready(Ok(())))
    }

    /// Concatenate the buffered parts and publish them as a single object.
    async fn complete(&mut self) -> Result<PutResult> {
        let total: usize = self.parts.iter().map(|part| part.content_length()).sum();
        let mut buf = Vec::with_capacity(total);
        for chunk in self.parts.iter().flatten() {
            buf.extend_from_slice(chunk);
        }
        let etag = self.storage.write().insert(&self.location, buf.into());
        Ok(PutResult {
            e_tag: Some(etag.to_string()),
            version: None,
        })
    }

    /// Nothing to clean up server-side: the buffered parts are simply
    /// dropped with `self`.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::tests::*;

    use super::*;

    /// Run the crate's shared integration suite directly against `InMemory`.
    #[tokio::test]
    async fn in_memory_test() {
        let integration = InMemory::new();

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
        put_opts(&integration, true).await;
        multipart(&integration, &integration).await;
    }

    /// The suite should also pass through a `Box<dyn ObjectStore>`.
    #[tokio::test]
    async fn box_test() {
        let integration: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    /// ... and through an `Arc<dyn ObjectStore>`.
    #[tokio::test]
    async fn arc_test() {
        let integration: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        stream_get(&integration).await;
    }

    /// A plain put/get round-trip returns the exact bytes written.
    #[tokio::test]
    async fn unknown_length() {
        let integration = InMemory::new();

        let location = Path::from("some_file");

        let data = Bytes::from("arbitrary data");

        integration
            .put(&location, data.clone().into())
            .await
            .unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, data);
    }

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Getting a missing object yields `NotFound` wrapping the store-local
    /// `NoDataInMemory`, with the path preserved.
    #[tokio::test]
    async fn nonexistent_location() {
        let integration = InMemory::new();

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        if let crate::Error::NotFound { path, source } = err {
            let source_variant = source.downcast_ref::<Error>();
            assert!(
                matches!(source_variant, Some(Error::NoDataInMemory { .. }),),
                "got: {source_variant:?}"
            );
            assert_eq!(path, NON_EXISTENT_NAME);
        } else {
            panic!("unexpected error type: {err:?}");
        }
    }
}