| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System.Collections.Generic; |
| using System.Globalization; |
| using System.Linq; |
| using System.Text; |
| using Org.Apache.REEF.Driver.Context; |
| using Org.Apache.REEF.Driver.Evaluator; |
| using Org.Apache.REEF.Driver.Task; |
| using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; |
| using Org.Apache.REEF.Tang.Interface; |
| using Org.Apache.REEF.Utilities.Attributes; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| |
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
    /// <summary>
    /// Manages the tasks of an IMRU job: keeps each task's state machine, configuration and active context,
    /// records task lifecycle events (running, completed, failed, closed) and submits the tasks.
    /// This class is not thread safe.
    /// </summary>
    [NotThreadSafe]
    internal sealed class TaskManager
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(TaskManager));

        /// <summary>
        /// Error messages thrown in IMRU tasks when an exception happens.
        /// GetTaskErrorEvent matches the Message of an IFailedTask against TaskAppError, TaskSystemError and
        /// TaskGroupCommunicationError to classify the failure; any other message is treated as a system error.
        /// </summary>
        internal const string TaskAppError = "TaskAppError";
        internal const string TaskSystemError = "TaskSystemError";
        internal const string TaskGroupCommunicationError = "TaskGroupCommunicationError";
        internal const string TaskEvaluatorError = "TaskEvaluatorError";

        /// <summary>
        /// Message sent from the driver to an evaluator to close a running task.
        /// </summary>
        internal const string CloseTaskByDriver = "CloseTaskByDriver";

        /// <summary>
        /// Error message in a Task exception showing that the task received the close event.
        /// </summary>
        internal const string TaskKilledByDriver = "TaskKilledByDriver";

        /// <summary>
        /// Task information keyed by Task Id. Each TaskInfo holds the task state machine, the task configuration,
        /// and the active context the task is submitted to.
        /// </summary>
        private readonly IDictionary<string, TaskInfo> _tasks = new Dictionary<string, TaskInfo>();

        /// <summary>
        /// All currently running tasks keyed by Task Id.
        /// A task is added when its IRunningTask event is recorded, and removed when it completes, fails,
        /// or is asked to close by the driver.
        /// </summary>
        private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>();

        /// <summary>
        /// Total number of expected tasks.
        /// </summary>
        private readonly int _totalExpectedTasks;

        /// <summary>
        /// Master task Id. It is set in the IGroupCommDriver and must be the same Id used in the TaskManager.
        /// </summary>
        private readonly string _masterTaskId;

        /// <summary>
        /// Total number of application errors received from failed tasks.
        /// </summary>
        private int _numberOfAppErrors = 0;

        /// <summary>
        /// Creates a TaskManager with the specified total number of tasks and master task id.
        /// </summary>
        /// <param name="numTasks">Total number of expected tasks; must be positive.</param>
        /// <param name="masterTaskId">Id of the master task; must not be null, empty or whitespace.</param>
        /// <exception cref="IMRUSystemException">Thrown if numTasks is smaller than or equal to 0,
        /// or if masterTaskId is null, empty or whitespace.</exception>
        internal TaskManager(int numTasks, string masterTaskId)
        {
            if (numTasks <= 0)
            {
                Exceptions.Throw(new IMRUSystemException("Number of expected tasks must be positive"), Logger);
            }

            if (string.IsNullOrWhiteSpace(masterTaskId))
            {
                Exceptions.Throw(new IMRUSystemException("masterTaskId cannot be null"), Logger);
            }

            _totalExpectedTasks = numTasks;
            _masterTaskId = masterTaskId;
        }

        /// <summary>
        /// Adds a Task to the task collection in its initial state.
        /// All validation happens before the collection is modified, so a rejected task is never stored.
        /// </summary>
        /// <param name="taskId">Id of the task to add.</param>
        /// <param name="taskConfiguration">Configuration used when the task is submitted.</param>
        /// <param name="activeContext">Context the task will be submitted to.</param>
        /// <exception cref="IMRUSystemException">Thrown if taskId is null or already added, taskConfiguration
        /// or activeContext is null, the expected number of tasks has already been reached, or this task
        /// would fill the last slot while no master task has been added.</exception>
        internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
        {
            if (taskId == null)
            {
                Exceptions.Throw(new IMRUSystemException("The taskId is null."), Logger);
            }

            if (_tasks.ContainsKey(taskId))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already exists.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            if (taskConfiguration == null)
            {
                Exceptions.Throw(new IMRUSystemException("The task configuration is null."), Logger);
            }

            if (activeContext == null)
            {
                Exceptions.Throw(new IMRUSystemException("The context is null."), Logger);
            }

            if (NumberOfTasks >= _totalExpectedTasks)
            {
                var msg = string.Format(
                    CultureInfo.InvariantCulture,
                    "Trying to add an additional Task {0}, but the total expected Task number {1} has been reached.",
                    taskId,
                    _totalExpectedTasks);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            // Check for the master task before mutating the collection. If this task takes the last
            // slot and neither it nor any task added earlier is the master, reject it. Otherwise the
            // collection would be left full without a master task.
            // String != is ordinal, matching the default comparer used by _tasks.
            var isLastSlot = NumberOfTasks == _totalExpectedTasks - 1;
            if (isLastSlot && !MasterTaskExists() && taskId != _masterTaskId)
            {
                Exceptions.Throw(new IMRUSystemException("There is no master task added."), Logger);
            }

            _tasks.Add(taskId, new TaskInfo(new TaskStateMachine(), taskConfiguration, activeContext));
        }

        /// <summary>
        /// Returns the number of tasks in the task collection.
        /// </summary>
        internal int NumberOfTasks
        {
            get { return _tasks.Count; }
        }

        /// <summary>
        /// Called when an IRunningTask event is received during task submission.
        /// Adds the IRunningTask to the running tasks collection and moves the task state with the RunningTask event.
        /// </summary>
        /// <param name="runningTask">The running task reported by the evaluator.</param>
        /// <exception cref="IMRUSystemException">Thrown if runningTask is null, the task is already in the running
        /// tasks collection, or the task is not in the task collection.</exception>
        internal void RecordRunningTask(IRunningTask runningTask)
        {
            if (runningTask == null)
            {
                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
            }

            if (_runningTasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already running.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            if (!_tasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            _runningTasks.Add(runningTask.Id, runningTask);
            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
        }

        /// <summary>
        /// Called at the beginning of recovery.
        /// Clears the task collection and the running task collection, and resets the number of application errors.
        /// </summary>
        internal void Reset()
        {
            _tasks.Clear();
            _runningTasks.Clear();
            _numberOfAppErrors = 0;
        }

        /// <summary>
        /// Called when an ICompletedTask event is received during task running or system shutting down.
        /// Removes the task from the running tasks and moves its state with the CompletedTask event.
        /// NOTE(review): RemoveRunningTask throws if the task is no longer in the running tasks collection.
        /// That is the case after CloseAllRunningTasks has cleared it, so a task completing during shutdown
        /// would throw here. Confirm this is intended.
        /// </summary>
        /// <param name="completedTask">The completed task.</param>
        internal void RecordCompletedTask(ICompletedTask completedTask)
        {
            RemoveRunningTask(completedTask.Id);
            UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
        }

        /// <summary>
        /// Called when an IFailedTask event is received during task submission or running.
        /// Removes the task from the running tasks if it is there, then updates the task state with the
        /// error event derived from the failed task's message.
        /// </summary>
        /// <param name="failedTask">The failed task.</param>
        internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
        {
            // Dictionary.Remove is a no-op when the key is absent, e.g. when the task failed before running.
            _runningTasks.Remove(failedTask.Id);
            UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
        }

        /// <summary>
        /// Called when an IFailedTask event is received during system shutting down.
        /// If the task was waiting for close (it received the close command from the driver), the task state is
        /// moved with the ClosedTask event.
        /// Otherwise the task could have failed because of a communication, application or system error. Unless it
        /// is already in TaskFailedByEvaluatorFailure, its state is updated based on the error received.
        /// The running tasks collection is not touched here.
        /// </summary>
        /// <param name="failedTask">The failed task.</param>
        internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
        {
            var taskState = GetTaskState(failedTask.Id);
            if (taskState == StateMachine.TaskState.TaskWaitingForClose)
            {
                UpdateState(failedTask.Id, TaskStateEvent.ClosedTask);
            }
            else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure)
            {
                UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
            }
        }

        /// <summary>
        /// Called when an IFailedEvaluator event is received during TaskSubmitted, TaskRunning or system shutting down.
        /// If the failed evaluator carries a failed task:
        /// - the task is removed from the running tasks if it is in TaskRunning state;
        /// - its state is moved with the FailedTaskEvaluatorError event.
        /// If no failed task is present, nothing is recorded.
        /// </summary>
        /// <param name="failedEvaluator">The failed evaluator.</param>
        internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
        {
            if (failedEvaluator.FailedTask.IsPresent())
            {
                var taskId = failedEvaluator.FailedTask.Value.Id;
                var taskState = GetTaskState(taskId);
                if (taskState == StateMachine.TaskState.TaskRunning)
                {
                    RemoveRunningTask(taskId);
                }

                UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
            }
        }

        /// <summary>
        /// Removes a task from the running tasks collection.
        /// </summary>
        /// <param name="taskId">Id of the task to remove.</param>
        /// <exception cref="IMRUSystemException">Thrown if the task is not in the running tasks collection.</exception>
        private void RemoveRunningTask(string taskId)
        {
            if (!_runningTasks.ContainsKey(taskId))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            _runningTasks.Remove(taskId);
        }

        /// <summary>
        /// Updates the state of the given task by feeding the event to its state machine.
        /// </summary>
        /// <param name="taskId">Id of the task; must exist in the task collection.</param>
        /// <param name="taskEvent">Event to apply.</param>
        private void UpdateState(string taskId, TaskStateEvent taskEvent)
        {
            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
        }

        /// <summary>
        /// Checks whether all the tasks are in TaskRunning state and the running tasks collection holds
        /// the expected number of tasks.
        /// </summary>
        /// <returns>True if all expected tasks are running.</returns>
        internal bool AreAllTasksRunning()
        {
            return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
                _runningTasks.Count == _totalExpectedTasks;
        }

        /// <summary>
        /// Checks whether all the expected tasks are in TaskCompleted state and none is still running.
        /// </summary>
        /// <returns>True if all expected tasks are completed.</returns>
        internal bool AreAllTasksCompleted()
        {
            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) &&
                _tasks.Count == _totalExpectedTasks &&
                _runningTasks.Count == 0;
        }

        /// <summary>
        /// Called when either an IFailedEvaluator or an IFailedTask event is received.
        /// Sends the close message to every running task, moves each with the WaitingTaskToClose event, and then
        /// clears the running tasks collection.
        /// Expected sizes of the running tasks collection at this point:
        /// - If all the tasks were running, it should hold _totalExpectedTasks - 1 tasks.
        /// - If this happens before all the tasks are running, it should hold fewer than _totalExpectedTasks - 1.
        /// - If no task is running, it could be 0.
        /// </summary>
        /// <param name="closeMessage">Message sent to each running task; encoded as UTF-8.</param>
        internal void CloseAllRunningTasks(string closeMessage)
        {
            Logger.Log(Level.Verbose, "Closing [{0}] running tasks.", _runningTasks.Count);
            foreach (var runningTask in _runningTasks.Values)
            {
                runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
                UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
            }

            _runningTasks.Clear();
        }

        /// <summary>
        /// Called when an IRunningTask event is received while the system is either shutting down or failed.
        /// The task is NOT added to the running tasks collection. Instead:
        /// 1. its state is moved with the RunningTask event;
        /// 2. the IRunningTask is closed with the given message;
        /// 3. its state is moved with the WaitingTaskToClose event.
        /// </summary>
        /// <param name="runningTask">The running task reported by the evaluator.</param>
        /// <param name="closeMessage">Message sent to the task; encoded as UTF-8.</param>
        /// <exception cref="IMRUSystemException">Thrown if runningTask is null or is already in the running tasks
        /// collection.</exception>
        internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
        {
            if (runningTask == null)
            {
                Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger);
            }

            if (_runningTasks.ContainsKey(runningTask.Id))
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] is already in running tasks.", runningTask.Id);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            UpdateState(runningTask.Id, TaskStateEvent.RunningTask);
            runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage));
            UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose);
        }

        /// <summary>
        /// Maps the Message of an IFailedTask to a task error event.
        /// Side effect: an application error also increments the application error counter.
        /// TaskSystemError, TaskEvaluatorError and any unrecognized message all map to FailedTaskSystemError.
        /// </summary>
        /// <param name="failedTask">The failed task.</param>
        /// <returns>The error event to feed to the task state machine.</returns>
        private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask)
        {
            switch (failedTask.Message)
            {
                case TaskAppError:
                    _numberOfAppErrors++;
                    return TaskStateEvent.FailedTaskAppError;
                case TaskSystemError:
                    return TaskStateEvent.FailedTaskSystemError;
                case TaskGroupCommunicationError:
                    return TaskStateEvent.FailedTaskCommunicationError;
                default:
                    return TaskStateEvent.FailedTaskSystemError;
            }
        }

        /// <summary>
        /// Returns the number of application errors recorded from failed tasks since construction or the last Reset.
        /// </summary>
        /// <returns>The application error count.</returns>
        internal int NumberOfAppErrors()
        {
            return _numberOfAppErrors;
        }

        /// <summary>
        /// Checks whether every task's state machine reports a final state.
        /// </summary>
        /// <returns>True if all tasks are in final states (also true when there are no tasks).</returns>
        internal bool AllInFinalState()
        {
            return _tasks.All(t => t.Value.TaskState.IsFinalState());
        }

        /// <summary>
        /// Gets the current state of the task.
        /// </summary>
        /// <param name="taskId">Id of the task.</param>
        /// <returns>The task's current state.</returns>
        /// <exception cref="IMRUSystemException">Thrown if the task is not in the task collection.</exception>
        internal TaskState GetTaskState(string taskId)
        {
            var taskInfo = GetTaskInfo(taskId);
            return taskInfo.TaskState.CurrentState;
        }

        /// <summary>
        /// Checks whether all the tasks are in the specified state.
        /// For example, pass TaskState.TaskRunning to check that all the tasks are in TaskRunning state.
        /// </summary>
        /// <param name="taskState">The state to check for.</param>
        /// <returns>True if every task is in that state (also true when there are no tasks).</returns>
        internal bool AreAllTasksInState(TaskState taskState)
        {
            return _tasks.All(t => t.Value.TaskState.CurrentState == taskState);
        }

        /// <summary>
        /// Submits all the tasks to their active contexts and moves each with the SubmittedTask event.
        /// Tasks are submitted only after all of them have been added and the master task exists.
        /// </summary>
        /// <exception cref="IMRUSystemException">Thrown if not all the tasks have been added or there is no
        /// master task.</exception>
        internal void SubmitTasks()
        {
            if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists())
            {
                var msg = string.Format(
                    CultureInfo.InvariantCulture,
                    "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].",
                    NumberOfTasks,
                    _totalExpectedTasks);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            foreach (var taskId in _tasks.Keys)
            {
                var taskInfo = GetTaskInfo(taskId);
                taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration);
                UpdateState(taskId, TaskStateEvent.SubmittedTask);
            }
        }

        /// <summary>
        /// Checks whether the master task has been added.
        /// </summary>
        /// <returns>True if the task collection contains the master task id.</returns>
        private bool MasterTaskExists()
        {
            return _tasks.ContainsKey(_masterTaskId);
        }

        /// <summary>
        /// Gets the TaskInfo for the given taskId.
        /// </summary>
        /// <param name="taskId">Id of the task.</param>
        /// <returns>The task's TaskInfo.</returns>
        /// <exception cref="IMRUSystemException">Thrown if taskId is null or the task is not in the task
        /// collection.</exception>
        private TaskInfo GetTaskInfo(string taskId)
        {
            TaskInfo taskInfo = null;

            // Dictionary.TryGetValue throws ArgumentNullException on a null key. Guard it so callers
            // always get the documented IMRUSystemException instead.
            if (taskId != null)
            {
                _tasks.TryGetValue(taskId, out taskInfo);
            }

            if (taskInfo == null)
            {
                var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] does not exist in the task collection.", taskId);
                Exceptions.Throw(new IMRUSystemException(msg), Logger);
            }

            return taskInfo;
        }
    }
}