blob: a46a1970bcb0bf3d83f8c157a8fd5bf2043dd4d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::convert::TryInto;
use std::prelude::v1::*;
use std::sync::{Arc, SgxMutex as Mutex};
use teaclave_proto::teaclave_frontend_service::{
ApproveTaskRequest, ApproveTaskResponse, AssignDataRequest, AssignDataResponse,
CreateTaskRequest, CreateTaskResponse, GetFunctionRequest, GetFunctionResponse,
GetInputFileRequest, GetInputFileResponse, GetOutputFileRequest, GetOutputFileResponse,
GetTaskRequest, GetTaskResponse, InvokeTaskRequest, InvokeTaskResponse,
RegisterFunctionRequest, RegisterFunctionResponse, RegisterFusionOutputRequest,
RegisterFusionOutputResponse, RegisterInputFileRequest, RegisterInputFileResponse,
RegisterInputFromOutputRequest, RegisterInputFromOutputResponse, RegisterOutputFileRequest,
RegisterOutputFileResponse, UpdateInputFileRequest, UpdateInputFileResponse,
UpdateOutputFileRequest, UpdateOutputFileResponse,
};
use teaclave_proto::teaclave_management_service::TeaclaveManagement;
use teaclave_proto::teaclave_storage_service::{
EnqueueRequest, GetRequest, PutRequest, TeaclaveStorageClient,
};
use teaclave_rpc::endpoint::Endpoint;
use teaclave_rpc::Request;
use teaclave_service_enclave_utils::{ensure, teaclave_service};
use teaclave_types::*;
use thiserror::Error;
use url::Url;
use uuid::Uuid;
#[derive(Error, Debug)]
enum ServiceError {
#[error("invalid request")]
InvalidRequest,
#[error("data error")]
DataError,
#[error("storage error")]
StorageError,
#[error("permission denied")]
PermissionDenied,
#[error("bad task")]
BadTask,
}
impl From<ServiceError> for TeaclaveServiceResponseError {
fn from(error: ServiceError) -> Self {
TeaclaveServiceResponseError::RequestError(error.to_string())
}
}
#[teaclave_service(teaclave_management_service, TeaclaveManagement, ServiceError)]
#[derive(Clone)]
pub(crate) struct TeaclaveManagementService {
storage_client: Arc<Mutex<TeaclaveStorageClient>>,
}
impl TeaclaveManagement for TeaclaveManagementService {
// access control: none
fn register_input_file(
&self,
request: Request<RegisterInputFileRequest>,
) -> TeaclaveServiceResponseResult<RegisterInputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let input_file = TeaclaveInputFile::new(
request.url,
request.cmac,
request.crypto_info,
vec![user_id],
);
self.write_to_db(&input_file)
.map_err(|_| ServiceError::StorageError)?;
let response = RegisterInputFileResponse::new(input_file.external_id());
Ok(response)
}
// access control:
// 1) exisiting_file.owner_list.len() == 1
// 2) user_id in existing_file.owner_list
fn update_input_file(
&self,
request: Request<UpdateInputFileRequest>,
) -> TeaclaveServiceResponseResult<UpdateInputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let old_input_file: TeaclaveInputFile = self
.read_from_db(&request.data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
old_input_file.owner == OwnerList::from(vec![user_id]),
ServiceError::PermissionDenied
);
let input_file = TeaclaveInputFile::new(
request.url,
old_input_file.cmac,
old_input_file.crypto_info,
old_input_file.owner,
);
self.write_to_db(&input_file)
.map_err(|_| ServiceError::StorageError)?;
let response = UpdateInputFileResponse::new(input_file.external_id());
Ok(response)
}
// access control: none
fn register_output_file(
&self,
request: Request<RegisterOutputFileRequest>,
) -> TeaclaveServiceResponseResult<RegisterOutputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let output_file = TeaclaveOutputFile::new(request.url, request.crypto_info, vec![user_id]);
self.write_to_db(&output_file)
.map_err(|_| ServiceError::StorageError)?;
let response = RegisterOutputFileResponse::new(output_file.external_id());
Ok(response)
}
// access control:
// 1) exisiting_file.owner_list.len() == 1
// 2) user_id in existing_file.owner_list
fn update_output_file(
&self,
request: Request<UpdateOutputFileRequest>,
) -> TeaclaveServiceResponseResult<UpdateOutputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let old_output_file: TeaclaveOutputFile = self
.read_from_db(&request.data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
old_output_file.owner == OwnerList::from(vec![user_id]),
ServiceError::PermissionDenied
);
let output_file = TeaclaveOutputFile::new(
request.url,
old_output_file.crypto_info,
old_output_file.owner,
);
self.write_to_db(&output_file)
.map_err(|_| ServiceError::StorageError)?;
let response = UpdateOutputFileResponse::new(output_file.external_id());
Ok(response)
}
// access control: user_id in owner_list
fn register_fusion_output(
&self,
request: Request<RegisterFusionOutputRequest>,
) -> TeaclaveServiceResponseResult<RegisterFusionOutputResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let owner_list = request.message.owner_list;
ensure!(
owner_list.len() > 1 && owner_list.contains(&user_id),
ServiceError::PermissionDenied
);
let output_file = self
.create_fusion_data(owner_list)
.map_err(|_| ServiceError::DataError)?;
self.write_to_db(&output_file)
.map_err(|_| ServiceError::StorageError)?;
let response = RegisterFusionOutputResponse::new(output_file.external_id());
Ok(response)
}
// access control:
// 1) user_id in output.owner
// 2) cmac != none
fn register_input_from_output(
&self,
request: Request<RegisterInputFromOutputRequest>,
) -> TeaclaveServiceResponseResult<RegisterInputFromOutputResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let output: TeaclaveOutputFile = self
.read_from_db(&request.message.data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
output.owner.contains(&user_id),
ServiceError::PermissionDenied
);
let input =
TeaclaveInputFile::from_output(output).map_err(|_| ServiceError::PermissionDenied)?;
self.write_to_db(&input)
.map_err(|_| ServiceError::StorageError)?;
let response = RegisterInputFromOutputResponse::new(input.external_id());
Ok(response)
}
// access control: output_file.owner contains user_id
fn get_output_file(
&self,
request: Request<GetOutputFileRequest>,
) -> TeaclaveServiceResponseResult<GetOutputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let output_file: TeaclaveOutputFile = self
.read_from_db(&request.message.data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
output_file.owner.contains(&user_id),
ServiceError::PermissionDenied
);
let response = GetOutputFileResponse::new(output_file.owner, output_file.cmac);
Ok(response)
}
// access control: input_file.owner contains user_id
fn get_input_file(
&self,
request: Request<GetInputFileRequest>,
) -> TeaclaveServiceResponseResult<GetInputFileResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let input_file: TeaclaveInputFile = self
.read_from_db(&request.message.data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
input_file.owner.contains(&user_id),
ServiceError::PermissionDenied
);
let response = GetInputFileResponse::new(input_file.owner, input_file.cmac);
Ok(response)
}
// access_control: none
fn register_function(
&self,
request: Request<RegisterFunctionRequest>,
) -> TeaclaveServiceResponseResult<RegisterFunctionResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let function = Function::from(request.message)
.id(Uuid::new_v4())
.owner(user_id);
self.write_to_db(&function)
.map_err(|_| ServiceError::StorageError)?;
let response = RegisterFunctionResponse::new(function.external_id());
Ok(response)
}
// access control: function.public || function.owner == user_id
fn get_function(
&self,
request: Request<GetFunctionRequest>,
) -> TeaclaveServiceResponseResult<GetFunctionResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let function: Function = self
.read_from_db(&request.message.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(
(function.public || function.owner == user_id),
ServiceError::PermissionDenied
);
let response = GetFunctionResponse {
name: function.name,
description: function.description,
owner: function.owner,
executor_type: function.executor_type,
payload: function.payload,
public: function.public,
arguments: function.arguments,
inputs: function.inputs,
outputs: function.outputs,
};
Ok(response)
}
// access control: none
// when a task is created, following rules will be verified:
// 1) arugments match function definition
// 2) input match function definition
// 3) output match function definition
fn create_task(
&self,
request: Request<CreateTaskRequest>,
) -> TeaclaveServiceResponseResult<CreateTaskResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let function: Function = self
.read_from_db(&request.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
let task = Task::<Create>::new(
user_id,
request.executor,
request.function_arguments,
request.inputs_ownership,
request.outputs_ownership,
function,
)
.map_err(|_| ServiceError::BadTask)?;
log::info!("CreateTask: {:?}", task);
let ts: TaskState = task.into();
self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
let response = CreateTaskResponse::new(ts.external_id());
Ok(response)
}
// access control: task.participants.contains(&user_id)
fn get_task(
&self,
request: Request<GetTaskRequest>,
) -> TeaclaveServiceResponseResult<GetTaskResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let ts: TaskState = self
.read_from_db(&request.message.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
log::info!("GetTask: {:?}", ts);
let response = GetTaskResponse {
task_id: ts.external_id(),
creator: ts.creator,
function_id: ts.function_id,
function_owner: ts.function_owner,
function_arguments: ts.function_arguments,
inputs_ownership: ts.inputs_ownership,
outputs_ownership: ts.outputs_ownership,
participants: ts.participants,
approved_users: ts.approved_users,
assigned_inputs: ts.assigned_inputs.external_ids(),
assigned_outputs: ts.assigned_outputs.external_ids(),
result: ts.result,
status: ts.status,
};
Ok(response)
}
// access control:
// 1) task.participants.contains(user_id)
// 2) task.status == Created
// 3) user can use the data:
// * input file: user_id == input_file.owner contains user_id
// * output file: output_file.owner contains user_id && output_file.cmac.is_none()
// 4) the data can be assgined to the task:
// * inputs_ownership or outputs_ownership contains the data name
// * input file: OwnerList match input_file.owner
// * output file: OwnerList match output_file.owner
fn assign_data(
&self,
request: Request<AssignDataRequest>,
) -> TeaclaveServiceResponseResult<AssignDataResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
ensure!(ts.has_participant(&user_id), ServiceError::PermissionDenied);
let mut task: Task<Assign> = ts.try_into().map_err(|e| {
log::warn!("Assign state error: {:?}", e);
ServiceError::PermissionDenied
})?;
for (data_name, data_id) in request.inputs.iter() {
let file: TeaclaveInputFile = self
.read_from_db(&data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
task.assign_input(&user_id, data_name, file)
.map_err(|_| ServiceError::PermissionDenied)?;
}
for (data_name, data_id) in request.outputs.iter() {
let file: TeaclaveOutputFile = self
.read_from_db(&data_id)
.map_err(|_| ServiceError::PermissionDenied)?;
task.assign_output(&user_id, data_name, file)
.map_err(|_| ServiceError::PermissionDenied)?;
}
log::info!("AssignData: {:?}", task);
let ts: TaskState = task.into();
self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
Ok(AssignDataResponse)
}
// access_control:
// 1) task status == Ready
// 2) user_id in task.participants
fn approve_task(
&self,
request: Request<ApproveTaskRequest>,
) -> TeaclaveServiceResponseResult<ApproveTaskResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
let mut task: Task<Approve> = ts.try_into().map_err(|e| {
log::warn!("Approve state error: {:?}", e);
ServiceError::PermissionDenied
})?;
task.approve(&user_id)
.map_err(|_| ServiceError::PermissionDenied)?;
log::info!("ApproveTask: approve:{:?}", task);
let ts: TaskState = task.into();
self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
Ok(ApproveTaskResponse)
}
// access_control:
// 1) task status == Approved
// 2) user_id == task.creator
fn invoke_task(
&self,
request: Request<InvokeTaskRequest>,
) -> TeaclaveServiceResponseResult<InvokeTaskResponse> {
let user_id = self.get_request_user_id(request.metadata())?;
let request = request.message;
let ts: TaskState = self
.read_from_db(&request.task_id)
.map_err(|_| ServiceError::PermissionDenied)?;
// Early validation
ensure!(ts.has_creator(&user_id), ServiceError::PermissionDenied);
let function: Function = self
.read_from_db(&ts.function_id)
.map_err(|_| ServiceError::PermissionDenied)?;
log::info!("InvokeTask: get function: {:?}", function);
let mut task: Task<Stage> = ts.try_into().map_err(|e| {
log::warn!("Stage state error: {:?}", e);
ServiceError::PermissionDenied
})?;
log::info!("InvokeTask: get task: {:?}", task);
let staged_task = task.stage_for_running(&user_id, function)?;
log::info!("InvokeTask: staged task: {:?}", staged_task);
self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task)?;
let ts: TaskState = task.into();
self.write_to_db(&ts)
.map_err(|_| ServiceError::StorageError)?;
Ok(InvokeTaskResponse)
}
}
impl TeaclaveManagementService {
pub(crate) fn new(storage_service_endpoint: Endpoint) -> Result<Self> {
let mut i = 0;
let channel = loop {
match storage_service_endpoint.connect() {
Ok(channel) => break channel,
Err(_) => {
anyhow::ensure!(i < 10, "failed to connect to storage service");
log::debug!("Failed to connect to storage service, retry {}", i);
i += 1;
}
}
std::thread::sleep(std::time::Duration::from_secs(3));
};
let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new(channel)?));
let service = Self { storage_client };
#[cfg(test_mode)]
service.add_mock_data()?;
Ok(service)
}
pub fn create_fusion_data(&self, owners: impl Into<OwnerList>) -> Result<TeaclaveOutputFile> {
let uuid = Uuid::new_v4();
let url = format!("fusion:///TEACLAVE_FUSION_BASE/{}.fusion", uuid.to_string());
let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?;
let crypto_info = FileCrypto::default();
Ok(TeaclaveOutputFile::new(url, crypto_info, owners))
}
fn get_request_user_id(
&self,
meta: &HashMap<String, String>,
) -> TeaclaveServiceResponseResult<UserID> {
let user_id = meta.get("id").ok_or_else(|| ServiceError::InvalidRequest)?;
Ok(user_id.to_string().into())
}
fn write_to_db(&self, item: &impl Storable) -> Result<()> {
let k = item.key();
let v = item.to_vec()?;
let put_request = PutRequest::new(k.as_slice(), v.as_slice());
let _put_response = self
.storage_client
.clone()
.lock()
.map_err(|_| anyhow!("Cannot lock storage client"))?
.put(put_request)?;
Ok(())
}
fn read_from_db<T: Storable>(&self, key: &ExternalID) -> Result<T> {
anyhow::ensure!(T::match_prefix(&key.prefix), "Key prefix doesn't match.");
let request = GetRequest::new(key.to_bytes());
let response = self
.storage_client
.clone()
.lock()
.map_err(|_| anyhow!("Cannot lock storage client"))?
.get(request)?;
T::from_slice(response.value.as_slice())
}
fn enqueue_to_db(&self, key: &[u8], item: &impl Storable) -> TeaclaveServiceResponseResult<()> {
let value = item.to_vec().map_err(|_| ServiceError::DataError)?;
let enqueue_request = EnqueueRequest::new(key, value);
let _enqueue_response = self
.storage_client
.clone()
.lock()
.map_err(|_| ServiceError::StorageError)?
.enqueue(enqueue_request)?;
Ok(())
}
#[cfg(test_mode)]
fn add_mock_data(&self) -> Result<()> {
let mut output_file = self.create_fusion_data(vec!["mock_user1", "frontend_user"])?;
output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000001")?;
output_file.cmac = Some(FileAuthTag::mock());
self.write_to_db(&output_file)?;
let mut output_file = self.create_fusion_data(vec!["mock_user2", "mock_user3"])?;
output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000002")?;
output_file.cmac = Some(FileAuthTag::mock());
self.write_to_db(&output_file)?;
let mut input_file = TeaclaveInputFile::from_output(output_file)?;
input_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000002")?;
self.write_to_db(&input_file)?;
let function_input = FunctionInput::new("input", "input_desc");
let function_output = FunctionOutput::new("output", "output_desc");
let function_input2 = FunctionInput::new("input2", "input_desc");
let function_output2 = FunctionOutput::new("output2", "output_desc");
let function = Function::new()
.id(Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap())
.name("mock-func-1")
.description("mock-desc")
.payload(b"mock-payload".to_vec())
.public(true)
.arguments(vec!["arg1".to_string(), "arg2".to_string()])
.inputs(vec![function_input, function_input2])
.outputs(vec![function_output, function_output2])
.owner("teaclave".to_string());
self.write_to_db(&function)?;
let function_output = FunctionOutput::new("output", "output_desc");
let function = Function::new()
.id(Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap())
.name("mock-func-2")
.description("mock-desc")
.payload(b"mock-payload".to_vec())
.public(true)
.arguments(vec!["arg1".to_string()])
.outputs(vec![function_output])
.owner("teaclave".to_string());
self.write_to_db(&function)?;
Ok(())
}
}
#[cfg(feature = "enclave_unit_test")]
pub mod tests {
use super::*;
use serde_json::json;
use std::collections::HashMap;
use teaclave_types::{
hashmap, Executor, FileAuthTag, FileCrypto, FunctionArguments, FunctionInput,
FunctionInputFile, FunctionOutput, FunctionOutputFile,
};
use url::Url;
pub fn handle_input_file() {
let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap();
let cmac = FileAuthTag::mock();
let input_file =
TeaclaveInputFile::new(url, cmac, FileCrypto::default(), vec!["mock_user"]);
assert!(TeaclaveInputFile::match_prefix(&input_file.key_string()));
let value = input_file.to_vec().unwrap();
let deserialized_file = TeaclaveInputFile::from_slice(&value).unwrap();
info!("file: {:?}", deserialized_file);
}
pub fn handle_output_file() {
let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap();
let output_file = TeaclaveOutputFile::new(url, FileCrypto::default(), vec!["mock_user"]);
assert!(TeaclaveOutputFile::match_prefix(&output_file.key_string()));
let value = output_file.to_vec().unwrap();
let deserialized_file = TeaclaveOutputFile::from_slice(&value).unwrap();
info!("file: {:?}", deserialized_file);
}
pub fn handle_function() {
let function_input = FunctionInput::new("input", "input_desc");
let function_output = FunctionOutput::new("output", "output_desc");
let function = Function::new()
.id(Uuid::new_v4())
.name("mock_function")
.description("mock function")
.payload(b"python script".to_vec())
.arguments(vec!["arg".to_string()])
.inputs(vec![function_input])
.outputs(vec![function_output])
.public(true)
.owner("mock_user");
assert!(Function::match_prefix(&function.key_string()));
let value = function.to_vec().unwrap();
let deserialized_function = Function::from_slice(&value).unwrap();
info!("function: {:?}", deserialized_function);
}
pub fn handle_task() {
let function = Function::new()
.id(Uuid::new_v4())
.name("mock_function")
.description("mock function")
.payload(b"python script".to_vec())
.arguments(vec!["arg".to_string()])
.public(true)
.owner("mock_user");
let function_arguments = FunctionArguments::from_json(json!({"arg": "data"})).unwrap();
let task = Task::<Create>::new(
UserID::from("mock_user"),
Executor::MesaPy,
function_arguments,
HashMap::new(),
HashMap::new(),
function,
)
.unwrap();
let ts: TaskState = task.try_into().unwrap();
let value = ts.to_vec().unwrap();
let deserialized_task = TaskState::from_slice(&value).unwrap();
info!("task: {:?}", deserialized_task);
}
pub fn handle_staged_task() {
let function = Function::new()
.id(Uuid::new_v4())
.name("mock_function")
.description("mock function")
.payload(b"python script".to_vec())
.public(true)
.owner("mock_user");
let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap();
let cmac = FileAuthTag::mock();
let input_data = FunctionInputFile::new(url.clone(), cmac, FileCrypto::default());
let output_data = FunctionOutputFile::new(url, FileCrypto::default());
let staged_task = StagedTask::new()
.task_id(Uuid::new_v4())
.executor(Executor::MesaPy)
.function_payload(function.payload)
.function_arguments(hashmap!("arg" => "data"))
.input_data(hashmap!("input" => input_data))
.output_data(hashmap!("output" => output_data));
let value = staged_task.to_vec().unwrap();
let deserialized_data = StagedTask::from_slice(&value).unwrap();
info!("staged task: {:?}", deserialized_data);
}
}