| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An in-memory object store implementation |
| use std::collections::{BTreeMap, BTreeSet, HashMap}; |
| use std::ops::Range; |
| use std::sync::Arc; |
| |
| use async_trait::async_trait; |
| use bytes::Bytes; |
| use chrono::{DateTime, Utc}; |
| use futures::{stream::BoxStream, StreamExt}; |
| use parking_lot::RwLock; |
| use snafu::{OptionExt, ResultExt, Snafu}; |
| |
| use crate::multipart::{MultipartStore, PartId}; |
| use crate::util::InvalidGetRange; |
| use crate::{ |
| path::Path, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, |
| ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, UpdateVersion, UploadPart, |
| }; |
| use crate::{GetOptions, PutPayload}; |
| |
| /// A specialized `Error` for in-memory object store-related errors |
| #[derive(Debug, Snafu)] |
| #[allow(missing_docs)] |
| enum Error { |
| #[snafu(display("No data in memory found. Location: {path}"))] |
| NoDataInMemory { path: String }, |
| |
| #[snafu(display("Invalid range: {source}"))] |
| Range { source: InvalidGetRange }, |
| |
| #[snafu(display("Object already exists at that location: {path}"))] |
| AlreadyExists { path: String }, |
| |
| #[snafu(display("ETag required for conditional update"))] |
| MissingETag, |
| |
| #[snafu(display("MultipartUpload not found: {id}"))] |
| UploadNotFound { id: String }, |
| |
| #[snafu(display("Missing part at index: {part}"))] |
| MissingPart { part: usize }, |
| } |
| |
| impl From<Error> for super::Error { |
| fn from(source: Error) -> Self { |
| match source { |
| Error::NoDataInMemory { ref path } => Self::NotFound { |
| path: path.into(), |
| source: source.into(), |
| }, |
| Error::AlreadyExists { ref path } => Self::AlreadyExists { |
| path: path.into(), |
| source: source.into(), |
| }, |
| _ => Self::Generic { |
| store: "InMemory", |
| source: Box::new(source), |
| }, |
| } |
| } |
| } |
| |
| /// In-memory storage suitable for testing or for opting out of using a cloud |
| /// storage provider. |
| #[derive(Debug, Default)] |
| pub struct InMemory { |
| storage: SharedStorage, |
| } |
| |
| #[derive(Debug, Clone)] |
| struct Entry { |
| data: Bytes, |
| last_modified: DateTime<Utc>, |
| e_tag: usize, |
| } |
| |
| impl Entry { |
| fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self { |
| Self { |
| data, |
| last_modified, |
| e_tag, |
| } |
| } |
| } |
| |
| #[derive(Debug, Default, Clone)] |
| struct Storage { |
| next_etag: usize, |
| map: BTreeMap<Path, Entry>, |
| uploads: HashMap<usize, PartStorage>, |
| } |
| |
| #[derive(Debug, Default, Clone)] |
| struct PartStorage { |
| parts: Vec<Option<Bytes>>, |
| } |
| |
| type SharedStorage = Arc<RwLock<Storage>>; |
| |
| impl Storage { |
| fn insert(&mut self, location: &Path, bytes: Bytes) -> usize { |
| let etag = self.next_etag; |
| self.next_etag += 1; |
| let entry = Entry::new(bytes, Utc::now(), etag); |
| self.overwrite(location, entry); |
| etag |
| } |
| |
| fn overwrite(&mut self, location: &Path, entry: Entry) { |
| self.map.insert(location.clone(), entry); |
| } |
| |
| fn create(&mut self, location: &Path, entry: Entry) -> Result<()> { |
| use std::collections::btree_map; |
| match self.map.entry(location.clone()) { |
| btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists { |
| path: location.to_string(), |
| } |
| .into()), |
| btree_map::Entry::Vacant(v) => { |
| v.insert(entry); |
| Ok(()) |
| } |
| } |
| } |
| |
| fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> Result<()> { |
| match self.map.get_mut(location) { |
| // Return Precondition instead of NotFound for consistency with stores |
| None => Err(crate::Error::Precondition { |
| path: location.to_string(), |
| source: format!("Object at location {location} not found").into(), |
| }), |
| Some(e) => { |
| let existing = e.e_tag.to_string(); |
| let expected = v.e_tag.context(MissingETagSnafu)?; |
| if existing == expected { |
| *e = entry; |
| Ok(()) |
| } else { |
| Err(crate::Error::Precondition { |
| path: location.to_string(), |
| source: format!("{existing} does not match {expected}").into(), |
| }) |
| } |
| } |
| } |
| } |
| |
| fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> { |
| let parts = id |
| .parse() |
| .ok() |
| .and_then(|x| self.uploads.get_mut(&x)) |
| .context(UploadNotFoundSnafu { id })?; |
| Ok(parts) |
| } |
| |
| fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> { |
| let parts = id |
| .parse() |
| .ok() |
| .and_then(|x| self.uploads.remove(&x)) |
| .context(UploadNotFoundSnafu { id })?; |
| Ok(parts) |
| } |
| } |
| |
| impl std::fmt::Display for InMemory { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "InMemory") |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for InMemory { |
| async fn put_opts( |
| &self, |
| location: &Path, |
| payload: PutPayload, |
| opts: PutOptions, |
| ) -> Result<PutResult> { |
| let mut storage = self.storage.write(); |
| let etag = storage.next_etag; |
| let entry = Entry::new(payload.into(), Utc::now(), etag); |
| |
| match opts.mode { |
| PutMode::Overwrite => storage.overwrite(location, entry), |
| PutMode::Create => storage.create(location, entry)?, |
| PutMode::Update(v) => storage.update(location, v, entry)?, |
| } |
| storage.next_etag += 1; |
| |
| Ok(PutResult { |
| e_tag: Some(etag.to_string()), |
| version: None, |
| }) |
| } |
| |
| async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> { |
| Ok(Box::new(InMemoryUpload { |
| location: location.clone(), |
| parts: vec![], |
| storage: Arc::clone(&self.storage), |
| })) |
| } |
| |
| async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { |
| let entry = self.entry(location).await?; |
| let e_tag = entry.e_tag.to_string(); |
| |
| let meta = ObjectMeta { |
| location: location.clone(), |
| last_modified: entry.last_modified, |
| size: entry.data.len(), |
| e_tag: Some(e_tag), |
| version: None, |
| }; |
| options.check_preconditions(&meta)?; |
| |
| let (range, data) = match options.range { |
| Some(range) => { |
| let r = range.as_range(entry.data.len()).context(RangeSnafu)?; |
| (r.clone(), entry.data.slice(r)) |
| } |
| None => (0..entry.data.len(), entry.data), |
| }; |
| let stream = futures::stream::once(futures::future::ready(Ok(data))); |
| |
| Ok(GetResult { |
| payload: GetResultPayload::Stream(stream.boxed()), |
| meta, |
| range, |
| }) |
| } |
| |
| async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> { |
| let entry = self.entry(location).await?; |
| ranges |
| .iter() |
| .map(|range| { |
| let r = GetRange::Bounded(range.clone()) |
| .as_range(entry.data.len()) |
| .context(RangeSnafu)?; |
| |
| Ok(entry.data.slice(r)) |
| }) |
| .collect() |
| } |
| |
| async fn head(&self, location: &Path) -> Result<ObjectMeta> { |
| let entry = self.entry(location).await?; |
| |
| Ok(ObjectMeta { |
| location: location.clone(), |
| last_modified: entry.last_modified, |
| size: entry.data.len(), |
| e_tag: Some(entry.e_tag.to_string()), |
| version: None, |
| }) |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.storage.write().map.remove(location); |
| Ok(()) |
| } |
| |
| fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> { |
| let root = Path::default(); |
| let prefix = prefix.unwrap_or(&root); |
| |
| let storage = self.storage.read(); |
| let values: Vec<_> = storage |
| .map |
| .range((prefix)..) |
| .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref())) |
| .filter(|(key, _)| { |
| // Don't return for exact prefix match |
| key.prefix_match(prefix) |
| .map(|mut x| x.next().is_some()) |
| .unwrap_or(false) |
| }) |
| .map(|(key, value)| { |
| Ok(ObjectMeta { |
| location: key.clone(), |
| last_modified: value.last_modified, |
| size: value.data.len(), |
| e_tag: Some(value.e_tag.to_string()), |
| version: None, |
| }) |
| }) |
| .collect(); |
| |
| futures::stream::iter(values).boxed() |
| } |
| |
| /// The memory implementation returns all results, as opposed to the cloud |
| /// versions which limit their results to 1k or more because of API |
| /// limitations. |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| let root = Path::default(); |
| let prefix = prefix.unwrap_or(&root); |
| |
| let mut common_prefixes = BTreeSet::new(); |
| |
| // Only objects in this base level should be returned in the |
| // response. Otherwise, we just collect the common prefixes. |
| let mut objects = vec![]; |
| for (k, v) in self.storage.read().map.range((prefix)..) { |
| if !k.as_ref().starts_with(prefix.as_ref()) { |
| break; |
| } |
| |
| let mut parts = match k.prefix_match(prefix) { |
| Some(parts) => parts, |
| None => continue, |
| }; |
| |
| // Pop first element |
| let common_prefix = match parts.next() { |
| Some(p) => p, |
| // Should only return children of the prefix |
| None => continue, |
| }; |
| |
| if parts.next().is_some() { |
| common_prefixes.insert(prefix.child(common_prefix)); |
| } else { |
| let object = ObjectMeta { |
| location: k.clone(), |
| last_modified: v.last_modified, |
| size: v.data.len(), |
| e_tag: Some(v.e_tag.to_string()), |
| version: None, |
| }; |
| objects.push(object); |
| } |
| } |
| |
| Ok(ListResult { |
| objects, |
| common_prefixes: common_prefixes.into_iter().collect(), |
| }) |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| let entry = self.entry(from).await?; |
| self.storage.write().insert(to, entry.data); |
| Ok(()) |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| let entry = self.entry(from).await?; |
| let mut storage = self.storage.write(); |
| if storage.map.contains_key(to) { |
| return Err(Error::AlreadyExists { |
| path: to.to_string(), |
| } |
| .into()); |
| } |
| storage.insert(to, entry.data); |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl MultipartStore for InMemory { |
| async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> { |
| let mut storage = self.storage.write(); |
| let etag = storage.next_etag; |
| storage.next_etag += 1; |
| storage.uploads.insert(etag, Default::default()); |
| Ok(etag.to_string()) |
| } |
| |
| async fn put_part( |
| &self, |
| _path: &Path, |
| id: &MultipartId, |
| part_idx: usize, |
| payload: PutPayload, |
| ) -> Result<PartId> { |
| let mut storage = self.storage.write(); |
| let upload = storage.upload_mut(id)?; |
| if part_idx <= upload.parts.len() { |
| upload.parts.resize(part_idx + 1, None); |
| } |
| upload.parts[part_idx] = Some(payload.into()); |
| Ok(PartId { |
| content_id: Default::default(), |
| }) |
| } |
| |
| async fn complete_multipart( |
| &self, |
| path: &Path, |
| id: &MultipartId, |
| _parts: Vec<PartId>, |
| ) -> Result<PutResult> { |
| let mut storage = self.storage.write(); |
| let upload = storage.remove_upload(id)?; |
| |
| let mut cap = 0; |
| for (part, x) in upload.parts.iter().enumerate() { |
| cap += x.as_ref().context(MissingPartSnafu { part })?.len(); |
| } |
| let mut buf = Vec::with_capacity(cap); |
| for x in &upload.parts { |
| buf.extend_from_slice(x.as_ref().unwrap()) |
| } |
| let etag = storage.insert(path, buf.into()); |
| Ok(PutResult { |
| e_tag: Some(etag.to_string()), |
| version: None, |
| }) |
| } |
| |
| async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> { |
| self.storage.write().remove_upload(id)?; |
| Ok(()) |
| } |
| } |
| |
| impl InMemory { |
| /// Create new in-memory storage. |
| pub fn new() -> Self { |
| Self::default() |
| } |
| |
| /// Creates a fork of the store, with the current content copied into the |
| /// new store. |
| pub fn fork(&self) -> Self { |
| let storage = self.storage.read(); |
| let storage = Arc::new(RwLock::new(storage.clone())); |
| Self { storage } |
| } |
| |
| /// Creates a clone of the store |
| #[deprecated(note = "Use fork() instead")] |
| pub async fn clone(&self) -> Self { |
| self.fork() |
| } |
| |
| async fn entry(&self, location: &Path) -> Result<Entry> { |
| let storage = self.storage.read(); |
| let value = storage |
| .map |
| .get(location) |
| .cloned() |
| .context(NoDataInMemorySnafu { |
| path: location.to_string(), |
| })?; |
| |
| Ok(value) |
| } |
| } |
| |
| #[derive(Debug)] |
| struct InMemoryUpload { |
| location: Path, |
| parts: Vec<PutPayload>, |
| storage: Arc<RwLock<Storage>>, |
| } |
| |
| #[async_trait] |
| impl MultipartUpload for InMemoryUpload { |
| fn put_part(&mut self, payload: PutPayload) -> UploadPart { |
| self.parts.push(payload); |
| Box::pin(futures::future::ready(Ok(()))) |
| } |
| |
| async fn complete(&mut self) -> Result<PutResult> { |
| let cap = self.parts.iter().map(|x| x.content_length()).sum(); |
| let mut buf = Vec::with_capacity(cap); |
| let parts = self.parts.iter().flatten(); |
| parts.for_each(|x| buf.extend_from_slice(x)); |
| let etag = self.storage.write().insert(&self.location, buf.into()); |
| Ok(PutResult { |
| e_tag: Some(etag.to_string()), |
| version: None, |
| }) |
| } |
| |
| async fn abort(&mut self) -> Result<()> { |
| Ok(()) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use crate::tests::*;

    use super::*;

    /// Runs the shared integration suite, including multipart, directly on `InMemory`.
    #[tokio::test]
    async fn in_memory_test() {
        let store = InMemory::new();

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        put_opts(&store, true).await;
        multipart(&store, &store).await;
    }

    /// Runs the shared suite through a boxed trait object.
    #[tokio::test]
    async fn box_test() {
        let store: Box<dyn ObjectStore> = Box::new(InMemory::new());

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// Runs the shared suite through an `Arc` trait object.
    #[tokio::test]
    async fn arc_test() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
    }

    /// A plain put followed by a get round-trips the bytes unchanged.
    #[tokio::test]
    async fn unknown_length() {
        let store = InMemory::new();
        let location = Path::from("some_file");
        let data = Bytes::from("arbitrary data");

        store.put(&location, data.clone().into()).await.unwrap();

        let fetched = store
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*fetched, data);
    }

    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Reading a missing object yields `NotFound` wrapping `NoDataInMemory`.
    #[tokio::test]
    async fn nonexistent_location() {
        let store = InMemory::new();
        let missing = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&store, Some(missing))
            .await
            .unwrap_err();
        match err {
            crate::Error::NotFound { path, source } => {
                let inner = source.downcast_ref::<Error>();
                assert!(
                    matches!(inner, Some(Error::NoDataInMemory { .. })),
                    "got: {inner:?}"
                );
                assert_eq!(path, NON_EXISTENT_NAME);
            }
            other => panic!("unexpected error type: {other:?}"),
        }
    }
}