| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use crate::error::StorageServiceError; |
| use crate::proxy::ProxyRequest; |
| use anyhow::anyhow; |
| use rusty_leveldb::LdbIterator; |
| use rusty_leveldb::DB; |
| use std::cell::RefCell; |
| use teaclave_proto::teaclave_storage_service::*; |
| use teaclave_service_enclave_utils::bail; |
| use tokio::sync::mpsc::UnboundedReceiver; |
| |
| pub(crate) struct TeaclaveStorageService { |
| // Current LevelDB implementation is not concurrent, so we need to wrap the |
| // DB with RefCell. This service is running in a single thread, it's safe to |
| // use RefCell. |
| database: RefCell<DB>, |
| receiver: UnboundedReceiver<ProxyRequest>, |
| } |
| |
| impl TeaclaveStorageService { |
| pub(crate) fn new(database: RefCell<DB>, receiver: UnboundedReceiver<ProxyRequest>) -> Self { |
| Self { database, receiver } |
| } |
| } |
| |
| // queue-key-head: u32; include element |
| // queue-key-tail: u32; not include element; if head == tail, queue is empty |
| // queue-key-index: Vec<u8>; elements |
| // Todo: what if there are errors when doing get_tail and get_head |
| struct DBQueue<'a> { |
| database: &'a mut DB, |
| key: &'a [u8], |
| } |
| |
| impl<'a> DBQueue<'a> { |
| fn get_tail_key(&self) -> Vec<u8> { |
| let mut head_key = b"queue-".to_vec(); |
| head_key.extend_from_slice(self.key); |
| head_key.extend_from_slice(b"-tail"); |
| head_key |
| } |
| fn get_head_key(&self) -> Vec<u8> { |
| let mut head_key = b"queue-".to_vec(); |
| head_key.extend_from_slice(self.key); |
| head_key.extend_from_slice(b"-head"); |
| head_key |
| } |
| fn get_element_key(&self, index: u32) -> Vec<u8> { |
| let mut element_key = b"queue-".to_vec(); |
| element_key.extend_from_slice(self.key); |
| element_key.extend_from_slice(b"-"); |
| element_key.extend_from_slice(&index.to_le_bytes()); |
| element_key |
| } |
| |
| fn get_head(&mut self) -> u32 { |
| let head_key = self.get_head_key(); |
| self.read_u32(&head_key).unwrap_or(0) |
| } |
| |
| fn get_tail(&mut self) -> u32 { |
| let tail_key = self.get_tail_key(); |
| self.read_u32(&tail_key).unwrap_or(0) |
| } |
| |
| fn read_u32(&mut self, key: &[u8]) -> Option<u32> { |
| let element_bytes: Vec<u8> = match self.database.get(key) { |
| Some(bytes) => bytes, |
| None => return None, |
| }; |
| if element_bytes.len() != 4 { |
| return None; |
| } |
| let mut bytes: [u8; 4] = [0; 4]; |
| bytes.copy_from_slice(&element_bytes); |
| Some(u32::from_le_bytes(bytes)) |
| } |
| |
| pub fn open(database: &'a mut DB, key: &'a [u8]) -> Self { |
| DBQueue { database, key } |
| } |
| |
| pub fn enqueue(&mut self, value: &[u8]) -> Result<(), StorageServiceError> { |
| let tail_index = self.get_tail(); |
| // put element |
| self.database |
| .put(&self.get_element_key(tail_index), value)?; |
| |
| // update tail |
| let tail_index = tail_index.wrapping_add(1); |
| self.database |
| .put(&self.get_tail_key(), &tail_index.to_le_bytes())?; |
| |
| self.database.flush()?; |
| Ok(()) |
| } |
| |
| pub fn dequeue(&mut self) -> Result<Vec<u8>, StorageServiceError> { |
| let head_index = self.get_head(); |
| let tail_index = self.get_tail(); |
| // check whether the queue is empty |
| if head_index == tail_index { |
| bail!(StorageServiceError::Service(anyhow!( |
| "head_index == tail_index" |
| ))) |
| } else { |
| let element_key = self.get_element_key(head_index); |
| let result = match self.database.get(&element_key) { |
| Some(value) => value, |
| None => bail!(StorageServiceError::Service(anyhow!( |
| "cannot get element_key" |
| ))), |
| }; |
| |
| // update head |
| let head_index = head_index.wrapping_add(1); |
| self.database |
| .put(&self.get_head_key(), &head_index.to_le_bytes())?; |
| self.database.delete(&element_key)?; |
| self.database.compact_range(b"queue", b"queuf")?; |
| Ok(result) |
| } |
| } |
| |
| #[allow(unused)] |
| pub fn len(&mut self) -> u32 { |
| let head_index = self.get_head(); |
| let tail_index = self.get_tail(); |
| |
| if tail_index >= head_index { |
| tail_index - head_index |
| } else { |
| u32::MAX - head_index + tail_index + 1 |
| } |
| } |
| } |
| |
| impl TeaclaveStorageService { |
| pub(crate) fn start(&mut self) { |
| while let Some(request) = self.receiver.blocking_recv() { |
| let database_request = request.request; |
| let sender = request.sender; |
| let response = self.dispatch(database_request); |
| |
| match sender.send(response) { |
| Ok(_) => (), |
| Err(e) => error!("mpsc send error: {}", e), |
| } |
| } |
| } |
| |
| fn dispatch( |
| &self, |
| request: teaclave_rpc::Request<TeaclaveStorageRequest>, |
| ) -> std::result::Result<TeaclaveStorageResponse, StorageServiceError> { |
| match request.into_inner() { |
| TeaclaveStorageRequest::Get(r) => self.get(r).map(TeaclaveStorageResponse::Get), |
| TeaclaveStorageRequest::Put(r) => self.put(r).map(TeaclaveStorageResponse::Empty), |
| TeaclaveStorageRequest::Delete(r) => self.delete(r).map(TeaclaveStorageResponse::Empty), |
| TeaclaveStorageRequest::Enqueue(r) => { |
| self.enqueue(r).map(TeaclaveStorageResponse::Empty) |
| } |
| TeaclaveStorageRequest::Dequeue(r) => { |
| self.dequeue(r).map(TeaclaveStorageResponse::Dequeue) |
| } |
| TeaclaveStorageRequest::GetKeysByPrefix(r) => self |
| .get_keys_by_prefix(r) |
| .map(TeaclaveStorageResponse::GetKeysByPrefix), |
| } |
| } |
| } |
| |
| impl TeaclaveStorageService { |
| fn get(&self, request: GetRequest) -> std::result::Result<GetResponse, StorageServiceError> { |
| match self.database.borrow_mut().get(&request.key) { |
| Some(value) => Ok(GetResponse { value }), |
| None => bail!(StorageServiceError::None), |
| } |
| } |
| |
| fn put(&self, request: PutRequest) -> std::result::Result<(), StorageServiceError> { |
| self.database |
| .borrow_mut() |
| .put(&request.key, &request.value) |
| .map_err(StorageServiceError::Database)?; |
| |
| self.database |
| .borrow_mut() |
| .flush() |
| .map_err(StorageServiceError::Database)?; |
| Ok(()) |
| } |
| |
| fn delete(&self, request: DeleteRequest) -> std::result::Result<(), StorageServiceError> { |
| self.database |
| .borrow_mut() |
| .delete(&request.key) |
| .map_err(StorageServiceError::Database)?; |
| |
| self.database |
| .borrow_mut() |
| .flush() |
| .map_err(StorageServiceError::Database)?; |
| Ok(()) |
| } |
| |
| fn enqueue(&self, request: EnqueueRequest) -> std::result::Result<(), StorageServiceError> { |
| let mut db = self.database.borrow_mut(); |
| let mut queue = DBQueue::open(&mut db, &request.key); |
| match queue.enqueue(&request.value) { |
| Ok(_) => Ok(()), |
| Err(e) => bail!(e), |
| } |
| } |
| |
| fn dequeue( |
| &self, |
| request: DequeueRequest, |
| ) -> std::result::Result<DequeueResponse, StorageServiceError> { |
| let mut db = self.database.borrow_mut(); |
| let mut queue = DBQueue::open(&mut db, &request.key); |
| match queue.dequeue() { |
| Ok(value) => Ok(DequeueResponse { value }), |
| Err(e) => bail!(e), |
| } |
| } |
| |
| fn get_keys_by_prefix( |
| &self, |
| request: GetKeysByPrefixRequest, |
| ) -> std::result::Result<GetKeysByPrefixResponse, StorageServiceError> { |
| let prefix = request.prefix; |
| let mut db = self.database.borrow_mut(); |
| let mut it = db.new_iter().map_err(StorageServiceError::Database)?; |
| |
| let mut first_prefix = prefix.clone(); |
| first_prefix.push(b'-'); |
| let mut last_prefix = prefix; |
| last_prefix.push(b'.'); |
| |
| it.seek(&first_prefix[..]); |
| if !it.valid() { |
| return Ok(GetKeysByPrefixResponse::default()); |
| } |
| let mut key = Vec::new(); |
| let mut value = Vec::new(); |
| let mut keys = Vec::new(); |
| if !it.current(&mut key, &mut value) { |
| return Ok(GetKeysByPrefixResponse::default()); |
| } |
| keys.push(key); |
| |
| while let Some((k, _)) = it.next() { |
| if k >= last_prefix { |
| break; |
| } |
| keys.push(k); |
| } |
| |
| Ok(GetKeysByPrefixResponse { keys }) |
| } |
| } |
| |
| #[cfg(feature = "enclave_unit_test")] |
| pub mod tests { |
| use super::*; |
| use tokio::sync::mpsc::unbounded_channel; |
| |
| fn get_mock_service() -> TeaclaveStorageService { |
| let (_sender, receiver) = unbounded_channel(); |
| let key = [ |
| 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, |
| 0x09, 0x08, |
| ]; |
| let opt = rusty_leveldb::Options::new_disk_db_with(key); |
| let mut database = DB::open("mock_db_unit_test", opt).unwrap(); |
| database.put(b"test_get_key", b"test_get_value").unwrap(); |
| database |
| .put(b"test_delete_key", b"test_delete_value") |
| .unwrap(); |
| TeaclaveStorageService { |
| database: RefCell::new(database), |
| receiver, |
| } |
| } |
| |
| pub fn test_get_key() { |
| let service = get_mock_service(); |
| let request = GetRequest::new("test_get_key"); |
| assert!(service.get(request).is_ok()); |
| } |
| |
| pub fn test_put_key() { |
| let service = get_mock_service(); |
| let request = PutRequest::new("test_put_key", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = GetRequest::new("test_put_key"); |
| assert!(service.get(request).is_ok()); |
| } |
| |
| pub fn test_delete_key() { |
| let service = get_mock_service(); |
| let request = DeleteRequest::new("test_delete_key"); |
| assert!(service.delete(request).is_ok()); |
| let request = GetRequest::new("test_delete_key"); |
| assert!(service.get(request).is_err()); |
| } |
| |
| pub fn test_empty_value() { |
| let service = get_mock_service(); |
| let request = PutRequest::new("test_empty_value", ""); |
| assert!(service.put(request).is_ok()); |
| let request = GetRequest::new("test_empty_value"); |
| let response = service.get(request).unwrap(); |
| assert_eq!(response.value, Vec::<u8>::new()); |
| } |
| |
| pub fn test_enqueue() { |
| let service = get_mock_service(); |
| let request = EnqueueRequest::new("test_enqueue_key", "1"); |
| assert!(service.enqueue(request).is_ok()); |
| let request = EnqueueRequest::new("test_enqueue_key", "2"); |
| assert!(service.enqueue(request).is_ok()); |
| } |
| |
| pub fn test_dequeue() { |
| let service = get_mock_service(); |
| let request = DequeueRequest::new("test_dequeue_key"); |
| assert!(service.dequeue(request).is_err()); |
| let request = EnqueueRequest::new("test_dequeue_key", "1"); |
| assert!(service.enqueue(request).is_ok()); |
| let request = EnqueueRequest::new("test_dequeue_key", "2"); |
| assert!(service.enqueue(request).is_ok()); |
| let request = DequeueRequest::new("test_dequeue_key"); |
| assert_eq!(service.dequeue(request).unwrap().value, b"1"); |
| let request = DequeueRequest::new("test_dequeue_key"); |
| assert_eq!(service.dequeue(request).unwrap().value, b"2"); |
| } |
| |
| pub fn test_get_keys_by_prefix() { |
| let service = get_mock_service(); |
| let request = PutRequest::new("function-1", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = PutRequest::new("function-22", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = PutRequest::new("function-333", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = PutRequest::new("task-444", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = PutRequest::new("function-5", "test_put_value"); |
| assert!(service.put(request).is_ok()); |
| let request = GetKeysByPrefixRequest::new("function"); |
| let response = service.get_keys_by_prefix(request); |
| assert!(response.is_ok()); |
| assert_eq!( |
| response.unwrap().keys, |
| std::vec![ |
| b"function-1".to_vec(), |
| b"function-22".to_vec(), |
| b"function-333".to_vec(), |
| b"function-5".to_vec() |
| ] |
| ); |
| } |
| } |