| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use super::*; |
| |
| use audit::Auditor; |
| use error::ManagementServiceError; |
| |
| use anyhow::anyhow; |
| use std::convert::TryInto; |
| use std::sync::Arc; |
| use teaclave_proto::teaclave_common::i32_from_task_status; |
| use teaclave_proto::teaclave_frontend_service::*; |
| use teaclave_proto::teaclave_frontend_service::{ |
| from_proto_file_ids, from_proto_ownership, to_proto_file_ids, to_proto_ownership, |
| }; |
| use teaclave_proto::teaclave_management_service::{SaveLogsRequest, TeaclaveManagement}; |
| use teaclave_proto::teaclave_storage_service::{ |
| DeleteRequest, EnqueueRequest, GetKeysByPrefixRequest, GetRequest, PutRequest, |
| TeaclaveStorageClient, |
| }; |
| use teaclave_rpc::transport::{channel::Endpoint, Channel}; |
| use teaclave_rpc::{Request, Response}; |
| use teaclave_service_enclave_utils::ensure; |
| use teaclave_types::*; |
| use tokio::sync::Mutex; |
| use tokio::task; |
| use url::Url; |
| use uuid::Uuid; |
| |
| #[derive(Clone)] |
| pub(crate) struct TeaclaveManagementService { |
| storage_client: Arc<Mutex<TeaclaveStorageClient<Channel>>>, |
| auditor: audit::Auditor, |
| } |
| |
| #[teaclave_rpc::async_trait] |
| impl TeaclaveManagement for TeaclaveManagementService { |
| // access control: none |
| async fn register_input_file( |
| &self, |
| request: Request<RegisterInputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<RegisterInputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| let url = Url::parse(&request.url).map_err(tonic_error)?; |
| let cmac = FileAuthTag::from_bytes(&request.cmac).map_err(tonic_error)?; |
| let crypto_info = request |
| .crypto_info |
| .ok_or_else(|| tonic_error("missing crypto_info"))? |
| .try_into() |
| .map_err(tonic_error)?; |
| |
| let input_file = TeaclaveInputFile::new(url, cmac, crypto_info, vec![user_id]); |
| |
| self.write_to_db(&input_file).await?; |
| |
| let response = RegisterInputFileResponse::new(input_file.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| // access control: |
| // 1) exisiting_file.owner_list.len() == 1 |
| // 2) user_id in existing_file.owner_list |
| async fn update_input_file( |
| &self, |
| request: Request<UpdateInputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<UpdateInputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| |
| let old_input_file: TeaclaveInputFile = self |
| .read_from_db(&request.data_id.try_into().map_err(tonic_error)?) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| |
| ensure!( |
| old_input_file.owner == OwnerList::from(vec![user_id]), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let input_file = TeaclaveInputFile::new( |
| Url::parse(&request.url).map_err(tonic_error)?, |
| old_input_file.cmac, |
| old_input_file.crypto_info, |
| old_input_file.owner, |
| ); |
| |
| self.write_to_db(&input_file).await?; |
| |
| let response = UpdateInputFileResponse::new(input_file.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| // access control: none |
| async fn register_output_file( |
| &self, |
| request: Request<RegisterOutputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<RegisterOutputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| let output_file = TeaclaveOutputFile::new( |
| Url::parse(&request.url).map_err(tonic_error)?, |
| request |
| .crypto_info |
| .ok_or_else(|| tonic_error("missing crypto_info"))? |
| .try_into() |
| .map_err(tonic_error)?, |
| vec![user_id], |
| ); |
| |
| self.write_to_db(&output_file).await?; |
| |
| let response = RegisterOutputFileResponse::new(output_file.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| // access control: |
| // 1) exisiting_file.owner_list.len() == 1 |
| // 2) user_id in existing_file.owner_list |
| async fn update_output_file( |
| &self, |
| request: Request<UpdateOutputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<UpdateOutputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| |
| let old_output_file: TeaclaveOutputFile = self |
| .read_from_db(&request.data_id.try_into().map_err(tonic_error)?) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| |
| ensure!( |
| old_output_file.owner == OwnerList::from(vec![user_id]), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let output_file = TeaclaveOutputFile::new( |
| Url::parse(&request.url).map_err(tonic_error)?, |
| old_output_file.crypto_info, |
| old_output_file.owner, |
| ); |
| |
| self.write_to_db(&output_file).await?; |
| |
| let response = UpdateOutputFileResponse::new(output_file.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| async fn register_fusion_output( |
| &self, |
| request: Request<RegisterFusionOutputRequest>, |
| ) -> TeaclaveServiceResponseResult<RegisterFusionOutputResponse> { |
| let user_id = get_request_user_id(&request)?.to_string(); |
| |
| let owner_list = request.into_inner().owner_list; |
| ensure!( |
| owner_list.len() > 1 && owner_list.contains(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let output_file = create_fusion_data(owner_list).map_err(tonic_error)?; |
| |
| self.write_to_db(&output_file).await?; |
| |
| let response = RegisterFusionOutputResponse::new(output_file.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| // access control: |
| // 1) user_id in output.owner |
| // 2) cmac != none |
| async fn register_input_from_output( |
| &self, |
| request: Request<RegisterInputFromOutputRequest>, |
| ) -> TeaclaveServiceResponseResult<RegisterInputFromOutputResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let data_id = request |
| .into_inner() |
| .data_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| let output: TeaclaveOutputFile = self |
| .read_from_db(&data_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| |
| ensure!( |
| output.owner.contains(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let input = TeaclaveInputFile::from_output(output) |
| .map_err(|_| ManagementServiceError::InvalidOutputFile)?; |
| |
| self.write_to_db(&input).await?; |
| |
| let response = RegisterInputFromOutputResponse::new(input.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| async fn get_output_file( |
| &self, |
| request: Request<GetOutputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<GetOutputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let data_id = request |
| .into_inner() |
| .data_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| let output_file: TeaclaveOutputFile = self |
| .read_from_db(&data_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| |
| ensure!( |
| output_file.owner.contains(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let response = GetOutputFileResponse::new(output_file.owner, output_file.cmac); |
| Ok(Response::new(response)) |
| } |
| |
| async fn get_input_file( |
| &self, |
| request: Request<GetInputFileRequest>, |
| ) -> TeaclaveServiceResponseResult<GetInputFileResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let data_id = request |
| .into_inner() |
| .data_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| let input_file: TeaclaveInputFile = self |
| .read_from_db(&data_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| |
| ensure!( |
| input_file.owner.contains(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let response = GetInputFileResponse::new(input_file.owner, input_file.cmac); |
| Ok(Response::new(response)) |
| } |
| |
| // access control: none |
| async fn register_function( |
| &self, |
| request: Request<RegisterFunctionRequest>, |
| ) -> TeaclaveServiceResponseResult<RegisterFunctionResponse> { |
| let user_id = get_request_user_id(&request)?; |
| |
| let function = FunctionBuilder::try_from(request.into_inner()) |
| .map_err(tonic_error)? |
| .id(Uuid::new_v4()) |
| .owner(user_id.clone()) |
| .build(); |
| |
| self.write_to_db(&function).await?; |
| |
| let mut u = User { |
| id: user_id, |
| ..Default::default() |
| }; |
| let external_id = u.external_id(); |
| |
| let user = self.read_from_db::<User>(&external_id).await; |
| match user { |
| Ok(mut us) => { |
| us.registered_functions |
| .push(function.external_id().to_string()); |
| self.write_to_db(&us).await?; |
| } |
| Err(_) => { |
| u.registered_functions |
| .push(function.external_id().to_string()); |
| self.write_to_db(&u).await?; |
| } |
| } |
| |
| // Update allowed function list for users |
| for user_id in &function.user_allowlist { |
| let mut u = User { |
| id: user_id.into(), |
| ..Default::default() |
| }; |
| let external_id = u.external_id(); |
| let user = self.read_from_db::<User>(&external_id).await; |
| match user { |
| Ok(mut us) => { |
| us.allowed_functions |
| .push(function.external_id().to_string()); |
| self.write_to_db(&us).await?; |
| } |
| Err(_) => { |
| u.allowed_functions.push(function.external_id().to_string()); |
| self.write_to_db(&u).await?; |
| } |
| } |
| } |
| |
| let usage = FunctionUsage { |
| function_id: function.id, |
| ..Default::default() |
| }; |
| self.write_to_db(&usage).await?; |
| |
| let response = RegisterFunctionResponse::new(function.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| async fn update_function( |
| &self, |
| request: Request<UpdateFunctionRequest>, |
| ) -> TeaclaveServiceResponseResult<UpdateFunctionResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| |
| let function_id = request |
| .function_id |
| .clone() |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| ensure!( |
| function.owner == user_id, |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let function = FunctionBuilder::try_from(request) |
| .map_err(tonic_error)? |
| .owner(user_id) |
| .build(); |
| |
| self.write_to_db(&function).await?; |
| |
| let response = UpdateFunctionResponse::new(function.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| async fn get_function( |
| &self, |
| request: Request<GetFunctionRequest>, |
| ) -> TeaclaveServiceResponseResult<GetFunctionResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| let function_id = request |
| .into_inner() |
| .function_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| if function.public || role == UserRole::PlatformAdmin || function.owner == user_id { |
| let response = function.into(); |
| |
| Ok(Response::new(response)) |
| } else if function.user_allowlist.contains(&user_id.into()) { |
| let mut response = GetFunctionResponse::from(function); |
| response.payload = vec![]; |
| response.user_allowlist = vec![]; |
| |
| Ok(Response::new(response)) |
| } else { |
| Err(ManagementServiceError::PermissionDenied.into()) |
| } |
| } |
| |
| async fn get_function_usage_stats( |
| &self, |
| request: Request<GetFunctionUsageStatsRequest>, |
| ) -> TeaclaveServiceResponseResult<GetFunctionUsageStatsResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| let function_id = request |
| .into_inner() |
| .function_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| ensure!( |
| function.public |
| || role == UserRole::PlatformAdmin |
| || function.user_allowlist.contains(&user_id.to_string()), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let usage = FunctionUsage { |
| function_id: function.id, |
| ..Default::default() |
| }; |
| let external_id = usage.external_id(); |
| let function_usage = self |
| .read_from_db::<FunctionUsage>(&external_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function_quota = function.usage_quota.unwrap_or(-1); |
| let response = GetFunctionUsageStatsResponse { |
| function_quota, |
| current_usage: function_usage.use_numbers, |
| }; |
| Ok(Response::new(response)) |
| } |
| |
| async fn delete_function( |
| &self, |
| request: Request<DeleteFunctionRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| let function_id = request |
| .into_inner() |
| .function_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| ensure!( |
| function.owner == user_id, |
| ManagementServiceError::PermissionDenied |
| ); |
| self.delete_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| Ok(Response::new(())) |
| } |
| |
| // 1. `List functions` does not show this function |
| // 2. `Create new task` with the function id fails |
| async fn disable_function( |
| &self, |
| request: Request<DisableFunctionRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| let function_id = request |
| .into_inner() |
| .function_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let mut function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| ensure!( |
| role == UserRole::PlatformAdmin || function.owner == user_id, |
| ManagementServiceError::PermissionDenied |
| ); |
| let func_id = function.external_id().to_string(); |
| |
| // Updated function owner |
| let u = User { |
| id: function.owner.clone(), |
| ..Default::default() |
| }; |
| let external_id = u.external_id(); |
| let user = self.read_from_db::<User>(&external_id).await; |
| if let Ok(mut us) = user { |
| us.allowed_functions.retain(|f| !f.eq(&func_id)); |
| us.registered_functions.retain(|f| !f.eq(&func_id)); |
| self.write_to_db(&us).await?; |
| } else { |
| log::warn!("Invalid user id from functions"); |
| } |
| |
| // Update allowed function list for users |
| for user_id in &function.user_allowlist { |
| let u = User { |
| id: user_id.into(), |
| ..Default::default() |
| }; |
| let external_id = u.external_id(); |
| let user = self.read_from_db::<User>(&external_id).await; |
| if let Ok(mut us) = user { |
| us.allowed_functions.retain(|f| !f.eq(&func_id)); |
| us.registered_functions.retain(|f| !f.eq(&func_id)); |
| self.write_to_db(&us).await?; |
| } else { |
| log::warn!("Invalid user id from functions"); |
| } |
| } |
| |
| function.user_allowlist.clear(); |
| self.write_to_db(&function).await?; |
| |
| Ok(Response::new(())) |
| } |
| |
| async fn list_functions( |
| &self, |
| request: Request<ListFunctionsRequest>, |
| ) -> TeaclaveServiceResponseResult<ListFunctionsResponse> { |
| let request_user_id = request.get_ref().user_id.clone().into(); |
| |
| let current_user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| |
| ensure!( |
| role == UserRole::PlatformAdmin || request_user_id == current_user_id, |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let u = User { |
| id: request_user_id, |
| ..Default::default() |
| }; |
| let external_id = u.external_id(); |
| |
| let user = self.read_from_db::<User>(&external_id).await; |
| match user { |
| Ok(us) => { |
| let mut response = ListFunctionsResponse { |
| registered_functions: us.registered_functions, |
| allowed_functions: us.allowed_functions, |
| }; |
| if role == UserRole::PlatformAdmin { |
| let allowed_functions = self |
| .get_keys_by_prefix_from_db(Function::key_prefix()) |
| .await?; |
| response.allowed_functions = allowed_functions; |
| } |
| |
| Ok(Response::new(response)) |
| } |
| Err(_) => { |
| let response = ListFunctionsResponse::default(); |
| Ok(Response::new(response)) |
| } |
| } |
| } |
| |
| // access control: none |
| // when a task is created, following rules will be verified: |
| // 1) arugments match function definition |
| // 2) input files match function definition |
| // 3) output files match function definition |
| // 4) requested user_id in the user_allowlist |
| async fn create_task( |
| &self, |
| request: Request<CreateTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<CreateTaskResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| |
| let request = request.into_inner(); |
| let function_id = request |
| .function_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| let function: Function = self |
| .read_from_db(&function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| match role { |
| UserRole::DataOwner(a) | UserRole::DataOwnerManager(a) => { |
| ensure!( |
| (function.public || function.user_allowlist.contains(&a)), |
| ManagementServiceError::PermissionDenied |
| ); |
| } |
| UserRole::PlatformAdmin => (), |
| _ => { |
| return Err(ManagementServiceError::PermissionDenied.into()); |
| } |
| } |
| let task = Task::<Create>::new( |
| user_id, |
| request.executor.try_into().map_err(tonic_error)?, |
| request.function_arguments.try_into().map_err(tonic_error)?, |
| from_proto_ownership(request.inputs_ownership), |
| from_proto_ownership(request.outputs_ownership), |
| function, |
| ) |
| .map_err(|_| ManagementServiceError::InvalidTask)?; |
| |
| log::debug!("CreateTask: {:?}", task); |
| let ts: TaskState = task.into(); |
| self.write_to_db(&ts).await?; |
| |
| let response = CreateTaskResponse::new(ts.external_id()); |
| Ok(Response::new(response)) |
| } |
| |
| async fn get_task( |
| &self, |
| request: Request<GetTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<GetTaskResponse> { |
| let user_id = get_request_user_id(&request)?; |
| let task_id = request |
| .into_inner() |
| .task_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| let ts: TaskState = self |
| .read_from_db(&task_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| ensure!( |
| ts.has_participant(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| log::debug!("GetTask: {:?}", ts); |
| |
| let response = GetTaskResponse { |
| task_id: ts.external_id().to_string(), |
| creator: ts.creator.to_string(), |
| function_id: ts.function_id.to_string(), |
| function_owner: ts.function_owner.to_string(), |
| function_arguments: ts.function_arguments.clone().into_string(), |
| inputs_ownership: to_proto_ownership(ts.inputs_ownership.clone()), |
| outputs_ownership: to_proto_ownership(ts.outputs_ownership.clone()), |
| participants: ts.participants.clone().into(), |
| approved_users: ts.approved_users.clone().into(), |
| assigned_inputs: to_proto_file_ids(ts.assigned_inputs.external_ids()), |
| assigned_outputs: to_proto_file_ids(ts.assigned_outputs.external_ids()), |
| result: Some(ts.result.into()), |
| status: i32_from_task_status(ts.status), |
| }; |
| Ok(Response::new(response)) |
| } |
| |
| // prerequisite: |
| // 1) task.participants.contains(user_id) |
| // 2) task.status == Created |
| // 3) user can use the data: |
| // * input file: user_id == input_file.owner contains user_id |
| // * output file: output_file.owner contains user_id && output_file.cmac.is_none() |
| // 4) the data can be assgined to the task: |
| // * inputs_ownership or outputs_ownership contains the data name |
| // * input file: OwnerList match input_file.owner |
| // * output file: OwnerList match output_file.owner |
| async fn assign_data( |
| &self, |
| request: Request<AssignDataRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| let request = request.into_inner(); |
| let task_id = request |
| .task_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| let ts: TaskState = self |
| .read_from_db(&task_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| ensure!( |
| ts.has_participant(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let mut task: Task<Assign> = ts.try_into().map_err(|e| { |
| log::warn!("Assign state error: {:?}", e); |
| ManagementServiceError::TaskAssignDataError |
| })?; |
| let inputs = from_proto_file_ids(request.inputs).map_err(tonic_error)?; |
| for (data_name, data_id) in inputs.iter() { |
| let file: TeaclaveInputFile = self |
| .read_from_db(data_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| task.assign_input(&user_id, data_name, file) |
| .map_err(|_| ManagementServiceError::PermissionDenied)?; |
| } |
| let outputs = from_proto_file_ids(request.outputs).map_err(tonic_error)?; |
| for (data_name, data_id) in outputs.iter() { |
| let file: TeaclaveOutputFile = self |
| .read_from_db(data_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidDataId)?; |
| task.assign_output(&user_id, data_name, file) |
| .map_err(|_| ManagementServiceError::PermissionDenied)?; |
| } |
| |
| log::debug!("AssignData: {:?}", task); |
| |
| let ts: TaskState = task.into(); |
| self.write_to_db(&ts).await?; |
| |
| Ok(Response::new(())) |
| } |
| |
| // prerequisite: |
| // 1) task status == Ready |
| // 2) user_id in task.participants |
| async fn approve_task( |
| &self, |
| request: Request<ApproveTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| |
| let task_id = request |
| .into_inner() |
| .task_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| let ts: TaskState = self |
| .read_from_db(&task_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| let mut task: Task<Approve> = ts.try_into().map_err(|e| { |
| log::warn!("Approve state error: {:?}", e); |
| ManagementServiceError::TaskApproveError |
| })?; |
| |
| task.approve(&user_id) |
| .map_err(|_| ManagementServiceError::PermissionDenied)?; |
| |
| log::debug!("ApproveTask: approve:{:?}", task); |
| |
| let ts: TaskState = task.into(); |
| self.write_to_db(&ts).await?; |
| |
| Ok(Response::new(())) |
| } |
| |
| // prerequisite: |
| // 1) task status == Approved |
| // 2) user_id == task.creator |
| async fn invoke_task( |
| &self, |
| request: Request<InvokeTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| let task_id = request |
| .into_inner() |
| .task_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| let ts: TaskState = self |
| .read_from_db(&task_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| // Early validation |
| ensure!( |
| ts.has_creator(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let function: Function = self |
| .read_from_db(&ts.function_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| |
| log::debug!("InvokeTask: get function: {:?}", function); |
| |
| let usage = FunctionUsage { |
| function_id: function.id, |
| ..Default::default() |
| }; |
| let external_id = usage.external_id(); |
| let mut function_usage = self |
| .read_from_db::<FunctionUsage>(&external_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidFunctionId)?; |
| let function_current_use_numbers = function_usage.use_numbers; |
| |
| if let Some(quota) = function.usage_quota { |
| if quota <= function_current_use_numbers { |
| return Err(ManagementServiceError::FunctionQuotaError.into()); |
| } |
| } |
| |
| let mut task: Task<Stage> = ts.try_into().map_err(|e| { |
| log::warn!("Stage state error: {:?}", e); |
| ManagementServiceError::TaskInvokeError |
| })?; |
| |
| log::debug!("InvokeTask: get task: {:?}", task); |
| let staged_task = task |
| .stage_for_running(&user_id, function) |
| .map_err(|_| ManagementServiceError::PermissionDenied)?; |
| log::debug!("InvokeTask: staged task: {:?}", staged_task); |
| self.enqueue_to_db(StagedTask::get_queue_key().as_bytes(), &staged_task) |
| .await?; |
| |
| let ts: TaskState = task.into(); |
| self.write_to_db(&ts).await?; |
| |
| function_usage.use_numbers = function_current_use_numbers + 1; |
| self.write_to_db(&function_usage).await?; |
| Ok(Response::new(())) |
| } |
| |
| async fn cancel_task( |
| &self, |
| request: Request<CancelTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let user_id = get_request_user_id(&request)?; |
| let role = get_request_role(&request)?; |
| let task_id = request |
| .into_inner() |
| .task_id |
| .try_into() |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| let ts: TaskState = self |
| .read_from_db(&task_id) |
| .await |
| .map_err(|_| ManagementServiceError::InvalidTaskId)?; |
| |
| ensure!( |
| role == UserRole::PlatformAdmin || ts.has_creator(&user_id), |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| match ts.status { |
| // need scheduler to cancel the task |
| TaskStatus::Staged | TaskStatus::Running => { |
| self.enqueue_to_db(CANCEL_QUEUE_KEY.as_bytes(), &ts).await?; |
| } |
| _ => { |
| // early cancelation |
| // race will not affect correctness/privacy |
| let mut task: Task<Cancel> = ts.try_into().map_err(|e| { |
| log::warn!("Cancel state error: {:?}", e); |
| ManagementServiceError::TaskCancelError( |
| "task has already been canceled".to_string(), |
| ) |
| })?; |
| |
| log::debug!("Canceled Task: {:?}", task); |
| |
| task.update_result(TaskResult::Err(TaskFailure { |
| reason: "Task canceled".to_string(), |
| })) |
| .map_err(|_| { |
| ManagementServiceError::TaskCancelError("cannot update result".to_string()) |
| })?; |
| let ts: TaskState = task.into(); |
| self.write_to_db(&ts).await?; |
| |
| log::warn!("Canceled Task: writtenback"); |
| } |
| } |
| |
| Ok(Response::new(())) |
| } |
| |
| // access control: none |
| async fn save_logs( |
| &self, |
| request: Request<SaveLogsRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let request = request.into_inner(); |
| |
| let entries: Result<Vec<Entry>> = request.logs.into_iter().map(Entry::try_from).collect(); |
| let logs = entries.map_err(|e| { |
| let err_msg = format!("failed to transform entries {:?}", e); |
| ManagementServiceError::AuditError(err_msg) |
| })?; |
| |
| let auditor = self.auditor.clone(); |
| task::spawn_blocking(move || auditor.add_logs(logs)) |
| .await |
| .map_err(|e| anyhow!("{}", e.to_string())) |
| .flatten() |
| .map_err(|e| { |
| let err_msg = format!("failed to save logs {:?}", e); |
| ManagementServiceError::AuditError(err_msg) |
| })?; |
| |
| Ok(Response::new(())) |
| } |
| |
| async fn query_audit_logs( |
| &self, |
| request: Request<QueryAuditLogsRequest>, |
| ) -> TeaclaveServiceResponseResult<QueryAuditLogsResponse> { |
| let role = get_request_role(&request)?; |
| ensure!( |
| role == UserRole::PlatformAdmin, |
| ManagementServiceError::PermissionDenied |
| ); |
| |
| let request = request.into_inner(); |
| let auditor = self.auditor.clone(); |
| let logs = task::spawn_blocking(move || { |
| auditor.query_logs(&request.query, request.limit as usize) |
| }) |
| .await |
| .map_err(|e| anyhow!("{}", e.to_string())) |
| .flatten() |
| .map_err(|e| { |
| let err_msg = format!("failed to query logs {:?}", e); |
| ManagementServiceError::AuditError(err_msg) |
| })?; |
| |
| let response = QueryAuditLogsResponse::new(logs); |
| Ok(Response::new(response)) |
| } |
| } |
| |
| impl TeaclaveManagementService { |
| pub(crate) async fn new(storage_service_endpoint: Endpoint) -> anyhow::Result<Self> { |
| let channel = storage_service_endpoint |
| .connect() |
| .await |
| .map_err(|e| anyhow!("Failed to connect to storage service, {:?}", e))?; |
| let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new_with_builtin_config( |
| channel, |
| ))); |
| let client_clone = storage_client.clone(); |
| let auditor = task::spawn_blocking(move || Auditor::try_new(client_clone)).await??; |
| let service = Self { |
| storage_client, |
| auditor, |
| }; |
| |
| #[cfg(test_mode)] |
| service.add_mock_data().await?; |
| |
| Ok(service) |
| } |
| |
| async fn write_to_db(&self, item: &impl Storable) -> Result<(), ManagementServiceError> { |
| let k = item.key(); |
| let v = item.to_vec()?; |
| let put_request = PutRequest::new(k.as_slice(), v.as_slice()); |
| let _put_response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .put(put_request) |
| .await |
| .map_err(|e| ManagementServiceError::Service(e.into()))?; |
| Ok(()) |
| } |
| |
| async fn read_from_db<T: Storable>( |
| &self, |
| key: &ExternalID, |
| ) -> Result<T, ManagementServiceError> { |
| ensure!( |
| T::match_prefix(&key.prefix), |
| anyhow!("key prefix doesn't match") |
| ); |
| |
| let request = GetRequest::new(key.to_bytes()); |
| let response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .get(request) |
| .await |
| .map_err(|e| ManagementServiceError::Service(e.into()))? |
| .into_inner(); |
| T::from_slice(response.value.as_slice()).map_err(ManagementServiceError::Service) |
| } |
| |
| async fn get_keys_by_prefix_from_db( |
| &self, |
| prefix: impl Into<Vec<u8>>, |
| ) -> Result<Vec<String>, ManagementServiceError> { |
| let request = GetKeysByPrefixRequest::new(prefix.into()); |
| let response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .get_keys_by_prefix(request) |
| .await |
| .map_err(|e| ManagementServiceError::Service(e.into()))?; |
| Ok(response |
| .into_inner() |
| .keys |
| .into_iter() |
| .map(String::from_utf8) |
| .collect::<Result<Vec<_>, _>>() |
| .map_err(|_| anyhow!("cannot convert keys"))?) |
| } |
| |
| async fn delete_from_db(&self, key: &ExternalID) -> Result<(), ManagementServiceError> { |
| let request = DeleteRequest::new(key.to_bytes()); |
| self.storage_client |
| .clone() |
| .lock() |
| .await |
| .delete(request) |
| .await |
| .map_err(|e| ManagementServiceError::Service(e.into()))?; |
| Ok(()) |
| } |
| |
| async fn enqueue_to_db( |
| &self, |
| key: &[u8], |
| item: &impl Storable, |
| ) -> Result<(), ManagementServiceError> { |
| let value = item.to_vec()?; |
| let enqueue_request = EnqueueRequest::new(key, value); |
| let _enqueue_response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .enqueue(enqueue_request) |
| .await |
| .map_err(|e| ManagementServiceError::Service(e.into()))?; |
| Ok(()) |
| } |
| |
| #[cfg(test_mode)] |
| async fn add_mock_data(&self) -> anyhow::Result<()> { |
| let mut output_file = create_fusion_data(vec!["mock_user1", "frontend_user"])?; |
| output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000001")?; |
| output_file.cmac = Some(FileAuthTag::mock()); |
| self.write_to_db(&output_file).await?; |
| |
| let mut output_file = create_fusion_data(vec!["mock_user2", "mock_user3"])?; |
| output_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000002")?; |
| output_file.cmac = Some(FileAuthTag::mock()); |
| self.write_to_db(&output_file).await?; |
| |
| let mut input_file = TeaclaveInputFile::from_output(output_file)?; |
| input_file.uuid = Uuid::parse_str("00000000-0000-0000-0000-000000000002")?; |
| self.write_to_db(&input_file).await?; |
| |
| let function_input = FunctionInput::new("input", "input_desc", false); |
| let function_output = FunctionOutput::new("output", "output_desc", false); |
| let function_input2 = FunctionInput::new("input2", "input_desc", false); |
| let function_output2 = FunctionOutput::new("output2", "output_desc", false); |
| let function_arg1 = FunctionArgument::new("arg1", "", true); |
| let function_arg2 = FunctionArgument::new("arg2", "", true); |
| |
| let function_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); |
| let function = FunctionBuilder::new() |
| .id(function_id) |
| .name("mock-func-1") |
| .description("mock-desc") |
| .payload(b"mock-payload".to_vec()) |
| .public(true) |
| .arguments(vec![function_arg1, function_arg2]) |
| .inputs(vec![function_input, function_input2]) |
| .outputs(vec![function_output, function_output2]) |
| .owner("teaclave".to_string()) |
| .build(); |
| |
| let function_usage = FunctionUsage { |
| function_id, |
| use_numbers: 0, |
| }; |
| |
| self.write_to_db(&function).await?; |
| self.write_to_db(&function_usage).await?; |
| |
| let function_output = FunctionOutput::new("output", "output_desc", false); |
| let function_arg1 = FunctionArgument::new("arg1", "", true); |
| let function_id = Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap(); |
| |
| let function = FunctionBuilder::new() |
| .id(function_id) |
| .name("mock-func-2") |
| .description("mock-desc") |
| .payload(b"mock-payload".to_vec()) |
| .public(true) |
| .arguments(vec![function_arg1.clone()]) |
| .outputs(vec![function_output]) |
| .owner("teaclave".to_string()) |
| .build(); |
| |
| let function_usage = FunctionUsage { |
| function_id, |
| use_numbers: 0, |
| }; |
| |
| self.write_to_db(&function).await?; |
| self.write_to_db(&function_usage).await?; |
| |
| let function_id = Uuid::parse_str("00000000-0000-0000-0000-000000000003").unwrap(); |
| let function = FunctionBuilder::new() |
| .id(function_id) |
| .name("mock-func-3") |
| .description("Private mock function") |
| .payload(b"mock-payload".to_vec()) |
| .public(false) |
| .arguments(vec![function_arg1]) |
| .owner("mock_user".to_string()) |
| .user_allowlist(vec!["mock_user".to_string(), "mock_user1".to_string()]) |
| .build(); |
| |
| let function_usage = FunctionUsage { |
| function_id, |
| use_numbers: 0, |
| }; |
| |
| self.write_to_db(&function).await?; |
| self.write_to_db(&function_usage).await?; |
| |
| Ok(()) |
| } |
| } |
| |
| fn get_request_user_id<T>(request: &Request<T>) -> Result<UserID, ManagementServiceError> { |
| let user_id = request |
| .metadata() |
| .get("id") |
| .and_then(|x| x.to_str().ok()) |
| .ok_or(ManagementServiceError::MissingUserId)?; |
| Ok(user_id.to_string().into()) |
| } |
| |
| fn get_request_role<T>(request: &Request<T>) -> Result<UserRole, ManagementServiceError> { |
| let role = request |
| .metadata() |
| .get("role") |
| .and_then(|x| x.to_str().ok()) |
| .ok_or(ManagementServiceError::MissingUserRole)?; |
| Ok(UserRole::from_str(role)) |
| } |
| |
| fn create_fusion_data(owners: impl Into<OwnerList>) -> anyhow::Result<TeaclaveOutputFile> { |
| let uuid = Uuid::new_v4(); |
| let url = format!("fusion:///TEACLAVE_FUSION_BASE/{}.fusion", uuid); |
| let url = Url::parse(&url).map_err(|_| anyhow!("invalid url"))?; |
| let crypto_info = FileCrypto::default(); |
| |
| Ok(TeaclaveOutputFile::new(url, crypto_info, owners)) |
| } |
| |
| #[cfg(feature = "enclave_unit_test")] |
| pub mod tests { |
| use super::*; |
| use serde_json::json; |
| use std::collections::HashMap; |
| use teaclave_types::{ |
| hashmap, Executor, FileAuthTag, FileCrypto, FunctionArguments, FunctionInput, |
| FunctionInputFile, FunctionOutput, FunctionOutputFile, |
| }; |
| use url::Url; |
| |
| pub fn handle_input_file() { |
| let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap(); |
| let cmac = FileAuthTag::mock(); |
| let input_file = |
| TeaclaveInputFile::new(url, cmac, FileCrypto::default(), vec!["mock_user"]); |
| assert!(TeaclaveInputFile::match_prefix(&input_file.key_string())); |
| let value = input_file.to_vec().unwrap(); |
| let deserialized_file = TeaclaveInputFile::from_slice(&value).unwrap(); |
| debug!("file: {:?}", deserialized_file); |
| } |
| |
| pub fn handle_output_file() { |
| let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap(); |
| let output_file = TeaclaveOutputFile::new(url, FileCrypto::default(), vec!["mock_user"]); |
| assert!(TeaclaveOutputFile::match_prefix(&output_file.key_string())); |
| let value = output_file.to_vec().unwrap(); |
| let deserialized_file = TeaclaveOutputFile::from_slice(&value).unwrap(); |
| debug!("file: {:?}", deserialized_file); |
| } |
| |
| pub fn handle_function() { |
| let function_input = FunctionInput::new("input", "input_desc", false); |
| let function_output = FunctionOutput::new("output", "output_desc", false); |
| let function_arg = FunctionArgument::new("arg", "", true); |
| let function = FunctionBuilder::new() |
| .id(Uuid::new_v4()) |
| .name("mock_function") |
| .description("mock function") |
| .payload(b"python script".to_vec()) |
| .arguments(vec![function_arg]) |
| .inputs(vec![function_input]) |
| .outputs(vec![function_output]) |
| .public(true) |
| .owner("mock_user") |
| .build(); |
| assert!(Function::match_prefix(&function.key_string())); |
| let value = function.to_vec().unwrap(); |
| let deserialized_function = Function::from_slice(&value).unwrap(); |
| debug!("function: {:?}", deserialized_function); |
| } |
| |
| pub fn check_function_quota() { |
| let function = FunctionBuilder::new().build(); |
| assert_eq!(function.usage_quota, None); |
| |
| let function = FunctionBuilder::new().usage_quota(Some(-5)).build(); |
| assert_eq!(function.usage_quota, None); |
| |
| let function = FunctionBuilder::new().usage_quota(Some(5)).build(); |
| assert_eq!(function.usage_quota, Some(5)); |
| } |
| |
| pub fn handle_task() { |
| let function_arg = FunctionArgument::new("arg", "", true); |
| let function = FunctionBuilder::new() |
| .id(Uuid::new_v4()) |
| .name("mock_function") |
| .description("mock function") |
| .payload(b"python script".to_vec()) |
| .arguments(vec![function_arg]) |
| .public(true) |
| .owner("mock_user") |
| .build(); |
| let function_arguments = FunctionArguments::from_json(json!({"arg": "data"})).unwrap(); |
| |
| let task = Task::<Create>::new( |
| UserID::from("mock_user"), |
| Executor::MesaPy, |
| function_arguments, |
| HashMap::new(), |
| HashMap::new(), |
| function, |
| ) |
| .unwrap(); |
| |
| let ts: TaskState = task.try_into().unwrap(); |
| let value = ts.to_vec().unwrap(); |
| let deserialized_task = TaskState::from_slice(&value).unwrap(); |
| debug!("task: {:?}", deserialized_task); |
| } |
| |
| pub fn handle_staged_task() { |
| let function = FunctionBuilder::new() |
| .id(Uuid::new_v4()) |
| .name("mock_function") |
| .description("mock function") |
| .payload(b"python script".to_vec()) |
| .public(true) |
| .owner("mock_user") |
| .build(); |
| |
| let url = Url::parse("s3://bucket_id/path?token=mock_token").unwrap(); |
| let cmac = FileAuthTag::mock(); |
| let input_data = FunctionInputFile::new(url.clone(), cmac, FileCrypto::default()); |
| let output_data = FunctionOutputFile::new(url, FileCrypto::default()); |
| |
| let staged_task = StagedTaskBuilder::new() |
| .task_id(Uuid::new_v4()) |
| .executor(Executor::MesaPy) |
| .function_payload(function.payload) |
| .function_arguments(hashmap!("arg" => "data")) |
| .input_data(hashmap!("input" => input_data)) |
| .output_data(hashmap!("output" => output_data)) |
| .build(); |
| |
| let value = staged_task.to_vec().unwrap(); |
| let deserialized_data = StagedTask::from_slice(&value).unwrap(); |
| debug!("staged task: {:?}", deserialized_data); |
| } |
| |
| #[derive(serde::Deserialize, Debug)] |
| struct TestFunctionArguments { |
| arg_bool: bool, |
| arg_usize: usize, |
| } |
| |
| pub fn deserialize_function_arguments() { |
| let arguments = vec![ |
| FunctionArgument::new("arg_bool", "", true), |
| FunctionArgument::new("arg_usize", "", true), |
| ]; |
| let function = FunctionBuilder::new() |
| .id(Uuid::new_v4()) |
| .name("mock_function") |
| .description("mock function") |
| .arguments(arguments) |
| .public(true) |
| .owner("mock_user") |
| .build(); |
| let request = CreateTaskRequest::new() |
| .function_arguments(hashmap!("arg_bool" => true,"arg_usize" => 10)) |
| .executor(Executor::Builtin); |
| let task = Task::<Create>::new( |
| "mock_user".into(), |
| request.executor.try_into().unwrap(), |
| request.function_arguments.try_into().unwrap(), |
| from_proto_ownership(request.inputs_ownership), |
| from_proto_ownership(request.outputs_ownership), |
| function, |
| ) |
| .unwrap(); |
| let ts: TaskState = task.try_into().unwrap(); |
| let deserialized_argument: TestFunctionArguments = |
| serde_json::from_str(&ts.function_arguments.into_string()).unwrap(); |
| assert!(deserialized_argument.arg_bool); |
| assert_eq!(deserialized_argument.arg_usize, 10); |
| |
| let arguments = vec![ |
| FunctionArgument::new("arg_bool", "true", true), |
| FunctionArgument::new("arg_usize", "10", false), |
| ]; |
| let function = FunctionBuilder::new() |
| .id(Uuid::new_v4()) |
| .name("mock_function2") |
| .description("mock function") |
| .arguments(arguments) |
| .public(true) |
| .owner("mock_user") |
| .build(); |
| let request = CreateTaskRequest::new() |
| .function_arguments(hashmap!("arg_bool" => false)) |
| .executor(Executor::Builtin); |
| let task = Task::<Create>::new( |
| "mock_user".into(), |
| request.executor.try_into().unwrap(), |
| request.function_arguments.try_into().unwrap(), |
| from_proto_ownership(request.inputs_ownership), |
| from_proto_ownership(request.outputs_ownership), |
| function, |
| ) |
| .unwrap(); |
| let ts: TaskState = task.try_into().unwrap(); |
| let result = |
| serde_json::from_str::<TestFunctionArguments>(&ts.function_arguments.into_string()); |
| assert!(result.is_err()); |
| let err_msg = format!("{:?}", result.unwrap_err()); |
| assert!(err_msg.contains("invalid type: string \\\"10\\\", expected usize")); |
| } |
| } |