| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use crate::FunctionArguments; |
| use crate::Storable; |
| use crate::*; |
| use anyhow::{anyhow, bail, ensure, Error, Result}; |
| use serde::{Deserialize, Serialize}; |
| use std::collections::hash_map::Iter; |
| use std::collections::{HashMap, HashSet}; |
| use std::convert::TryInto; |
| use uuid::Uuid; |
| |
| #[derive(Debug, Default, Clone, Deserialize, PartialEq, Eq, Hash, Serialize)] |
| pub struct UserID(String); |
| |
| impl std::convert::From<String> for UserID { |
| fn from(uid: String) -> UserID { |
| UserID(uid) |
| } |
| } |
| |
| impl std::convert::From<&str> for UserID { |
| fn from(uid: &str) -> UserID { |
| UserID(uid.to_string()) |
| } |
| } |
| |
| impl std::convert::From<UserID> for String { |
| fn from(user_id: UserID) -> String { |
| user_id.to_string() |
| } |
| } |
| |
| impl std::fmt::Display for UserID { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "{}", self.0) |
| } |
| } |
| |
| pub type UserList = OwnerList; |
| |
| #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] |
| pub struct OwnerList { |
| pub uids: HashSet<UserID>, |
| } |
| |
| impl OwnerList { |
| pub fn new<T: IntoIterator>(uids: T) -> Self |
| where |
| <T as IntoIterator>::Item: ToString, |
| { |
| OwnerList { |
| uids: uids.into_iter().map(|x| UserID(x.to_string())).collect(), |
| } |
| } |
| |
| pub fn contains(&self, uid: &UserID) -> bool { |
| self.uids.contains(uid) |
| } |
| |
| pub fn len(&self) -> usize { |
| self.uids.len() |
| } |
| |
| pub fn is_empty(&self) -> bool { |
| self.len() == 0 |
| } |
| |
| pub fn insert(&mut self, value: UserID) -> bool { |
| self.uids.insert(value) |
| } |
| |
| pub fn union(mut self, other: Self) -> Self { |
| for value in other.uids { |
| self.uids.insert(value); |
| } |
| self |
| } |
| |
| pub fn unions<I>(i: I) -> Self |
| where |
| I: IntoIterator<Item = Self>, |
| { |
| i.into_iter().fold(Self::default(), Self::union) |
| } |
| } |
| |
| impl<T> std::convert::From<Vec<T>> for OwnerList |
| where |
| T: ToString, |
| { |
| fn from(owners: Vec<T>) -> Self { |
| OwnerList::new(owners) |
| } |
| } |
| |
| impl std::convert::From<OwnerList> for Vec<String> { |
| fn from(owners: OwnerList) -> Vec<String> { |
| owners.into_iter().map(|uid| uid.to_string()).collect() |
| } |
| } |
| |
| impl IntoIterator for OwnerList { |
| type Item = UserID; |
| type IntoIter = std::collections::hash_set::IntoIter<Self::Item>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| self.uids.into_iter() |
| } |
| } |
| |
| #[derive(Debug, Deserialize, Serialize, std::cmp::PartialEq)] |
| pub enum TaskStatus { |
| Created, |
| DataAssigned, |
| Approved, |
| Staged, |
| Running, |
| Finished, |
| } |
| |
| impl Default for TaskStatus { |
| fn default() -> Self { |
| Self::Created |
| } |
| } |
| |
| #[derive(Debug, Default, Deserialize, Serialize)] |
| pub struct OutputsTags { |
| inner: HashMap<String, FileAuthTag>, |
| } |
| |
| impl OutputsTags { |
| pub fn new(hm: HashMap<String, FileAuthTag>) -> Self { |
| Self { inner: hm } |
| } |
| |
| pub fn iter(&self) -> Iter<String, FileAuthTag> { |
| self.inner.iter() |
| } |
| |
| pub fn get(&self, key: &str) -> Option<&FileAuthTag> { |
| self.inner.get(key) |
| } |
| |
| pub fn len(&self) -> usize { |
| self.inner.len() |
| } |
| |
| pub fn is_empty(&self) -> bool { |
| self.len() == 0 |
| } |
| } |
| |
| impl std::convert::TryFrom<HashMap<String, String>> for OutputsTags { |
| type Error = anyhow::Error; |
| fn try_from(input: HashMap<String, String>) -> Result<Self> { |
| let mut ret = HashMap::with_capacity(input.len()); |
| for (k, v) in input.iter() { |
| let tag = FileAuthTag::from_hex(v)?; |
| ret.insert(k.to_string(), tag); |
| } |
| Ok(OutputsTags::new(ret)) |
| } |
| } |
| |
| impl<S: std::default::Default + std::hash::BuildHasher> std::convert::From<OutputsTags> |
| for HashMap<String, String, S> |
| { |
| fn from(tags: OutputsTags) -> HashMap<String, String, S> { |
| tags.iter() |
| .map(|(k, v)| (k.to_string(), v.to_hex())) |
| .collect() |
| } |
| } |
| |
| impl std::iter::FromIterator<(String, FileAuthTag)> for OutputsTags { |
| fn from_iter<T: IntoIterator<Item = (String, FileAuthTag)>>(iter: T) -> Self { |
| OutputsTags { |
| inner: HashMap::from_iter(iter), |
| } |
| } |
| } |
| |
| #[derive(Debug, Deserialize, Serialize)] |
| pub struct TaskOutputs { |
| pub return_value: Vec<u8>, |
| pub tags_map: OutputsTags, |
| } |
| |
| impl TaskOutputs { |
| pub fn new(value: impl Into<Vec<u8>>, tags_map: HashMap<String, FileAuthTag>) -> Self { |
| TaskOutputs { |
| return_value: value.into(), |
| tags_map: OutputsTags::new(tags_map), |
| } |
| } |
| } |
| |
| #[derive(Debug, Deserialize, Serialize)] |
| pub struct TaskFailure { |
| pub reason: String, |
| } |
| |
| impl TaskFailure { |
| pub fn new(reason: impl ToString) -> Self { |
| TaskFailure { |
| reason: reason.to_string(), |
| } |
| } |
| } |
| |
| impl std::fmt::Display for TaskFailure { |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| write!(f, "TaskFailure {}", self.reason) |
| } |
| } |
| |
| #[derive(Debug, Clone, PartialEq, Default, Deserialize, Serialize)] |
| pub struct ExternalID { |
| pub prefix: String, |
| pub uuid: Uuid, |
| } |
| |
| impl ExternalID { |
| pub fn new(prefix: impl ToString, uuid: Uuid) -> Self { |
| ExternalID { |
| prefix: prefix.to_string(), |
| uuid, |
| } |
| } |
| |
| pub fn to_bytes(&self) -> Vec<u8> { |
| self.to_string().into_bytes() |
| } |
| } |
| |
| impl ToString for ExternalID { |
| fn to_string(&self) -> String { |
| format!("{}-{}", self.prefix, self.uuid.to_string()) |
| } |
| } |
| |
| impl std::convert::From<&ExternalID> for ExternalID { |
| fn from(s: &ExternalID) -> ExternalID { |
| s.clone() |
| } |
| } |
| |
| impl std::convert::TryFrom<&str> for ExternalID { |
| type Error = anyhow::Error; |
| fn try_from(ext_id: &str) -> Result<Self> { |
| let pos = ext_id |
| .find('-') |
| .ok_or_else(|| anyhow!("Invalid external id: {}", ext_id))?; |
| let (part0, part1) = ext_id.split_at(pos); |
| ensure!(part1.len() > 1, "Invalid external id: {}", ext_id); |
| let eid = ExternalID { |
| prefix: part0.to_string(), |
| uuid: Uuid::parse_str(&part1[1..])?, |
| }; |
| Ok(eid) |
| } |
| } |
| |
| impl std::convert::TryFrom<String> for ExternalID { |
| type Error = anyhow::Error; |
| fn try_from(ext_id: String) -> Result<Self> { |
| ext_id.as_str().try_into() |
| } |
| } |
| |
| #[derive(Debug, Deserialize, Serialize)] |
| pub enum TaskResult { |
| NotReady, |
| Ok(TaskOutputs), |
| Err(TaskFailure), |
| } |
| |
| impl TaskResult { |
| pub fn is_ok(&self) -> bool { |
| match self { |
| TaskResult::Ok(_) => true, |
| _ => false, |
| } |
| } |
| |
| #[cfg(test_mode)] |
| pub fn unwrap(self) -> TaskOutputs { |
| match self { |
| TaskResult::Ok(t) => t, |
| TaskResult::Err(e) => { |
| panic!("called `TaskResult::unwrap()` on an `Err` value: {:?}", &e) |
| } |
| TaskResult::NotReady => panic!("called `TaskResult::unwrap()` on NotReady case"), |
| } |
| } |
| } |
| |
| impl Default for TaskResult { |
| fn default() -> Self { |
| TaskResult::NotReady |
| } |
| } |
| |
| // This is intended for proto::TaskResult field |
| // Since proto::TaskResult is a wrapper of One-Of keywords, |
| // it is always converted to an Option<proto::TaskResult> |
| // when referenced in a request/response structure. |
| impl<T> std::convert::TryFrom<Option<T>> for TaskResult |
| where |
| T: TryInto<TaskResult, Error = Error>, |
| { |
| type Error = Error; |
| fn try_from(option: Option<T>) -> Result<Self> { |
| let ret = match option { |
| Some(result) => result.try_into()?, |
| None => unreachable!(), |
| }; |
| Ok(ret) |
| } |
| } |
| |
| impl<T, E> std::convert::From<TaskResult> for Option<std::result::Result<T, E>> |
| where |
| T: From<TaskOutputs>, |
| E: From<TaskFailure>, |
| { |
| fn from(task_result: TaskResult) -> Option<std::result::Result<T, E>> { |
| match task_result { |
| TaskResult::Ok(t) => Some(Ok(t.into())), |
| TaskResult::Err(e) => Some(Err(e.into())), |
| TaskResult::NotReady => None, |
| } |
| } |
| } |
| |
| const TASK_PREFIX: &str = "task"; |
| |
| #[derive(Debug, Default, Deserialize, Serialize)] |
| pub struct Task { |
| pub task_id: Uuid, |
| pub creator: UserID, |
| pub function_id: ExternalID, |
| pub function_arguments: FunctionArguments, |
| pub executor: Executor, |
| pub input_owners_map: HashMap<String, OwnerList>, |
| pub output_owners_map: HashMap<String, OwnerList>, |
| pub function_owner: UserID, |
| pub participants: UserList, |
| pub approved_users: UserList, |
| pub input_map: HashMap<String, ExternalID>, |
| pub output_map: HashMap<String, ExternalID>, |
| pub result: TaskResult, |
| pub status: TaskStatus, |
| } |
| |
| impl Storable for Task { |
| fn key_prefix() -> &'static str { |
| TASK_PREFIX |
| } |
| |
| fn uuid(&self) -> Uuid { |
| self.task_id |
| } |
| } |
| |
| impl Task { |
| pub fn new( |
| requester: UserID, |
| req_executor: Executor, |
| req_func_args: FunctionArguments, |
| req_input_owners: HashMap<String, OwnerList>, |
| req_output_owners: HashMap<String, OwnerList>, |
| function: Function, |
| ) -> Result<Self> { |
| // gather all participants |
| let input_owners = UserList::unions(req_input_owners.values().cloned()); |
| let output_owners = UserList::unions(req_output_owners.values().cloned()); |
| let mut participants = UserList::unions(vec![input_owners, output_owners]); |
| participants.insert(requester.clone()); |
| if !function.public { |
| participants.insert(function.owner.clone()); |
| } |
| |
| //check function compatibility |
| let fn_args_spec: HashSet<&String> = function.arguments.iter().collect(); |
| let req_args: HashSet<&String> = req_func_args.inner().keys().collect(); |
| ensure!(fn_args_spec == req_args, "function_arguments mismatch"); |
| |
| // check input fkeys |
| let inputs_spec: HashSet<&String> = function.inputs.iter().map(|f| &f.name).collect(); |
| let req_input_fkeys: HashSet<&String> = req_input_owners.keys().collect(); |
| ensure!(inputs_spec == req_input_fkeys, "input keys mismatch"); |
| |
| // check output fkeys |
| let outputs_spec: HashSet<&String> = function.outputs.iter().map(|f| &f.name).collect(); |
| let req_output_fkeys: HashSet<&String> = req_output_owners.keys().collect(); |
| ensure!(outputs_spec == req_output_fkeys, "output keys mismatch"); |
| |
| // Skip the assignment if no file is required |
| let status = if req_input_owners.is_empty() && req_output_owners.is_empty() { |
| TaskStatus::DataAssigned |
| } else { |
| TaskStatus::Created |
| }; |
| |
| let task = Task { |
| task_id: Uuid::new_v4(), |
| creator: requester, |
| executor: req_executor, |
| function_id: function.external_id(), |
| function_owner: function.owner.clone(), |
| function_arguments: req_func_args, |
| input_owners_map: req_input_owners, |
| output_owners_map: req_output_owners, |
| participants, |
| status, |
| ..Default::default() |
| }; |
| |
| Ok(task) |
| } |
| |
| pub fn approve(&mut self, requester: &UserID) -> Result<()> { |
| ensure!( |
| self.status == TaskStatus::DataAssigned, |
| "Unexpected task status when approving: {:?}", |
| self.status |
| ); |
| |
| ensure!( |
| self.participants.contains(requester), |
| "Unexpected user trying to approve a task: {:?}", |
| requester |
| ); |
| |
| self.approved_users.insert(requester.clone()); |
| if self.participants == self.approved_users { |
| self.update_status(TaskStatus::Approved); |
| } |
| |
| Ok(()) |
| } |
| |
| pub fn invoking_by_executor(&mut self) -> Result<()> { |
| ensure!( |
| self.status == TaskStatus::Staged, |
| "Unexpected task status when invoked: {:?}", |
| self.status |
| ); |
| self.status = TaskStatus::Running; |
| Ok(()) |
| } |
| |
| pub fn finish(&mut self, result: TaskResult) -> Result<()> { |
| ensure!( |
| self.status == TaskStatus::Running, |
| "Unexpected task status when invoked: {:?}", |
| self.status |
| ); |
| self.result = result; |
| self.status = TaskStatus::Finished; |
| Ok(()) |
| } |
| |
| pub fn stage_for_running( |
| &mut self, |
| requester: &UserID, |
| function: Function, |
| input_map: HashMap<String, FunctionInputFile>, |
| output_map: HashMap<String, FunctionOutputFile>, |
| ) -> Result<StagedTask> { |
| ensure!( |
| &self.creator == requester, |
| "Unexpected user trying to invoke a task: {:?}", |
| requester |
| ); |
| ensure!( |
| self.status == TaskStatus::Approved, |
| "Unexpected task status when invoked: {:?}", |
| self.status |
| ); |
| let function_arguments = self.function_arguments.clone(); |
| let staged_task = StagedTask { |
| task_id: self.task_id, |
| executor: self.executor, |
| executor_type: function.executor_type, |
| function_id: function.id, |
| function_payload: function.payload, |
| function_arguments, |
| input_data: input_map.into(), |
| output_data: output_map.into(), |
| }; |
| |
| self.update_status(TaskStatus::Staged); |
| Ok(staged_task) |
| } |
| |
| pub fn assign_input( |
| &mut self, |
| requester: &UserID, |
| fname: &str, |
| file: &TeaclaveInputFile, |
| ) -> Result<()> { |
| ensure!( |
| self.status == TaskStatus::Created, |
| "Unexpected task status during input assignment: {:?}", |
| self.status |
| ); |
| |
| ensure!( |
| file.owner.contains(requester), |
| "Assign: requester is not in the owner list. {:?}.", |
| file.external_id() |
| ); |
| |
| match self.input_owners_map.get(fname) { |
| Some(owner_list) => { |
| ensure!( |
| owner_list == &file.owner, |
| "Assign: file ownership mismatch. {:?}", |
| file.external_id() |
| ); |
| } |
| None => bail!( |
| "Assign: file name not exist in input_owners_map. {:?}", |
| fname |
| ), |
| }; |
| |
| ensure!( |
| self.input_map.get(fname).is_none(), |
| "Assign: file already assigned. {:?}", |
| fname |
| ); |
| |
| self.input_map.insert(fname.to_owned(), file.external_id()); |
| |
| if self.all_data_assigned() { |
| self.update_status(TaskStatus::DataAssigned); |
| } |
| Ok(()) |
| } |
| |
| pub fn assign_output( |
| &mut self, |
| requester: &UserID, |
| fname: &str, |
| file: &TeaclaveOutputFile, |
| ) -> Result<()> { |
| ensure!( |
| self.status == TaskStatus::Created, |
| "Unexpected task status during output assignment: {:?}", |
| self.status |
| ); |
| |
| ensure!( |
| file.owner.contains(requester), |
| "Assign: requester is not in the owner list. {:?}.", |
| file.external_id() |
| ); |
| |
| match self.output_owners_map.get(fname) { |
| Some(owner_list) => { |
| ensure!( |
| owner_list == &file.owner, |
| "Assign: file ownership mismatch. {:?}", |
| file.external_id() |
| ); |
| } |
| None => bail!( |
| "Assign: file name not exist in output_owners_map. {:?}", |
| fname |
| ), |
| }; |
| |
| ensure!( |
| self.output_map.get(fname).is_none(), |
| "Assign: file already assigned. {:?}", |
| fname |
| ); |
| |
| self.output_map.insert(fname.to_owned(), file.external_id()); |
| |
| if self.all_data_assigned() { |
| self.update_status(TaskStatus::DataAssigned); |
| } |
| Ok(()) |
| } |
| |
| fn update_status(&mut self, status: TaskStatus) { |
| self.status = status; |
| } |
| |
| fn all_data_assigned(&self) -> bool { |
| let input_args: HashSet<&String> = self.input_owners_map.keys().collect(); |
| let assiged_inputs: HashSet<&String> = self.input_map.keys().collect(); |
| if input_args != assiged_inputs { |
| return false; |
| } |
| |
| let output_args: HashSet<&String> = self.output_owners_map.keys().collect(); |
| let assiged_outputs: HashSet<&String> = self.output_map.keys().collect(); |
| if output_args != assiged_outputs { |
| return false; |
| } |
| |
| true |
| } |
| } |