| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use crate::error::SchedulerServiceError; |
| |
| use std::collections::{HashMap, HashSet, VecDeque}; |
| use std::convert::TryInto; |
| use std::sync::Arc; |
| use std::time::{Duration, SystemTime}; |
| #[allow(unused_imports)] |
| use std::untrusted::time::SystemTimeEx; |
| use tokio::sync::Mutex; |
| |
| use anyhow::{anyhow, Result}; |
| use teaclave_proto::teaclave_common::{ExecutorCommand, ExecutorStatus}; |
| use teaclave_proto::teaclave_scheduler_service::*; |
| use teaclave_proto::teaclave_storage_service::*; |
| use teaclave_rpc::transport::{channel::Endpoint, Channel}; |
| use teaclave_rpc::{Request, Response}; |
| use teaclave_types::*; |
| use uuid::Uuid; |
| |
| const EXECUTOR_TIMEOUT_SECS: u64 = 30; |
| |
| #[derive(Clone)] |
| pub(crate) struct TeaclaveSchedulerService { |
| resources: Arc<Mutex<TeaclaveSchedulerResources>>, |
| } |
| |
| pub struct TeaclaveSchedulerResources { |
| storage_client: Arc<Mutex<TeaclaveStorageClient<Channel>>>, |
| // map executor_id to task_id |
| task_queue: VecDeque<StagedTask>, |
| executors_tasks: HashMap<Uuid, Uuid>, |
| executors_last_heartbeat: HashMap<Uuid, SystemTime>, |
| executors_status: HashMap<Uuid, ExecutorStatus>, |
| tasks_to_cancel: HashSet<Uuid>, |
| } |
| |
| pub struct TeaclaveSchedulerDeamon { |
| resources: Arc<Mutex<TeaclaveSchedulerResources>>, |
| } |
| |
| impl TeaclaveSchedulerDeamon { |
| pub async fn run(&self) -> Result<()> { |
| loop { |
| std::thread::sleep(std::time::Duration::from_secs(2)); |
| |
| let mut resources = self.resources.lock().await; |
| |
| let key = StagedTask::get_queue_key().as_bytes(); |
| |
| log::debug!("Pulling task/cancel queue"); |
| while let Ok(canceled_task) = resources.pull_cancel_queue().await { |
| resources.tasks_to_cancel.insert(canceled_task.task_id); |
| } |
| |
| while let Ok(staged_task) = resources.pull_staged_task::<StagedTask>(key).await { |
| log::debug!("deamon: Pulled staged task: {:?}", staged_task); |
| resources.task_queue.push_back(staged_task); |
| } |
| |
| let current_time = SystemTime::now(); |
| let mut to_remove = Vec::new(); |
| for (executor_id, last_heartbeat) in resources.executors_last_heartbeat.iter() { |
| if current_time |
| .duration_since(*last_heartbeat) |
| .unwrap_or_else(|_| Duration::from_secs(EXECUTOR_TIMEOUT_SECS + 1)) |
| > Duration::from_secs(EXECUTOR_TIMEOUT_SECS) |
| { |
| // executor lost |
| to_remove.push(*executor_id); |
| log::warn!("Executor {} lost", executor_id); |
| } |
| } |
| |
| for executor_id in to_remove { |
| resources.executors_last_heartbeat.remove(&executor_id); |
| resources.executors_status.remove(&executor_id); |
| if let Some(task_id) = resources.executors_tasks.remove(&executor_id) { |
| // report task faliure |
| let ts = resources.get_task_state(&task_id).await?; |
| if ts.is_ended() { |
| continue; |
| } |
| |
| log::warn!("Executor {} lost, canceling task {}", executor_id, task_id); |
| |
| let mut task: Task<Fail> = ts.try_into()?; |
| |
| log::debug!("Task failed because of Executor lost: Task {:?}", task); |
| // Only TaskStatus::Running/Staged is allowed here. |
| let result_err = |
| TaskResult::Err(TaskFailure::new("Runtime Error: Executor Timeout")); |
| |
| // Updating task result means we have finished execution |
| task.update_result(result_err)?; |
| |
| let ts = TaskState::from(task); |
| resources.put_into_db(&ts).await?; |
| } |
| } |
| } |
| } |
| } |
| |
| impl TeaclaveSchedulerService { |
| pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self { |
| Self { |
| resources: resources.clone(), |
| } |
| } |
| } |
| |
| impl TeaclaveSchedulerDeamon { |
| pub fn new(resources: &Arc<Mutex<TeaclaveSchedulerResources>>) -> Self { |
| Self { |
| resources: resources.clone(), |
| } |
| } |
| } |
| |
| impl TeaclaveSchedulerResources { |
| pub(crate) async fn new(storage_service_endpoint: Endpoint) -> Result<Self> { |
| let channel = storage_service_endpoint |
| .connect() |
| .await |
| .map_err(|e| anyhow!("Failed to connect to storage service.{:?}", e))?; |
| let storage_client = Arc::new(Mutex::new(TeaclaveStorageClient::new_with_builtin_config( |
| channel, |
| ))); |
| let task_queue = VecDeque::new(); |
| let executors_tasks = HashMap::new(); |
| let executors_status = HashMap::new(); |
| let tasks_to_cancel = HashSet::new(); |
| let executors_last_heartbeat = HashMap::new(); |
| |
| let resources = TeaclaveSchedulerResources { |
| storage_client, |
| task_queue, |
| executors_tasks, |
| executors_last_heartbeat, |
| executors_status, |
| tasks_to_cancel, |
| }; |
| |
| Ok(resources) |
| } |
| |
| async fn pull_staged_task<T: Storable>( |
| &self, |
| key: &[u8], |
| ) -> std::result::Result<T, SchedulerServiceError> { |
| let dequeue_request = DequeueRequest::new(key); |
| let dequeue_response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .dequeue(dequeue_request) |
| .await |
| .map_err(|_| SchedulerServiceError::StorageError)? |
| .into_inner(); |
| T::from_slice(dequeue_response.value.as_slice()).map_err(SchedulerServiceError::Service) |
| } |
| |
| async fn pull_cancel_queue(&self) -> std::result::Result<TaskState, SchedulerServiceError> { |
| let dequeue_request = DequeueRequest::new(CANCEL_QUEUE_KEY.as_bytes()); |
| let dequeue_response = self |
| .storage_client |
| .clone() |
| .lock() |
| .await |
| .dequeue(dequeue_request) |
| .await |
| .map_err(|_| SchedulerServiceError::StorageError)? |
| .into_inner(); |
| TaskState::from_slice(dequeue_response.value.as_slice()) |
| .map_err(SchedulerServiceError::Service) |
| } |
| |
| async fn cancel_task(&self, task_id: Uuid) -> std::result::Result<(), SchedulerServiceError> { |
| let ts = self.get_task_state(&task_id).await?; |
| let mut task: Task<Cancel> = ts.try_into()?; |
| |
| // Only TaskStatus::Running/Staged is allowed here. |
| let result_err = TaskResult::Err(TaskFailure::new("Task Canceled by the user")); |
| |
| task.update_result(result_err)?; |
| |
| let ts = TaskState::from(task); |
| self.put_into_db(&ts).await?; |
| |
| Ok(()) |
| } |
| |
| async fn get_task_state(&self, task_id: &Uuid) -> Result<TaskState> { |
| let key = ExternalID::new(TaskState::key_prefix(), task_id.to_owned()); |
| self.get_from_db(&key).await |
| } |
| |
| async fn get_from_db<T: Storable>(&self, key: &ExternalID) -> Result<T> { |
| anyhow::ensure!(T::match_prefix(&key.prefix), "Key prefix doesn't match."); |
| let get_request = GetRequest::new(key.to_bytes()); |
| let storage = self.storage_client.clone(); |
| let mut storage = storage.lock().await; |
| |
| let response = storage.get(get_request).await?.into_inner(); |
| T::from_slice(response.value.as_slice()) |
| } |
| |
| async fn put_into_db(&self, item: &impl Storable) -> Result<()> { |
| let k = item.key(); |
| let v = item.to_vec()?; |
| let put_request = PutRequest::new(k.as_slice(), v.as_slice()); |
| let cli = self.storage_client.clone(); |
| let mut client = cli.lock().await; |
| |
| let _put_response = client.put(put_request).await?; |
| Ok(()) |
| } |
| } |
| |
| #[teaclave_rpc::async_trait] |
| impl TeaclaveScheduler for TeaclaveSchedulerService { |
| // Publisher |
| async fn publish_task( |
| &self, |
| request: Request<PublishTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| // XXX: Publisher is not implemented |
| |
| let mut resources = self.resources.lock().await; |
| |
| let staged_task = |
| StagedTask::from_slice(&request.get_ref().staged_task).map_err(tonic_error)?; |
| resources.task_queue.push_back(staged_task); |
| Ok(Response::new(())) |
| } |
| |
| // Subscriber |
| async fn subscribe( |
| &self, |
| _request: Request<()>, |
| ) -> TeaclaveServiceResponseResult<SubscribeResponse> { |
| // TODO: subscribe a specific topic |
| unimplemented!() |
| } |
| |
| async fn heartbeat( |
| &self, |
| request: Request<HeartbeatRequest>, |
| ) -> TeaclaveServiceResponseResult<HeartbeatResponse> { |
| let mut resources = self.resources.lock().await; |
| |
| let mut command = ExecutorCommand::NoAction; |
| |
| let executor_id = Uuid::parse_str(&request.get_ref().executor_id).map_err(tonic_error)?; |
| let status = request.get_ref().status.try_into().map_err(tonic_error)?; |
| |
| resources.executors_status.insert(executor_id, status); |
| |
| resources |
| .executors_last_heartbeat |
| .insert(executor_id, SystemTime::now()); |
| |
| // check if the executor need to be stopped |
| if let Some(task_id) = resources.executors_tasks.get(&executor_id) { |
| match status { |
| ExecutorStatus::Executing => { |
| if resources.tasks_to_cancel.contains(task_id) { |
| command = ExecutorCommand::Stop; |
| let task_id = task_id.to_owned(); |
| resources.tasks_to_cancel.remove(&task_id); |
| log::debug!( |
| "Sending stop command to executor {}, killing executor {} because of task cancelation", |
| executor_id, |
| task_id |
| ); |
| resources.cancel_task(task_id).await.map_err(tonic_error)?; |
| return Ok(Response::new(HeartbeatResponse::new(command))); |
| } |
| } |
| ExecutorStatus::Idle => { |
| resources.executors_tasks.remove(&executor_id); |
| } |
| } |
| } |
| |
| if !resources.task_queue.is_empty() { |
| command = ExecutorCommand::NewTask; |
| } |
| |
| let response = HeartbeatResponse::new(command); |
| Ok(Response::new(response)) |
| } |
| |
| async fn pull_task( |
| &self, |
| request: Request<PullTaskRequest>, |
| ) -> TeaclaveServiceResponseResult<PullTaskResponse> { |
| let request = request.get_ref(); |
| let mut resources = self.resources.lock().await; |
| match resources.task_queue.pop_front() { |
| Some(task) => match resources.tasks_to_cancel.take(&task.task_id) { |
| Some(task_id) => { |
| resources.cancel_task(task_id).await?; |
| Err(SchedulerServiceError::TaskCanceled.into()) |
| } |
| None => { |
| resources.executors_tasks.insert( |
| Uuid::parse_str(&request.executor_id).map_err(tonic_error)?, |
| task.task_id, |
| ); |
| Ok(Response::new(PullTaskResponse::new(task))) |
| } |
| }, |
| None => Err(SchedulerServiceError::TaskQueueEmpty.into()), |
| } |
| } |
| |
| async fn update_task_status( |
| &self, |
| request: Request<UpdateTaskStatusRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let resources = self.resources.lock().await; |
| |
| let task_id = Uuid::parse_str(&request.get_ref().task_id).map_err(tonic_error)?; |
| let ts = resources |
| .get_task_state(&task_id) |
| .await |
| .map_err(tonic_error)?; |
| let task: Task<Run> = ts.try_into().map_err(tonic_error)?; |
| |
| log::debug!("UpdateTaskStatus: Task {:?}", task); |
| // Only TaskStatus::Running is implicitly allowed here. |
| |
| let ts = TaskState::from(task); |
| resources.put_into_db(&ts).await.map_err(tonic_error)?; |
| Ok(Response::new(())) |
| } |
| |
| async fn update_task_result( |
| &self, |
| request: Request<UpdateTaskResultRequest>, |
| ) -> TeaclaveServiceResponseResult<()> { |
| let resources = self.resources.lock().await; |
| |
| let request = request.into_inner(); |
| let ts = resources |
| .get_task_state(&Uuid::parse_str(&request.task_id).map_err(tonic_error)?) |
| .await |
| .map_err(tonic_error)?; |
| let mut task: Task<Finish> = ts.try_into().map_err(tonic_error)?; |
| let task_result: TaskResult = request.result.try_into().map_err(tonic_error)?; |
| if let TaskResult::Ok(outputs) = task_result.clone() { |
| for (key, auth_tag) in outputs.tags_map.iter() { |
| let outfile = task |
| .update_output_cmac(key, auth_tag) |
| .map_err(tonic_error)?; |
| resources.put_into_db(outfile).await.map_err(tonic_error)?; |
| } |
| }; |
| |
| // Updating task result means we have finished execution |
| task.update_result(task_result).map_err(tonic_error)?; |
| log::debug!("UpdateTaskResult: Task {:?}", task); |
| |
| let ts = TaskState::from(task); |
| resources.put_into_db(&ts).await.map_err(tonic_error)?; |
| Ok(Response::new(())) |
| } |
| } |