| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! An in-memory object store implementation |
| use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; |
| use async_trait::async_trait; |
| use bytes::Bytes; |
| use chrono::Utc; |
| use futures::{stream::BoxStream, StreamExt}; |
| use parking_lot::RwLock; |
| use snafu::{ensure, OptionExt, Snafu}; |
| use std::collections::BTreeMap; |
| use std::collections::BTreeSet; |
| use std::ops::Range; |
| |
| /// A specialized `Error` for in-memory object store-related errors |
| #[derive(Debug, Snafu)] |
| #[allow(missing_docs)] |
| enum Error { |
| #[snafu(display("No data in memory found. Location: {path}"))] |
| NoDataInMemory { path: String }, |
| |
| #[snafu(display("Out of range"))] |
| OutOfRange, |
| |
| #[snafu(display("Bad range"))] |
| BadRange, |
| |
| #[snafu(display("Object already exists at that location: {path}"))] |
| AlreadyExists { path: String }, |
| } |
| |
| impl From<Error> for super::Error { |
| fn from(source: Error) -> Self { |
| match source { |
| Error::NoDataInMemory { ref path } => Self::NotFound { |
| path: path.into(), |
| source: source.into(), |
| }, |
| Error::AlreadyExists { ref path } => Self::AlreadyExists { |
| path: path.into(), |
| source: source.into(), |
| }, |
| _ => Self::Generic { |
| store: "InMemory", |
| source: Box::new(source), |
| }, |
| } |
| } |
| } |
| |
| /// In-memory storage suitable for testing or for opting out of using a cloud |
| /// storage provider. |
| #[derive(Debug, Default)] |
| pub struct InMemory { |
| storage: RwLock<BTreeMap<Path, Bytes>>, |
| } |
| |
| impl std::fmt::Display for InMemory { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "InMemory") |
| } |
| } |
| |
| #[async_trait] |
| impl ObjectStore for InMemory { |
| async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { |
| self.storage.write().insert(location.clone(), bytes); |
| Ok(()) |
| } |
| |
| async fn get(&self, location: &Path) -> Result<GetResult> { |
| let data = self.get_bytes(location).await?; |
| |
| Ok(GetResult::Stream( |
| futures::stream::once(async move { Ok(data) }).boxed(), |
| )) |
| } |
| |
| async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> { |
| let data = self.get_bytes(location).await?; |
| ensure!(range.end <= data.len(), OutOfRangeSnafu); |
| ensure!(range.start <= range.end, BadRangeSnafu); |
| |
| Ok(data.slice(range)) |
| } |
| |
| async fn head(&self, location: &Path) -> Result<ObjectMeta> { |
| let last_modified = Utc::now(); |
| let bytes = self.get_bytes(location).await?; |
| Ok(ObjectMeta { |
| location: location.clone(), |
| last_modified, |
| size: bytes.len(), |
| }) |
| } |
| |
| async fn delete(&self, location: &Path) -> Result<()> { |
| self.storage.write().remove(location); |
| Ok(()) |
| } |
| |
| async fn list( |
| &self, |
| prefix: Option<&Path>, |
| ) -> Result<BoxStream<'_, Result<ObjectMeta>>> { |
| let last_modified = Utc::now(); |
| |
| let storage = self.storage.read(); |
| let values: Vec<_> = storage |
| .iter() |
| .filter(move |(key, _)| prefix.map(|p| key.prefix_matches(p)).unwrap_or(true)) |
| .map(move |(key, value)| { |
| Ok(ObjectMeta { |
| location: key.clone(), |
| last_modified, |
| size: value.len(), |
| }) |
| }) |
| .collect(); |
| |
| Ok(futures::stream::iter(values).boxed()) |
| } |
| |
| /// The memory implementation returns all results, as opposed to the cloud |
| /// versions which limit their results to 1k or more because of API |
| /// limitations. |
| async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { |
| let root = Path::default(); |
| let prefix = prefix.unwrap_or(&root); |
| |
| let mut common_prefixes = BTreeSet::new(); |
| let last_modified = Utc::now(); |
| |
| // Only objects in this base level should be returned in the |
| // response. Otherwise, we just collect the common prefixes. |
| let mut objects = vec![]; |
| for (k, v) in self.storage.read().range((prefix)..) { |
| let mut parts = match k.prefix_match(prefix) { |
| Some(parts) => parts, |
| None => break, |
| }; |
| |
| // Pop first element |
| let common_prefix = match parts.next() { |
| Some(p) => p, |
| None => continue, |
| }; |
| |
| if parts.next().is_some() { |
| common_prefixes.insert(prefix.child(common_prefix)); |
| } else { |
| let object = ObjectMeta { |
| location: k.clone(), |
| last_modified, |
| size: v.len(), |
| }; |
| objects.push(object); |
| } |
| } |
| |
| Ok(ListResult { |
| objects, |
| common_prefixes: common_prefixes.into_iter().collect(), |
| }) |
| } |
| |
| async fn copy(&self, from: &Path, to: &Path) -> Result<()> { |
| let data = self.get_bytes(from).await?; |
| self.storage.write().insert(to.clone(), data); |
| Ok(()) |
| } |
| |
| async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { |
| let data = self.get_bytes(from).await?; |
| let mut storage = self.storage.write(); |
| if storage.contains_key(to) { |
| return Err(Error::AlreadyExists { |
| path: to.to_string(), |
| } |
| .into()); |
| } |
| storage.insert(to.clone(), data); |
| Ok(()) |
| } |
| } |
| |
| impl InMemory { |
| /// Create new in-memory storage. |
| pub fn new() -> Self { |
| Self::default() |
| } |
| |
| /// Creates a clone of the store |
| pub async fn clone(&self) -> Self { |
| let storage = self.storage.read(); |
| let storage = storage.clone(); |
| |
| Self { |
| storage: RwLock::new(storage), |
| } |
| } |
| |
| async fn get_bytes(&self, location: &Path) -> Result<Bytes> { |
| let storage = self.storage.read(); |
| let bytes = storage |
| .get(location) |
| .cloned() |
| .context(NoDataInMemorySnafu { |
| path: location.to_string(), |
| })?; |
| Ok(bytes) |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        tests::{
            copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly,
            list_with_delimiter, put_get_delete_list, rename_and_copy,
        },
        Error as ObjectStoreError, ObjectStore,
    };

    /// Location used to look up an object that is never written.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Runs the shared object-store test suite against the in-memory store.
    #[tokio::test]
    async fn in_memory_test() {
        let store = InMemory::new();

        put_get_delete_list(&store).await.unwrap();
        list_uses_directories_correctly(&store).await.unwrap();
        list_with_delimiter(&store).await.unwrap();
        rename_and_copy(&store).await.unwrap();
        copy_if_not_exists(&store).await.unwrap();
    }

    /// Data written with `put` is read back unchanged through `get`.
    #[tokio::test]
    async fn unknown_length() {
        let store = InMemory::new();
        let location = Path::from("some_file");
        let expected_data = Bytes::from("arbitrary data");

        store.put(&location, expected_data.clone()).await.unwrap();

        let fetched = store.get(&location).await.unwrap();
        let read_data = fetched.bytes().await.unwrap();
        assert_eq!(&*read_data, expected_data);
    }

    /// Reading a missing object yields `NotFound`, wrapping this module's
    /// `NoDataInMemory` error and carrying the requested path.
    #[tokio::test]
    async fn nonexistent_location() {
        let store = InMemory::new();
        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&store, Some(location))
            .await
            .unwrap_err();

        match err {
            ObjectStoreError::NotFound { path, source } => {
                let source_variant = source.downcast_ref::<Error>();
                assert!(
                    matches!(source_variant, Some(Error::NoDataInMemory { .. })),
                    "got: {:?}",
                    source_variant
                );
                assert_eq!(path, NON_EXISTENT_NAME);
            }
            other => panic!("unexpected error type: {:?}", other),
        }
    }
}