| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Globalization; |
| using System.Linq; |
| using System.Text; |
| using Org.Apache.REEF.Common.Runtime.Evaluator.Task; |
| using Org.Apache.REEF.Driver.Context; |
| using Org.Apache.REEF.Driver.Evaluator; |
| using Org.Apache.REEF.Driver.Task; |
| using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; |
| using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; |
| using Org.Apache.REEF.Tang.Exceptions; |
| using Org.Apache.REEF.Tang.Interface; |
| using Org.Apache.REEF.Utilities.Attributes; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| |
| namespace Org.Apache.REEF.IMRU.OnREEF.Driver |
| { |
| /// <summary> |
| /// Manages Tasks, maintains task states and responsible for task submission |
| /// </summary> |
| [NotThreadSafe] |
| internal sealed class TaskManager |
| { |
| private static readonly Logger Logger = Logger.GetLogger(typeof(TaskManager)); |
| |
| /// <summary> |
| /// Error messages thrown in IMRU tasks when an exception happens |
| /// </summary> |
| internal const string TaskAppError = "TaskAppError"; |
| internal const string TaskSystemError = "TaskSystemError"; |
| internal const string TaskGroupCommunicationError = "TaskGroupCommunicationError"; |
| internal const string TaskEvaluatorError = "TaskEvaluatorError"; |
| |
| /// <summary> |
| /// Message sending from driver to evaluator to close a running task |
| /// </summary> |
| internal const string CloseTaskByDriver = "CloseTaskByDriver"; |
| |
| /// <summary> |
| /// Error message in Task exception to show the task received close event |
| /// </summary> |
| internal const string TaskKilledByDriver = "TaskKilledByDriver"; |
| |
| /// <summary> |
| /// This Dictionary contains task information. The key is the Id of the Task, the value is TaskInfo which contains |
| /// task state, task configuration, and active context that the task is running on. |
| /// </summary> |
| private readonly IDictionary<string, TaskInfo> _tasks = new Dictionary<string, TaskInfo>(); |
| |
| /// <summary> |
| /// This Dictionary keeps all the running tasks. The key is the Task Id and the value is IRunningTask. |
| /// After a task is running, it will be added to this collection. After the task is requested to close, |
| /// or fails, completed, it will be removed from this collection. |
| /// </summary> |
| private readonly IDictionary<string, IRunningTask> _runningTasks = new Dictionary<string, IRunningTask>(); |
| |
| /// <summary> |
| /// Total expected tasks |
| /// </summary> |
| private readonly int _totalExpectedTasks; |
| |
| /// <summary> |
| /// Master tasks Id is set in the IGroupCommDriver. It must be the same Id used in the TaskManager. |
| /// </summary> |
| private readonly string _masterTaskId; |
| |
| /// <summary> |
| /// Total number of Application error received from tasks |
| /// </summary> |
| private int _numberOfAppErrors = 0; |
| |
| /// <summary> |
| /// Total Task closing time span. It is used to calculate the average closing time. |
| /// </summary> |
| private TimeSpan _totalTaskClosingTimeSpan; |
| |
| /// <summary> |
| /// Total number of the tasks that is closed by driver and then completed. |
| /// </summary> |
| private int _totalNumberOfClosedTasksByDriver; |
| |
| /// <summary> |
| /// Indicate if master task is completed running properly |
| /// </summary> |
| private bool _masterTaskCompletedRunning = false; |
| |
| /// <summary> |
| /// Creates a TaskManager with specified total number of tasks and master task id. |
| /// Throws IMRUSystemException if numTasks is smaller than or equals to 0 or masterTaskId is null. |
| /// </summary> |
| /// <param name="numTasks"></param> |
| /// <param name="masterTaskId"></param> |
| internal TaskManager(int numTasks, string masterTaskId) |
| { |
| if (numTasks <= 0) |
| { |
| Exceptions.Throw(new IMRUSystemException("Number of expected tasks must be positive"), Logger); |
| } |
| |
| if (string.IsNullOrWhiteSpace(masterTaskId)) |
| { |
| Exceptions.Throw(new IMRUSystemException("masterTaskId cannot be null"), Logger); |
| } |
| |
| _totalExpectedTasks = numTasks; |
| _masterTaskId = masterTaskId; |
| } |
| |
| /// <summary> |
| /// Adds a Task to the task collection |
| /// Throws IMRUSystemException in the following cases: |
| /// taskId is already added |
| /// taskConfiguration is null |
| /// activeContext is null |
| /// trying to add extra tasks |
| /// No Master Task is added in the collection |
| /// </summary> |
| internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext) |
| { |
| if (taskId == null) |
| { |
| Exceptions.Throw(new IMRUSystemException("The taskId is null."), Logger); |
| } |
| |
| if (_tasks.ContainsKey(taskId)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already exists.", taskId); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| if (taskConfiguration == null) |
| { |
| Exceptions.Throw(new IMRUSystemException("The task configuration is null."), Logger); |
| } |
| |
| if (activeContext == null) |
| { |
| Exceptions.Throw(new IMRUSystemException("The context is null."), Logger); |
| } |
| |
| if (NumberOfTasks >= _totalExpectedTasks) |
| { |
| string msg = string.Format("Trying to add an additional Task {0}, but the total expected Task number {1} has been reached.", taskId, _totalExpectedTasks); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| _tasks.Add(taskId, new TaskInfo(new TaskStateMachine(), taskConfiguration, activeContext)); |
| |
| if (NumberOfTasks == _totalExpectedTasks && !MasterTaskExists()) |
| { |
| Exceptions.Throw(new IMRUSystemException("There is no master task added."), Logger); |
| } |
| } |
| |
| /// <summary> |
| /// Returns the number of tasks in the task collection |
| /// </summary> |
| internal int NumberOfTasks |
| { |
| get { return _tasks.Count; } |
| } |
| |
| /// <summary> |
| /// This method is called when receiving IRunningTask event during task submitting. |
| /// Adds the IRunningTask to the running tasks collection and update the task state to TaskRunning. |
| /// Throws IMRUSystemException if running tasks already contains this task or tasks collection doesn't contain this task. |
| /// </summary> |
| internal void RecordRunningTask(IRunningTask runningTask) |
| { |
| if (_runningTasks.ContainsKey(runningTask.Id)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] already running.", runningTask.Id); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| if (!_tasks.ContainsKey(runningTask.Id)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist.", runningTask.Id); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| _runningTasks.Add(runningTask.Id, runningTask); |
| UpdateState(runningTask.Id, TaskStateEvent.RunningTask); |
| } |
| |
| /// <summary> |
| /// This method is called at the beginning of the recovery. |
| /// Clears the task collection, running task collection and resets the number of application error. |
| /// </summary> |
| internal void Reset() |
| { |
| _tasks.Clear(); |
| _runningTasks.Clear(); |
| _numberOfAppErrors = 0; |
| } |
| |
| /// <summary> |
| /// This method is called when receiving ICompletedTask event during task running or system shutting down. |
| /// If it is master task and if the master task was running, mark _masterTaskCompletedRunning true. That indicates |
| /// master task has successfully completed, which means the system has got the result from master task. |
| /// Removes the task from running tasks if it was running |
| /// Changes the task state from RunningTask to CompletedTask if the task was running |
| /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state |
| /// </summary> |
| internal void RecordCompletedTask(ICompletedTask completedTask) |
| { |
| if (completedTask.Id.Equals(_masterTaskId)) |
| { |
| if (GetTaskInfo(completedTask.Id).TaskState.CurrentState.Equals(TaskState.TaskRunning)) |
| { |
| _masterTaskCompletedRunning = true; |
| } |
| } |
| _runningTasks.Remove(completedTask.Id); |
| UpdateState(completedTask.Id, TaskStateEvent.CompletedTask); |
| } |
| |
| /// <summary> |
| /// This method is called when receiving IFailedTask event during task submitting or running |
| /// Removes the task from running tasks if the task was running |
| /// Updates the task state to fail based on the error message in the failed task |
| /// </summary> |
| internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask) |
| { |
| //// Remove the task from running tasks if it exists there |
| _runningTasks.Remove(failedTask.Id); |
| UpdateState(failedTask.Id, GetTaskErrorEventByExceptionType(failedTask)); |
| } |
| |
| /// <summary> |
| /// This method is called when receiving IFailedTask event during system shutting down. |
| /// If the task failed because it receives the close command from driver, update the task state to TaskClosedByDriver. |
| /// Task could fail by communication error or any other application or system error during this time, as long as it is not |
| /// TaskFailedByEvaluatorFailure, update the task state based on the error received. |
| /// </summary> |
| internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask) |
| { |
| Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString()); |
| |
| var taskState = GetTaskState(failedTask.Id); |
| if (taskState == StateMachine.TaskState.TaskWaitingForClose) |
| { |
| UpdateState(failedTask.Id, TaskStateEvent.ClosedTask); |
| } |
| else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure) |
| { |
| UpdateState(failedTask.Id, GetTaskErrorEventByExceptionType(failedTask)); |
| } |
| } |
| |
| /// <summary> |
| /// This method is called when receiving an IFailedEvaluator event during TaskSubmitted, TaskRunning or system shutting down. |
| /// Removes the task from RunningTasks if the task associated with the FailedEvaluator is present and running. |
| /// Sets the task state to TaskFailedByEvaluatorFailure |
| /// </summary> |
| internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator) |
| { |
| if (failedEvaluator.FailedTask.IsPresent()) |
| { |
| var taskId = failedEvaluator.FailedTask.Value.Id; |
| var taskState = GetTaskState(taskId); |
| if (taskState == StateMachine.TaskState.TaskRunning) |
| { |
| if (!_runningTasks.ContainsKey(taskId)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, |
| "The task [{0}] doesn't exist in Running Tasks.", |
| taskId); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| _runningTasks.Remove(taskId); |
| } |
| |
| UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); |
| } |
| else |
| { |
| var taskId = FindTaskAssociatedWithTheEvalutor(failedEvaluator.Id); |
| var taskState = GetTaskState(taskId); |
| if (taskState == StateMachine.TaskState.TaskSubmitted) |
| { |
| UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); |
| } |
| } |
| } |
| |
| /// <summary> |
| /// Waiting for close task has no response in given time |
| /// Driver will kill the evaluator and move the task to TaskFailedByEvaluatorFailure state |
| /// </summary> |
| internal void RecordKillClosingTask(string taskId) |
| { |
| var taskInfo = GetTaskInfo(taskId); |
| if (!taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, |
| "The task [{0}] is in [{1}] state, expecting it is in TaskWaitingForClose state.", |
| taskId, taskInfo.TaskState.CurrentState); |
| Logger.Log(Level.Error, msg); |
| throw new IMRUSystemException(msg); |
| } |
| UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); |
| } |
| |
| /// <summary> |
| /// Find the task that is associated with the given evaluator |
| /// </summary> |
| /// <param name="evaluatorId"></param> |
| /// <returns></returns> |
| private string FindTaskAssociatedWithTheEvalutor(string evaluatorId) |
| { |
| return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault(); |
| } |
| |
| /// <summary> |
| /// Updates task state for a given taskId based on the task event |
| /// </summary> |
| private void UpdateState(string taskId, TaskStateEvent taskEvent) |
| { |
| var taskInfo = GetTaskInfo(taskId); |
| RecordingTime(taskId, taskInfo, taskEvent); |
| taskInfo.TaskState.MoveNext(taskEvent); |
| taskInfo.TimeStateUpdated = DateTime.Now; |
| } |
| |
| /// <summary> |
| /// Recording timing from one task state to another |
| /// The method can be extended to record the time for other task states |
| /// The log level should be changed to verb once we complete the testing |
| /// </summary> |
| private void RecordingTime(string taskId, TaskInfo taskInfo, TaskStateEvent taskEvent) |
| { |
| if (taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose) && taskEvent.Equals(TaskStateEvent.CompletedTask)) |
| { |
| var timeSpan = DateTime.Now - taskInfo.TimeStateUpdated; |
| _totalNumberOfClosedTasksByDriver++; |
| _totalTaskClosingTimeSpan = _totalTaskClosingTimeSpan.Add(timeSpan); |
| Logger.Log(Level.Info, "RecordClosingTime for task id {0}, closing time: {1}, average closing time: {2}.", taskId, timeSpan.Milliseconds, _totalTaskClosingTimeSpan.Milliseconds/_totalNumberOfClosedTasksByDriver); |
| } |
| } |
| |
| /// <summary> |
| /// Get average closing time |
| /// </summary> |
| /// <returns></returns> |
| internal int AverageClosingTime() |
| { |
| if (_totalNumberOfClosedTasksByDriver != 0) |
| { |
| return _totalTaskClosingTimeSpan.Milliseconds/_totalNumberOfClosedTasksByDriver; |
| } |
| return 0; |
| } |
| |
| /// <summary> |
| /// Returns true if master task has completed and produced result |
| /// </summary> |
| internal bool IsMasterTaskCompletedRunning() |
| { |
| return _masterTaskCompletedRunning; |
| } |
| |
| /// <summary> |
| /// Checks if all the tasks are running. |
| /// </summary> |
| internal bool AreAllTasksRunning() |
| { |
| return AreAllTasksInState(StateMachine.TaskState.TaskRunning) && |
| _runningTasks.Count == _totalExpectedTasks; |
| } |
| |
| /// <summary> |
| /// When master task is completed, that means the system has got the result expected |
| /// regardless of other mapper tasks returned or not. |
| /// </summary> |
| internal bool IsJobDone() |
| { |
| return IsMasterTaskCompletedRunning(); |
| } |
| |
| /// <summary> |
| /// Finds all the tasks that are waiting for close and waiting time is timeout |
| /// </summary> |
| internal IList<KeyValuePair<string, TaskInfo>> TasksTimeoutInState(TaskState state, int timeoutMilliseconds) |
| { |
| return _tasks.Where(t => t.Value.TaskState.CurrentState.Equals(state) && Timeout(t.Value.TimeStateUpdated, timeoutMilliseconds)) |
| .ToList(); |
| } |
| |
| /// <summary> |
| /// Check if the given DateTime has passed the timeoutMilliseconds |
| /// </summary> |
| private static bool Timeout(DateTime time, int timeoutMilliseconds) |
| { |
| TimeSpan span = DateTime.Now - time; |
| return span.Milliseconds > timeoutMilliseconds; |
| } |
| |
| /// <summary> |
| /// This method is called when receiving either IFailedEvaluator or IFailedTask event |
| /// Driver tries to close all the running tasks and clean the running task collection in the end. |
| /// If all the tasks are running, the total number of running tasks should be _totalExpectedTasks -1 |
| /// If this happens before all the tasks are running, then the total number of running tasks should smaller than _totalExpectedTasks -1 |
| /// If this happens when no task is running, the total number of running tasks could be 0 |
| /// </summary> |
| internal void CloseAllRunningTasks(string closeMessage) |
| { |
| Logger.Log(Level.Verbose, "Closing [{0}] running tasks.", _runningTasks.Count); |
| foreach (var runningTask in _runningTasks.Values) |
| { |
| runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage)); |
| UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose); |
| } |
| _runningTasks.Clear(); |
| } |
| |
| /// <summary> |
| /// This method is called when receiving an IRunningTask event but system is either in shutting down or fail. |
| /// In this case, the task should not be added in Running Tasks yet. |
| /// Change the task state to TaskRunning if it is still in TaskSubmitted state |
| /// Closes the IRunningTask |
| /// Then move the task state to WaitingTaskToClose |
| /// Throw IMRUSystemException if runningTask is null or the running task is already added in the running task collection |
| /// </summary> |
| internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage) |
| { |
| if (runningTask == null) |
| { |
| Exceptions.Throw(new IMRUSystemException("RunningTask is null."), Logger); |
| } |
| |
| if (_runningTasks.ContainsKey(runningTask.Id)) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] is already in running tasks.", runningTask.Id); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| UpdateState(runningTask.Id, TaskStateEvent.RunningTask); |
| runningTask.Dispose(Encoding.UTF8.GetBytes(closeMessage)); |
| UpdateState(runningTask.Id, TaskStateEvent.WaitingTaskToClose); |
| } |
| |
| /// <summary> |
| /// Gets error type (encoded as TaskStateEvent) based on the exception type in IFailedTask. |
| /// For unknown exceptions or exceptions that doesn't belong to defined IMRU task exceptions |
| /// treat then as application error. |
| /// </summary> |
| private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask) |
| { |
| var exception = failedTask.AsError(); |
| var innerExceptionType = exception.InnerException != null ? exception.InnerException.GetType().ToString() : "InnerException null"; |
| var innerExceptionMsg = exception.InnerException != null ? exception.InnerException.Message : "No InnerException"; |
| |
| |
| if (failedTask.GetActiveContext().IsPresent()) |
| { |
| Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}, evaluator id: {4}", |
| failedTask.Id, |
| exception.GetType(), |
| innerExceptionType, |
| innerExceptionMsg, |
| failedTask.GetActiveContext().Value.EvaluatorId); |
| } |
| else |
| { |
| Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}", |
| failedTask.Id, |
| exception.GetType(), |
| innerExceptionType, |
| innerExceptionMsg); |
| } |
| |
| if (exception is IMRUTaskAppException) |
| { |
| _numberOfAppErrors++; |
| return TaskStateEvent.FailedTaskAppError; |
| } |
| if (exception is IMRUTaskGroupCommunicationException) |
| { |
| return TaskStateEvent.FailedTaskCommunicationError; |
| } |
| if (exception is IMRUTaskSystemException) |
| { |
| return TaskStateEvent.FailedTaskSystemError; |
| } |
| |
| // special case for communication error during group communication initialization |
| if (exception is TaskClientCodeException) |
| { |
| // try extract cause and check whether it is InjectionException for GroupCommClient |
| if (exception.InnerException != null && |
| exception.InnerException is InjectionException && |
| exception.InnerException.Message.Contains("GroupCommClient")) |
| { |
| Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType:FailedTaskCommunicationError with task id {0}", failedTask.Id); |
| return TaskStateEvent.FailedTaskCommunicationError; |
| } |
| } |
| |
| Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType for un-hanlded exception with task id {0} and exception type {1}", failedTask.Id, exception.GetType()); |
| return TaskStateEvent.FailedTaskSystemError; |
| } |
| |
| /// <summary> |
| /// Returns the number of application error caused by FailedTask |
| /// </summary> |
| internal int NumberOfAppErrors() |
| { |
| return _numberOfAppErrors; |
| } |
| |
| /// <summary> |
| /// Checks if all the tasks are in final states |
| /// </summary> |
| internal bool AreAllTasksInFinalState() |
| { |
| var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList(); |
| var count = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Count(); |
| |
| if (notInFinalState.Any()) |
| { |
| Logger.Log(Level.Info, "Total tasks that are not in final state: {0}, and first 5 are:\r\n {1}", count, string.Join("\r\n", notInFinalState.Select(ToLog))); |
| } |
| else |
| { |
| Logger.Log(Level.Info, "All the tasks are in final state"); |
| } |
| |
| return !notInFinalState.Any(); |
| } |
| |
| private string ToLog(KeyValuePair<string, TaskInfo> t) |
| { |
| try |
| { |
| return string.Format("State={0}, taskId={1}, ContextId={2}, evaluatorId={3}, evaluatorHost={4}", |
| t.Value.TaskState.CurrentState, |
| t.Key, |
| t.Value.ActiveContext.Id, |
| t.Value.ActiveContext.EvaluatorId, |
| t.Value.ActiveContext.EvaluatorDescriptor.NodeDescriptor.HostName); |
| } |
| catch (Exception ex) |
| { |
| return string.Format("Failed to get task string: {0}", ex); |
| } |
| } |
| |
| /// <summary> |
| /// Gets current state of the task |
| /// </summary> |
| internal TaskState GetTaskState(string taskId) |
| { |
| var taskInfo = GetTaskInfo(taskId); |
| return taskInfo.TaskState.CurrentState; |
| } |
| |
| /// <summary> |
| /// Checks if all the tasks are in the state specified. |
| /// For example, passing TaskState.TaskRunning to check if all the tasks are in TaskRunning state |
| /// </summary> |
| internal bool AreAllTasksInState(TaskState taskState) |
| { |
| return _tasks.All(t => t.Value.TaskState.CurrentState == taskState); |
| } |
| |
| /// <summary> |
| /// Submit all the tasks |
| /// Tasks will be submitted after all the tasks are added in the collection and master task exists |
| /// IMRUSystemException will be thrown if not all the tasks are added or if there is no master task |
| /// </summary> |
| internal void SubmitTasks() |
| { |
| using (Logger.LogFunction("TaskManager::SubmitTasks")) |
| { |
| if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists()) |
| { |
| string msg = |
| string.Format( |
| "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", |
| NumberOfTasks, |
| _totalExpectedTasks); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| SubmitTask(_masterTaskId); |
| |
| foreach (var taskId in _tasks.Keys) |
| { |
| if (taskId.Equals(_masterTaskId)) |
| { |
| continue; |
| } |
| SubmitTask(taskId); |
| } |
| } |
| } |
| |
| private void SubmitTask(string taskId) |
| { |
| Logger.Log(Level.Info, "SubmitTask with task id: {0}.", taskId); |
| var taskInfo = GetTaskInfo(taskId); |
| taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration); |
| UpdateState(taskId, TaskStateEvent.SubmittedTask); |
| } |
| |
| /// <summary> |
| /// Checks if master task has been added |
| /// </summary> |
| private bool MasterTaskExists() |
| { |
| return _tasks.ContainsKey(_masterTaskId); |
| } |
| |
| /// <summary> |
| /// Gets task Tuple based on the given taskId. |
| /// Throws IMRUSystemException if the task Tuple is not in the task collection. |
| /// </summary> |
| private TaskInfo GetTaskInfo(string taskId) |
| { |
| TaskInfo taskInfo; |
| _tasks.TryGetValue(taskId, out taskInfo); |
| if (taskInfo == null) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] does not exist in the task collection.", taskId); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| return taskInfo; |
| } |
| } |
| } |