blob: af3feb10d9bdcbd73e4a0ea16562e0ae7dd3f613 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::error::StorageServiceError;
use crate::proxy::ProxyRequest;
use anyhow::anyhow;
use rusty_leveldb::LdbIterator;
use rusty_leveldb::DB;
use std::cell::RefCell;
use teaclave_proto::teaclave_storage_service::*;
use teaclave_service_enclave_utils::bail;
use tokio::sync::mpsc::UnboundedReceiver;
/// State of the storage service: the key-value database plus the channel on
/// which requests arrive.
pub(crate) struct TeaclaveStorageService {
// Current LevelDB implementation is not concurrent, so we need to wrap the
// DB with RefCell. This service is running in a single thread, it's safe to
// use RefCell.
database: RefCell<DB>,
// Receiving end of the request channel; drained by `start`.
receiver: UnboundedReceiver<ProxyRequest>,
}
impl TeaclaveStorageService {
pub(crate) fn new(database: RefCell<DB>, receiver: UnboundedReceiver<ProxyRequest>) -> Self {
Self { database, receiver }
}
}
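// Database layout of a queue named `key`:
// queue-key-head: u32 (little-endian); index of the first element (inclusive)
// queue-key-tail: u32 (little-endian); index one past the last element (exclusive); if head == tail, the queue is empty
// queue-key-index: Vec<u8>; the element stored at `index` (a little-endian u32)
// TODO: decide how to handle errors in get_tail and get_head; a missing or
// malformed stored value is currently read as index 0.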
/// A first-in, first-out queue stored in the database under keys derived
/// from `key`.
struct DBQueue<'a> {
// Database holding the queue's head index, tail index and elements.
database: &'a mut DB,
// Queue name; embedded in every database key this queue uses.
key: &'a [u8],
}
impl<'a> DBQueue<'a> {
/// Database key under which this queue's tail index is stored:
/// `queue-<key>-tail`.
fn get_tail_key(&self) -> Vec<u8> {
    [&b"queue-"[..], self.key, &b"-tail"[..]].concat()
}
/// Database key under which this queue's head index is stored:
/// `queue-<key>-head`.
fn get_head_key(&self) -> Vec<u8> {
    [&b"queue-"[..], self.key, &b"-head"[..]].concat()
}
/// Database key of the element at `index`: `queue-<key>-` followed by the
/// four little-endian bytes of `index`.
fn get_element_key(&self, index: u32) -> Vec<u8> {
    let index_bytes = index.to_le_bytes();
    [&b"queue-"[..], self.key, &b"-"[..], &index_bytes[..]].concat()
}
/// Current head index of the queue.
///
/// Falls back to 0 when no valid head index is stored.
fn get_head(&mut self) -> u32 {
    let key = self.get_head_key();
    self.read_u32(&key).unwrap_or_default()
}
/// Current tail index of the queue.
///
/// Falls back to 0 when no valid tail index is stored.
fn get_tail(&mut self) -> u32 {
    let key = self.get_tail_key();
    self.read_u32(&key).unwrap_or_default()
}
/// Reads the value stored under `key` and decodes it as a little-endian
/// `u32`.
///
/// Returns `None` when the key is absent or the stored value is not exactly
/// four bytes long.
fn read_u32(&mut self, key: &[u8]) -> Option<u32> {
    match self.database.get(key)?.as_slice() {
        &[b0, b1, b2, b3] => Some(u32::from_le_bytes([b0, b1, b2, b3])),
        _ => None,
    }
}
pub fn open(database: &'a mut DB, key: &'a [u8]) -> Self {
DBQueue { database, key }
}
/// Appends `value` to the back of the queue.
///
/// The element is written at the current tail index, the tail index is then
/// advanced by one (wrapping on overflow) and the database is flushed.
///
/// # Errors
///
/// Propagates any error reported by the database while writing or flushing.
pub fn enqueue(&mut self, value: &[u8]) -> Result<(), StorageServiceError> {
    let slot = self.get_tail();

    // Store the element in the slot the tail currently points at.
    let slot_key = self.get_element_key(slot);
    self.database.put(&slot_key, value)?;

    // Advance the tail past the element that was just written.
    let next_tail = slot.wrapping_add(1).to_le_bytes();
    let tail_key = self.get_tail_key();
    self.database.put(&tail_key, &next_tail)?;

    self.database.flush()?;
    Ok(())
}
/// Removes and returns the element at the front of the queue.
///
/// # Errors
///
/// Returns a `StorageServiceError::Service` error when the queue is empty
/// (head equals tail) or when the element at the head index is missing, and
/// propagates any error reported by the database.
pub fn dequeue(&mut self) -> Result<Vec<u8>, StorageServiceError> {
    let head = self.get_head();
    let tail = self.get_tail();

    // An empty queue is reported as an error rather than as an empty value.
    if head == tail {
        bail!(StorageServiceError::Service(anyhow!(
            "head_index == tail_index"
        )));
    }

    let front_key = self.get_element_key(head);
    let front = match self.database.get(&front_key) {
        Some(value) => value,
        None => bail!(StorageServiceError::Service(anyhow!(
            "cannot get element_key"
        ))),
    };

    // Advance the head first, then drop the element it used to point at.
    let next_head = head.wrapping_add(1).to_le_bytes();
    let head_key = self.get_head_key();
    self.database.put(&head_key, &next_head)?;
    self.database.delete(&front_key)?;

    // The bounds `queue`..`queuf` span every key that starts with `queue`.
    self.database.compact_range(b"queue", b"queuf")?;
    Ok(front)
}
/// Number of elements currently in the queue.
///
/// Head and tail indices wrap around at `u32::MAX`, so the distance from
/// head to tail is taken with wrapping subtraction; this also covers the
/// case where the tail has wrapped and is numerically below the head.
#[allow(unused)]
pub fn len(&mut self) -> u32 {
    let head = self.get_head();
    let tail = self.get_tail();
    tail.wrapping_sub(head)
}
}
impl TeaclaveStorageService {
pub(crate) fn start(&mut self) {
while let Some(request) = self.receiver.blocking_recv() {
let database_request = request.request;
let sender = request.sender;
let response = self.dispatch(database_request);
match sender.send(response) {
Ok(_) => (),
Err(e) => error!("mpsc send error: {}", e),
}
}
}
/// Routes a storage request to its handler and wraps the handler's result
/// in the matching `TeaclaveStorageResponse` variant.
///
/// # Errors
///
/// Returns whatever error the selected handler returns.
fn dispatch(
    &self,
    request: teaclave_rpc::Request<TeaclaveStorageRequest>,
) -> std::result::Result<TeaclaveStorageResponse, StorageServiceError> {
    let response = match request.into_inner() {
        TeaclaveStorageRequest::Get(req) => TeaclaveStorageResponse::Get(self.get(req)?),
        TeaclaveStorageRequest::Put(req) => {
            self.put(req)?;
            TeaclaveStorageResponse::Empty(())
        }
        TeaclaveStorageRequest::Delete(req) => {
            self.delete(req)?;
            TeaclaveStorageResponse::Empty(())
        }
        TeaclaveStorageRequest::Enqueue(req) => {
            self.enqueue(req)?;
            TeaclaveStorageResponse::Empty(())
        }
        TeaclaveStorageRequest::Dequeue(req) => {
            TeaclaveStorageResponse::Dequeue(self.dequeue(req)?)
        }
        TeaclaveStorageRequest::GetKeysByPrefix(req) => {
            TeaclaveStorageResponse::GetKeysByPrefix(self.get_keys_by_prefix(req)?)
        }
    };
    Ok(response)
}
}
impl TeaclaveStorageService {
/// Looks up the value stored under `request.key`.
///
/// # Errors
///
/// Returns `StorageServiceError::None` when the key is not present.
fn get(&self, request: GetRequest) -> std::result::Result<GetResponse, StorageServiceError> {
    let value = match self.database.borrow_mut().get(&request.key) {
        Some(found) => found,
        None => bail!(StorageServiceError::None),
    };
    Ok(GetResponse { value })
}
/// Stores `request.value` under `request.key` and flushes the database.
///
/// # Errors
///
/// Database failures from the write or the flush are returned as
/// `StorageServiceError::Database`.
fn put(&self, request: PutRequest) -> std::result::Result<(), StorageServiceError> {
    let mut db = self.database.borrow_mut();
    db.put(&request.key, &request.value)
        .map_err(StorageServiceError::Database)?;
    db.flush().map_err(StorageServiceError::Database)?;
    Ok(())
}
/// Deletes the entry stored under `request.key` and flushes the database.
///
/// # Errors
///
/// Database failures from the delete or the flush are returned as
/// `StorageServiceError::Database`.
fn delete(&self, request: DeleteRequest) -> std::result::Result<(), StorageServiceError> {
    let mut db = self.database.borrow_mut();
    db.delete(&request.key)
        .map_err(StorageServiceError::Database)?;
    db.flush().map_err(StorageServiceError::Database)?;
    Ok(())
}
/// Appends `request.value` to the queue named `request.key`.
///
/// # Errors
///
/// Returns the error reported by the underlying queue operation.
fn enqueue(&self, request: EnqueueRequest) -> std::result::Result<(), StorageServiceError> {
    let mut db = self.database.borrow_mut();
    let outcome = DBQueue::open(&mut db, &request.key).enqueue(&request.value);
    if let Err(e) = outcome {
        bail!(e);
    }
    Ok(())
}
/// Removes the front element of the queue named `request.key` and returns
/// it in a `DequeueResponse`.
///
/// # Errors
///
/// Returns the error reported by the underlying queue operation, which
/// includes the case of an empty queue.
fn dequeue(
    &self,
    request: DequeueRequest,
) -> std::result::Result<DequeueResponse, StorageServiceError> {
    let mut db = self.database.borrow_mut();
    let value = match DBQueue::open(&mut db, &request.key).dequeue() {
        Ok(front) => front,
        Err(e) => bail!(e),
    };
    Ok(DequeueResponse { value })
}
/// Returns every stored key of the form `<prefix>-<rest>`, in the order the
/// database iterator yields them.
///
/// The response is empty when no stored key starts with `<prefix>-`.
///
/// # Errors
///
/// Returns `StorageServiceError::Database` when the database iterator
/// cannot be created.
fn get_keys_by_prefix(
    &self,
    request: GetKeysByPrefixRequest,
) -> std::result::Result<GetKeysByPrefixResponse, StorageServiceError> {
    // Only keys that start with the prefix followed by `-` belong to the
    // result.
    let mut wanted = request.prefix;
    wanted.push(b'-');

    let mut db = self.database.borrow_mut();
    let mut it = db.new_iter().map_err(StorageServiceError::Database)?;
    it.seek(&wanted[..]);

    let mut keys = Vec::new();
    let mut key = Vec::new();
    let mut value = Vec::new();

    // `seek` stops at the first key that is not smaller than `wanted`. That
    // key may belong to a different prefix altogether (for example when no
    // key with this prefix exists), so it is checked like every later key
    // instead of being accepted unconditionally.
    if it.valid() && it.current(&mut key, &mut value) && key.starts_with(&wanted) {
        keys.push(key);
        while let Some((k, _)) = it.next() {
            // Keys arrive in sorted order, so the first mismatch ends the run.
            if !k.starts_with(&wanted) {
                break;
            }
            keys.push(k);
        }
    }
    Ok(GetKeysByPrefixResponse { keys })
}
}
#[cfg(feature = "enclave_unit_test")]
pub mod tests {
    use super::*;
    use tokio::sync::mpsc::unbounded_channel;

    /// Opens the `mock_db_unit_test` database, seeds the entries the get and
    /// delete tests read, and wraps it in a service.
    fn get_mock_service() -> TeaclaveStorageService {
        // The sending half is dropped when this function returns; the tests
        // call the handlers directly.
        let (_sender, receiver) = unbounded_channel();
        let key = [
            0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a,
            0x09, 0x08,
        ];
        let options = rusty_leveldb::Options::new_disk_db_with(key);
        let mut db = DB::open("mock_db_unit_test", options).unwrap();
        db.put(b"test_get_key", b"test_get_value").unwrap();
        db.put(b"test_delete_key", b"test_delete_value").unwrap();
        TeaclaveStorageService::new(RefCell::new(db), receiver)
    }

    /// A key seeded by the mock service can be read back.
    pub fn test_get_key() {
        let service = get_mock_service();
        let fetched = service.get(GetRequest::new("test_get_key"));
        assert!(fetched.is_ok());
    }

    /// A freshly written key can be read back.
    pub fn test_put_key() {
        let service = get_mock_service();
        let stored = service.put(PutRequest::new("test_put_key", "test_put_value"));
        assert!(stored.is_ok());
        let fetched = service.get(GetRequest::new("test_put_key"));
        assert!(fetched.is_ok());
    }

    /// A deleted key can no longer be read.
    pub fn test_delete_key() {
        let service = get_mock_service();
        let deleted = service.delete(DeleteRequest::new("test_delete_key"));
        assert!(deleted.is_ok());
        let fetched = service.get(GetRequest::new("test_delete_key"));
        assert!(fetched.is_err());
    }

    /// An empty value is stored and returned as an empty byte vector.
    pub fn test_empty_value() {
        let service = get_mock_service();
        let stored = service.put(PutRequest::new("test_empty_value", ""));
        assert!(stored.is_ok());
        let fetched = service.get(GetRequest::new("test_empty_value")).unwrap();
        assert_eq!(fetched.value, Vec::<u8>::new());
    }

    /// Two consecutive enqueues on the same queue succeed.
    pub fn test_enqueue() {
        let service = get_mock_service();
        for value in ["1", "2"].iter() {
            let request = EnqueueRequest::new("test_enqueue_key", *value);
            assert!(service.enqueue(request).is_ok());
        }
    }

    /// Dequeue fails before anything is enqueued, then returns the elements
    /// in the order they were enqueued.
    pub fn test_dequeue() {
        let service = get_mock_service();
        let first_attempt = service.dequeue(DequeueRequest::new("test_dequeue_key"));
        assert!(first_attempt.is_err());

        for value in ["1", "2"].iter() {
            let request = EnqueueRequest::new("test_dequeue_key", *value);
            assert!(service.enqueue(request).is_ok());
        }

        for expected in [b"1", b"2"].iter() {
            let request = DequeueRequest::new("test_dequeue_key");
            assert_eq!(service.dequeue(request).unwrap().value, *expected);
        }
    }

    /// Only the keys under the requested prefix are listed, in sorted order.
    pub fn test_get_keys_by_prefix() {
        let service = get_mock_service();
        let stored_keys = [
            "function-1",
            "function-22",
            "function-333",
            "task-444",
            "function-5",
        ];
        for key in stored_keys.iter() {
            let request = PutRequest::new(*key, "test_put_value");
            assert!(service.put(request).is_ok());
        }

        let response = service.get_keys_by_prefix(GetKeysByPrefixRequest::new("function"));
        assert!(response.is_ok());
        let expected = std::vec![
            b"function-1".to_vec(),
            b"function-22".to_vec(),
            b"function-333".to_vec(),
            b"function-5".to_vec()
        ];
        assert_eq!(response.unwrap().keys, expected);
    }
}