| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| using System; |
| using System.Collections.Concurrent; |
| using System.Collections.Generic; |
| using System.Globalization; |
| using System.Linq; |
| using System.Timers; |
| using Org.Apache.REEF.Common.Tasks; |
| using Org.Apache.REEF.Driver; |
| using Org.Apache.REEF.Driver.Context; |
| using Org.Apache.REEF.Driver.Evaluator; |
| using Org.Apache.REEF.Driver.Task; |
| using Org.Apache.REEF.IMRU.API; |
| using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; |
| using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; |
| using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; |
| using Org.Apache.REEF.IMRU.OnREEF.Parameters; |
| using Org.Apache.REEF.IMRU.OnREEF.ResultHandler; |
| using Org.Apache.REEF.IO.PartitionedData; |
| using Org.Apache.REEF.Network.Group.Config; |
| using Org.Apache.REEF.Network.Group.Driver; |
| using Org.Apache.REEF.Network.Group.Pipelining; |
| using Org.Apache.REEF.Network.Group.Pipelining.Impl; |
| using Org.Apache.REEF.Network.Group.Topology; |
| using Org.Apache.REEF.Network.Naming; |
| using Org.Apache.REEF.Tang.Annotations; |
| using Org.Apache.REEF.Tang.Exceptions; |
| using Org.Apache.REEF.Tang.Implementations.Configuration; |
| using Org.Apache.REEF.Tang.Implementations.Tang; |
| using Org.Apache.REEF.Tang.Interface; |
| using Org.Apache.REEF.Tang.Util; |
| using Org.Apache.REEF.Utilities; |
| using Org.Apache.REEF.Utilities.Diagnostics; |
| using Org.Apache.REEF.Utilities.Logging; |
| |
| namespace Org.Apache.REEF.IMRU.OnREEF.Driver |
| { |
| /// <summary> |
| /// Implements the IMRU driver on REEF with fault tolerant |
| /// </summary> |
| /// <typeparam name="TMapInput">Map Input</typeparam> |
| /// <typeparam name="TMapOutput">Map output</typeparam> |
| /// <typeparam name="TResult">Result</typeparam> |
| /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> |
| internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> : |
| IObserver<IDriverStarted>, |
| IObserver<IAllocatedEvaluator>, |
| IObserver<IActiveContext>, |
| IObserver<ICompletedTask>, |
| IObserver<IFailedEvaluator>, |
| IObserver<IFailedContext>, |
| IObserver<IFailedTask>, |
| IObserver<IRunningTask>, |
| IObserver<IEnumerable<IActiveContext>>, |
| IObserver<IJobCancelled> |
| { |
| private static readonly Logger Logger = |
| Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>)); |
| |
| internal const string DoneActionPrefix = "DoneAction:"; |
| internal const string FailActionPrefix = "FailAction:"; |
| internal const string CompletedTaskMessage = "Received ICompletedTask"; |
| internal const string RunningTaskMessage = "Received IRunningTask"; |
| internal const string FailedTaskMessage = "Received IFailedTask"; |
| internal const string FailedEvaluatorMessage = "Received IFailedEvaluator"; |
| |
| private readonly ConfigurationManager _configurationManager; |
| private readonly int _totalMappers; |
| private readonly IGroupCommDriver _groupCommDriver; |
| private readonly INameServer _nameServer; |
| private ConcurrentStack<IConfiguration> _perMapperConfigurationStack; |
| private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs; |
| private readonly bool _invokeGC; |
| private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider; |
| private IJobCancelled _cancelEvent; |
| |
| /// <summary> |
| /// The lock for the driver. |
| /// </summary> |
| private readonly object _lock = new object(); |
| |
| /// <summary> |
| /// Multiply this fact on average closing time to give room for tasks to be closed by itself. |
| /// </summary> |
| private const int TaskWaitingForCloseTimeFactor = 3; |
| |
| /// <summary> |
| /// Manages Tasks, maintains task states and responsible for task submission for the driver. |
| /// </summary> |
| private TaskManager _taskManager; |
| |
| /// <summary> |
| /// Manages Active Contexts for the driver. |
| /// </summary> |
| private readonly ActiveContextManager _contextManager; |
| |
| /// <summary> |
| /// Manages allocated and failed Evaluators for driver. |
| /// </summary> |
| private readonly EvaluatorManager _evaluatorManager; |
| |
| /// <summary> |
| /// Defines the max retry number for recoveries. It is configurable for the driver. |
| /// </summary> |
| private readonly int _maxRetryNumberForFaultTolerant; |
| |
| /// <summary> |
| /// System State of the driver. |
| /// <see href="https://issues.apache.org/jira/browse/REEF-1223"></see> |
| /// </summary> |
| private SystemStateMachine _systemState; |
| |
| /// <summary> |
| /// Shows if the driver is first try. Once the system enters recovery, it is set to false. |
| /// </summary> |
| private bool _isFirstTry = true; |
| |
| /// <summary> |
| /// It records the number of retry for the recoveries. |
| /// </summary> |
| private int _numberOfRetries; |
| |
| /// <summary> |
| /// Minimum timeout in milliseconds for TaskWaitingForClose |
| /// </summary> |
| private readonly int _minTaskWaitingForCloseTimeout; |
| |
| /// <summary> |
| /// Manages lifecycle events for driver, like JobCancelled event. |
| /// </summary> |
| private readonly List<IDisposable> _disposableResources = new List<IDisposable>(); |
| |
| /// <summary> |
| /// An internal timer that monitors the timeout for driver events |
| /// </summary> |
| private Timer _timeoutMonitorTimer; |
| |
| /// <summary> |
| /// Record evaluator ids that are closed after timeout. |
| /// The CompletedTask and failedEvaluator events from those tasks should be ignored to avoid double counted. |
| /// </summary> |
| private readonly IList<string> _evaluatorsForceClosed = new List<string>(); |
| |
| [Inject] |
| private IMRUDriver(IPartitionedInputDataSet dataSet, |
| [Parameter(typeof(PerMapConfigGeneratorSet))] ISet<IPerMapperConfigGenerator> perMapperConfigs, |
| ConfigurationManager configurationManager, |
| IEvaluatorRequestor evaluatorRequestor, |
| [Parameter(typeof(CoresPerMapper))] int coresPerMapper, |
| [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask, |
| [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper, |
| [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask, |
| [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction, |
| [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery, |
| [Parameter(typeof(MinTaskWaitingForCloseTimeout))] int minTaskWaitingForCloseTimeout, |
| [Parameter(typeof(TimeoutMonitoringInterval))] int timeoutMonitoringInterval, |
| [Parameter(typeof(InvokeGC))] bool invokeGC, |
| IGroupCommDriver groupCommDriver, |
| INameServer nameServer, |
| IJobLifecycleManager lifecycleManager) |
| { |
| _configurationManager = configurationManager; |
| _groupCommDriver = groupCommDriver; |
| _nameServer = nameServer; |
| _perMapperConfigs = perMapperConfigs; |
| _totalMappers = dataSet.Count; |
| _invokeGC = invokeGC; |
| _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery; |
| _minTaskWaitingForCloseTimeout = minTaskWaitingForCloseTimeout; |
| |
| _contextManager = new ActiveContextManager(_totalMappers + 1); |
| _contextManager.Subscribe(this); |
| |
| var updateSpec = new EvaluatorSpecification(memoryForUpdateTask, coresForUpdateTask); |
| var mapperSpec = new EvaluatorSpecification(memoryPerMapper, coresPerMapper); |
| var allowedFailedEvaluators = (int)(failedEvaluatorsFraction * _totalMappers); |
| _evaluatorManager = new EvaluatorManager(_totalMappers + 1, allowedFailedEvaluators, evaluatorRequestor, updateSpec, mapperSpec); |
| |
| _systemState = new SystemStateMachine(); |
| _serviceAndContextConfigurationProvider = |
| new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet, configurationManager); |
| |
| if (lifecycleManager != null) |
| { |
| var handle = lifecycleManager.Subscribe(this as IObserver<IJobCancelled>); |
| _disposableResources.Add(handle); |
| } |
| |
| _timeoutMonitorTimer = new Timer(); |
| _timeoutMonitorTimer.Elapsed += TimeoutMonitor; |
| _timeoutMonitorTimer.Interval = timeoutMonitoringInterval; |
| if (timeoutMonitoringInterval > 0) |
| { |
| _timeoutMonitorTimer.Enabled = true; |
| } |
| |
| var msg = |
| string.Format(CultureInfo.InvariantCulture, "map task memory: {0}, update task memory: {1}, map task cores: {2}, update task cores: {3}, maxRetry: {4}, allowedFailedEvaluators: {5}, minTaskWaitingForCloseTimeout: {6}, timeoutMonitoringInterval: {7}.", |
| memoryPerMapper, |
| memoryForUpdateTask, |
| coresPerMapper, |
| coresForUpdateTask, |
| _maxRetryNumberForFaultTolerant, |
| allowedFailedEvaluators, |
| minTaskWaitingForCloseTimeout, |
| timeoutMonitoringInterval); |
| Logger.Log(Level.Info, msg); |
| } |
| |
| #region IDriverStarted |
| /// <summary> |
| /// Requests evaluators when driver starts |
| /// </summary> |
| /// <param name="value">Event fired when driver started</param> |
| public void OnNext(IDriverStarted value) |
| { |
| //// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver. |
| _evaluatorManager.RequestUpdateEvaluator(); |
| } |
| #endregion IDriverStarted |
| |
| #region IAllocatedEvaluator |
| /// <summary> |
| /// IAllocatedEvaluator handler. It will take the following action based on the system state: |
| /// Case WaitingForEvaluator |
| /// Add Evaluator to the Evaluator Manager |
| /// submit Context and Services |
| /// Case Fail |
| /// Do nothing. This is because the code that sets system Fail has executed FailedAction. It has shut down all the allocated evaluators/contexts. |
| /// If a new IAllocatedEvaluator comes after it, we should not submit anything so that the evaluator is returned. |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="allocatedEvaluator">The allocated evaluator</param> |
| public void OnNext(IAllocatedEvaluator allocatedEvaluator) |
| { |
| Logger.Log(Level.Info, "AllocatedEvaluator memory [{0}], systemState {1}.", allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState); |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator")) |
| { |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.WaitingForEvaluator: |
| if (!_evaluatorManager.IsMasterEvaluatorAllocated()) |
| { |
| _evaluatorManager.AddMasterEvaluator(allocatedEvaluator); |
| _evaluatorManager.RequestMapEvaluators(_totalMappers); |
| } |
| else |
| { |
| _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator); |
| } |
| SubmitContextAndService(allocatedEvaluator); |
| break; |
| case SystemState.Fail: |
| Logger.Log(Level.Info, |
| "Receiving IAllocatedEvaluator event, but system is in FAIL state, ignore it."); |
| allocatedEvaluator.Dispose(); |
| break; |
| default: |
| UnexpectedState(allocatedEvaluator.Id, "IAllocatedEvaluator"); |
| break; |
| } |
| } |
| } |
| } |
| |
| /// <summary> |
| /// Gets context and service configuration for evaluator depending |
| /// on whether it is for update/master function or for mapper function. |
| /// Then submits Context and Service with the corresponding configuration |
| /// </summary> |
| /// <param name="allocatedEvaluator"></param> |
| private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator) |
| { |
| ContextAndServiceConfiguration configs; |
| if (_evaluatorManager.IsMasterEvaluatorId(allocatedEvaluator.Id)) |
| { |
| configs = |
| _serviceAndContextConfigurationProvider |
| .GetContextConfigurationForMasterEvaluatorById( |
| allocatedEvaluator.Id); |
| } |
| else |
| { |
| configs = _serviceAndContextConfigurationProvider |
| .GetDataLoadingConfigurationForEvaluatorById( |
| allocatedEvaluator.Id); |
| } |
| allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service); |
| } |
| #endregion IAllocatedEvaluator |
| |
| #region IActiveContext |
| /// <summary> |
| /// IActiveContext handler. It will take the following actions based on the system state: |
| /// Case WaitingForEvaluator: |
| /// Adds Active Context to Active Context Manager |
| /// Case Fail: |
| /// Closes the ActiveContext |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="activeContext"></param> |
| public void OnNext(IActiveContext activeContext) |
| { |
| Logger.Log(Level.Info, "Received Active Context {0}, systemState {1}.", activeContext.Id, _systemState.CurrentState); |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IActiveContext")) |
| { |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.WaitingForEvaluator: |
| _contextManager.Add(activeContext); |
| break; |
| case SystemState.Fail: |
| Logger.Log(Level.Info, |
| "Received IActiveContext event, but system is in FAIL state. Closing the context."); |
| activeContext.Dispose(); |
| break; |
| default: |
| UnexpectedState(activeContext.Id, "IActiveContext"); |
| break; |
| } |
| } |
| } |
| } |
| #endregion IActiveContext |
| |
| #region submit tasks |
| /// <summary> |
| /// Called from ActiveContextManager when all the expected active context are received. |
| /// It changes the system state then calls SubmitTasks(). |
| /// </summary> |
| /// <param name="value"></param> |
| public void OnNext(IEnumerable<IActiveContext> value) |
| { |
| Logger.Log(Level.Info, "Received event from ActiveContextManager with NumberOfActiveContexts:" + (value != null ? value.Count() : 0)); |
| lock (_lock) |
| { |
| // When the event AllContextsAreReady happens, change the system state from WaitingForEvaluator to SubmittingTasks |
| _systemState.MoveNext(SystemStateEvent.AllContextsAreReady); |
| SubmitTasks(value); |
| } |
| } |
| |
| /// <summary> |
| /// This method is responsible to prepare for the task submission then call SubmitTasks in TaskManager. |
| /// It is called in both first time and recovery scenarios. |
| /// Creates a new Communication Group and adds Group Communication Operators |
| /// For each context, adds a task to the communication group. |
| /// After all the tasks are added to the group, for each task, gets GroupCommTaskConfiguration from IGroupCommDriver |
| /// and merges it with the task configuration. |
| /// When all the tasks are added, calls TaskManager to SubmitTasks(). |
| /// </summary> |
| private void SubmitTasks(IEnumerable<IActiveContext> activeContexts) |
| { |
| Logger.Log(Level.Info, "SubmitTasks with system state : {0} at time: {1}.", _systemState.CurrentState, DateTime.Now); |
| using (Logger.LogFunction("IMRUDriver::SubmitTasksConfiguration")) |
| { |
| if (!_isFirstTry) |
| { |
| _groupCommDriver.RemoveCommunicationGroup(IMRUConstants.CommunicationGroupName); |
| } |
| |
| UpdateMaterTaskId(); |
| _taskManager = new TaskManager(_totalMappers + 1, _groupCommDriver.MasterTaskId); |
| var commGroup = AddCommunicationGroupWithOperators(); |
| _perMapperConfigurationStack = ConstructPerMapperConfigStack(_totalMappers); |
| |
| var taskIdAndContextMapping = new Dictionary<string, IActiveContext>(); |
| foreach (var activeContext in activeContexts) |
| { |
| var taskId = _evaluatorManager.IsMasterEvaluatorId(activeContext.EvaluatorId) |
| ? _groupCommDriver.MasterTaskId |
| : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId); |
| commGroup.AddTask(taskId); |
| taskIdAndContextMapping.Add(taskId, activeContext); |
| } |
| |
| foreach (var mapping in taskIdAndContextMapping) |
| { |
| var taskConfig = _evaluatorManager.IsMasterEvaluatorId(mapping.Value.EvaluatorId) |
| ? GetMasterTaskConfiguration(mapping.Key) |
| : GetMapperTaskConfiguration(mapping.Value, mapping.Key); |
| var groupCommTaskConfiguration = _groupCommDriver.GetGroupCommTaskConfiguration(mapping.Key); |
| var mergedTaskConf = Configurations.Merge(taskConfig, groupCommTaskConfiguration); |
| _taskManager.AddTask(mapping.Key, mergedTaskConf, mapping.Value); |
| } |
| } |
| _taskManager.SubmitTasks(); |
| } |
| |
| private void UpdateMaterTaskId() |
| { |
| if (_isFirstTry) |
| { |
| _groupCommDriver.MasterTaskId = _groupCommDriver.MasterTaskId + "-" + _numberOfRetries; |
| } |
| else |
| { |
| _groupCommDriver.MasterTaskId = |
| _groupCommDriver.MasterTaskId.Substring(0, _groupCommDriver.MasterTaskId.Length - 1) + |
| _numberOfRetries; |
| } |
| } |
| #endregion submit tasks |
| |
| #region IRunningTask |
| /// <summary> |
| /// IRunningTask handler. The method is called when a task is running. The following action will be taken based on the system state: |
| /// Case SubmittingTasks |
| /// Add it to RunningTasks and set task state to TaskRunning |
| /// When all the tasks are running, change system state to TasksRunning |
| /// Case ShuttingDown/Fail |
| /// Call TaskManager to record RunningTask during SystemFailure |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="runningTask"></param> |
| public void OnNext(IRunningTask runningTask) |
| { |
| Logger.Log(Level.Info, "{0} {1} from endpoint {2} at SystemState {3} retry # {4}.", RunningTaskMessage, runningTask.Id, GetEndPointFromTaskId(runningTask.Id), _systemState.CurrentState, _numberOfRetries); |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IRunningTask")) |
| { |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.SubmittingTasks: |
| _taskManager.RecordRunningTask(runningTask); |
| if (_taskManager.AreAllTasksRunning()) |
| { |
| _systemState.MoveNext(SystemStateEvent.AllTasksAreRunning); |
| Logger.Log(Level.Info, |
| "All tasks are running, SystemState {0}", |
| _systemState.CurrentState); |
| } |
| break; |
| case SystemState.ShuttingDown: |
| case SystemState.Fail: |
| _taskManager.RecordRunningTaskDuringSystemFailure(runningTask, TaskManager.CloseTaskByDriver); |
| break; |
| default: |
| UnexpectedState(runningTask.Id, "IRunningTask"); |
| break; |
| } |
| } |
| } |
| } |
| #endregion IRunningTask |
| |
| #region ICompletedTask |
| /// <summary> |
| /// ICompletedTask handler. It is called when a task is completed. The following action will be taken based on the System State: |
| /// Case TasksRunning |
| /// Check if it is master task, then set master task completed |
| /// Then record completed running and updates task state from TaskRunning to TaskCompleted |
| /// If all tasks are completed, sets system state to TasksCompleted and then go to Done action |
| /// Case TasksCompleted: |
| /// Record, log and then ignore the event |
| /// Case ShuttingDown |
| /// Record completed running and updates task state to TaskCompleted |
| /// Try to recover |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="completedTask">The link to the completed task</param> |
| public void OnNext(ICompletedTask completedTask) |
| { |
| Logger.Log(Level.Info, "{0} {1}, with systemState {2} in retry# {3}.", CompletedTaskMessage, completedTask.Id, _systemState.CurrentState, _numberOfRetries); |
| |
| lock (_lock) |
| { |
| if (_evaluatorsForceClosed.Contains(completedTask.ActiveContext.EvaluatorId)) |
| { |
| Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring ICompletedTask event.", completedTask.ActiveContext.EvaluatorId, completedTask.Id); |
| return; |
| } |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.TasksRunning: |
| _taskManager.RecordCompletedTask(completedTask); |
| if (_taskManager.IsJobDone()) |
| { |
| _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted); |
| Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState); |
| DoneAction(); |
| } |
| break; |
| |
| case SystemState.ShuttingDown: |
| // The task might be in running state or waiting for close, record the completed task |
| _taskManager.RecordCompletedTask(completedTask); |
| if (_taskManager.IsJobDone()) |
| { |
| _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted); |
| Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState); |
| DoneAction(); |
| } |
| else |
| { |
| TryRecovery(); |
| } |
| break; |
| |
| case SystemState.TasksCompleted: |
| _taskManager.RecordCompletedTask(completedTask); |
| break; |
| |
| default: |
| UnexpectedState(completedTask.Id, "ICompletedTask"); |
| break; |
| } |
| } |
| } |
| #endregion ICompletedTask |
| |
| #region IFailedEvaluator |
| /// <summary> |
| /// IFailedEvaluator handler. It specifies what to do when an evaluator fails. |
| /// Case WaitingForEvaluator |
| /// This happens in the middle of submitting contexts. We just need to remove the failed evaluator |
| /// from EvaluatorManager and remove associated active context, if any, from ActiveContextManager |
| /// then checks if the system is recoverable. If yes, request another Evaluator |
| /// If not recoverable, set system state to Fail then execute Fail action |
| /// Case SubmittingTasks/TasksRunning |
| /// This happens either in the middle of Task submitting or all the tasks are running |
| /// Changes the system state to ShuttingDown |
| /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager |
| /// Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure |
| /// Closes all the other running tasks |
| /// Try to recover in case it is the last failure received |
| /// Case TasksCompleted: |
| /// Record, log and then ignore the failure. |
| /// Case ShuttingDown |
| /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing. |
| /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager |
| /// Removes associated task from running task if it was running, changes the task state to ClosedTask if it was waiting for close |
| /// otherwise changes the task state to FailedTaskEvaluatorError |
| /// Try to recover in case it is the last failure received |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="failedEvaluator"></param> |
| public void OnNext(IFailedEvaluator failedEvaluator) |
| { |
| var endpoint = failedEvaluator.FailedTask.IsPresent() |
| ? GetEndPoint(failedEvaluator.FailedTask.Value) |
| : failedEvaluator.FailedContexts.Any() |
| ? GetEndPointFromContext(failedEvaluator.FailedContexts.First()) |
| : "unknown_endpoint"; |
| |
| Logger.Log(Level.Warning, "{0} {1} from endpoint {2} with systemState {3} in retry# {4} with Exception: {5}.", FailedEvaluatorMessage, failedEvaluator.Id, endpoint, _systemState.CurrentState, _numberOfRetries, failedEvaluator.EvaluatorException); |
| |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IFailedEvaluator")) |
| { |
| if (_evaluatorsForceClosed.Contains(failedEvaluator.Id)) |
| { |
| Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedEvaluator event.", failedEvaluator.Id, failedEvaluator.FailedTask.IsPresent() ? failedEvaluator.FailedTask.Value.Id : "NoTaskId"); |
| return; |
| } |
| |
| var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id); |
| _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id); |
| _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator); |
| |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.WaitingForEvaluator: |
| if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() && !isMaster) |
| { |
| _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( |
| failedEvaluator.Id); |
| Logger.Log(Level.Info, "Requesting mapper Evaluators."); |
| _evaluatorManager.RequestMapEvaluators(1); |
| } |
| else |
| { |
| var reason1 = _evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() |
| ? "it exceeded MaximumNumberOfEvaluatorFailures, " |
| : string.Empty; |
| var reason2 = isMaster ? "master evaluator failed, " : string.Empty; |
| Logger.Log(Level.Error, "The system is not recoverable because " + reason1 + reason2 + " changing the system state to Fail."); |
| _systemState.MoveNext(SystemStateEvent.NotRecoverable); |
| FailAction(); |
| } |
| break; |
| |
| case SystemState.SubmittingTasks: |
| case SystemState.TasksRunning: |
| // When the event FailedNode happens, change the system state to ShuttingDown |
| _systemState.MoveNext(SystemStateEvent.FailedNode); |
| _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator); |
| _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); |
| |
| // Push evaluator id back to PartitionIdProvider if it is not master |
| if (!isMaster) |
| { |
| _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( |
| failedEvaluator.Id); |
| } |
| |
| TryRecovery(); |
| break; |
| |
| case SystemState.TasksCompleted: |
| _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator); |
| Logger.Log(Level.Info, "The Job has been completed. So ignoring the Evaluator {0} failure.", failedEvaluator.Id); |
| break; |
| |
| case SystemState.ShuttingDown: |
| _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator); |
| |
| // Push evaluator id back to PartitionIdProvider if it is not master |
| if (!isMaster) |
| { |
| _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( |
| failedEvaluator.Id); |
| } |
| TryRecovery(); |
| break; |
| |
| case SystemState.Fail: |
| FailAction(); |
| break; |
| |
| default: |
| UnexpectedState(failedEvaluator.Id, "IFailedEvaluator"); |
| break; |
| } |
| } |
| } |
| } |
| #endregion IFailedEvaluator |
| |
| #region IFailedContext |
| /// <summary> |
| /// IFailedContext handler. It specifies what to do if Failed Context is received. |
| /// If we get all completed tasks then ignore the failure otherwise throw exception |
| /// Fault tolerant would be similar to FailedEvaluator. |
| /// </summary> |
| /// <param name="failedContext"></param> |
| public void OnNext(IFailedContext failedContext) |
| { |
| Logger.Log(Level.Warning, "Received IFailedContext with Id: {0} from endpoint {1} with systemState {2} in retry#: {3}.", failedContext.Id, GetEndPointFromContext(failedContext), _systemState.CurrentState, _numberOfRetries); |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IFailedContext")) |
| { |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.TasksCompleted: |
| Logger.Log(Level.Info, "The Job has been completed. So ignoring the Context {0} failure.", failedContext.Id); |
| break; |
| case SystemState.ShuttingDown: |
| case SystemState.Fail: |
| break; |
| default: |
| var msg = string.Format(CultureInfo.InvariantCulture, "Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId); |
| throw new NotImplementedException(msg); |
| } |
| } |
| } |
| } |
| #endregion IFailedContext |
| |
| #region IFailedTask |
| /// <summary> |
| /// Case SubmittingTasks/TasksRunning |
| /// This is the first failure received |
| /// Changes the system state to ShuttingDown |
| /// Record failed task in TaskManager |
| /// Closes all the other running tasks and set their state to TaskWaitingForClose |
| /// Try to recover |
| /// Case TasksCompleted: |
| /// Record, log and then ignore the failure. |
| /// Case ShuttingDown |
| /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing. |
| /// Record failed task in TaskManager. |
| /// Try to recover |
| /// Other cases - not expected |
| /// </summary> |
| /// <param name="failedTask"></param> |
| public void OnNext(IFailedTask failedTask) |
| { |
| Logger.Log(Level.Warning, "{0}: {1} and message: {2} from endpoint {3} with systemState {4} in retry#: {5}.", FailedTaskMessage, failedTask.Id, failedTask.Message, GetEndPointFromContext(failedTask.GetActiveContext()), _systemState.CurrentState, _numberOfRetries); |
| lock (_lock) |
| { |
| using (Logger.LogFunction("IMRUDriver::IFailedTask")) |
| { |
| if (_evaluatorsForceClosed.Contains(failedTask.GetActiveContext().Value.EvaluatorId)) |
| { |
| Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedTask event..", failedTask.GetActiveContext().Value.EvaluatorId, failedTask.Id); |
| return; |
| } |
| switch (_systemState.CurrentState) |
| { |
| case SystemState.SubmittingTasks: |
| case SystemState.TasksRunning: |
| // When the event FailedNode happens, change the system state to ShuttingDown |
| _systemState.MoveNext(SystemStateEvent.FailedNode); |
| _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask); |
| _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); |
| TryRecovery(); |
| break; |
| |
| case SystemState.TasksCompleted: |
| _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask); |
| Logger.Log(Level.Info, "The Job has been completed. So ignoring the Task {0} failure.", failedTask.Id); |
| break; |
| |
| case SystemState.ShuttingDown: |
| _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask); |
| TryRecovery(); |
| break; |
| |
| default: |
| UnexpectedState(failedTask.Id, "IFailedTask"); |
| break; |
| } |
| } |
| } |
| } |
| #endregion IFailedTask |
| |
| private void TimeoutMonitor(object source, ElapsedEventArgs e) |
| { |
| Logger.Log(Level.Info, "Entering TimeoutMonitor at {0}", DateTime.Now); |
| lock (_lock) |
| { |
| switch (_systemState.CurrentState) |
| { |
| // TODO: Handle time out if ActiveContexts are not received in timeout limit |
| case SystemState.WaitingForEvaluator: |
| break; |
| |
| // TODO: Handle time out if RunningTasks are not received in timeout limit |
| case SystemState.SubmittingTasks: |
| break; |
| |
| // TODO: Handle time out if CompletedTasks are not received in timeout limit |
| case SystemState.TasksRunning: |
| break; |
| |
| // Handle timeout for closing tasks |
| case SystemState.ShuttingDown: |
| Logger.Log(Level.Info, "_taskManager.AverageClosingTime {0}, _minTaskWaitingForCloseTimeout: {1}", _taskManager.AverageClosingTime(), _minTaskWaitingForCloseTimeout); |
| int taskClosingTimeout = Math.Max(_minTaskWaitingForCloseTimeout, _taskManager.AverageClosingTime() * TaskWaitingForCloseTimeFactor); |
| var waitingTasks = _taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, taskClosingTimeout); |
| |
| if (waitingTasks.Any()) |
| { |
| Logger.Log(Level.Info, "There are {0} tasks that timed out", waitingTasks.Count); |
| WaitingForCloseTaskNoResponseAction(waitingTasks); |
| } |
| break; |
| |
| case SystemState.TasksCompleted: |
| break; |
| |
| case SystemState.Fail: |
| break; |
| } |
| } |
| } |
| |
| /// <summary> |
| /// For tasks that are in WaitingForCloseState and has no response in specified timeout |
| /// kill the evaluator and set the other states as if we received the FailedEvaluator |
| /// Then try recovery |
| /// </summary> |
| /// <param name="tasks"></param> |
| private void WaitingForCloseTaskNoResponseAction(IList<KeyValuePair<string, TaskInfo>> tasks) |
| { |
| foreach (var t in tasks) |
| { |
| string evaluatorId = t.Value.ActiveContext.EvaluatorId; |
| if (!_evaluatorsForceClosed.Contains(evaluatorId)) |
| { |
| _evaluatorsForceClosed.Add(evaluatorId); |
| Logger.Log(Level.Info, |
| "WaitingForCloseTask [{0}] has no response after timeout. Kill the evaluator: [{1}] and dispose the context: [{2}].", |
| t.Key, |
| evaluatorId, |
| t.Value.ActiveContext.Id); |
| |
| t.Value.ActiveContext.Dispose(); |
| var isMaster = _evaluatorManager.IsMasterEvaluatorId(evaluatorId); |
| _evaluatorManager.RecordFailedEvaluator(evaluatorId); |
| _contextManager.Remove(t.Value.ActiveContext.Id); |
| _taskManager.RecordKillClosingTask(t.Key); |
| |
| // Push evaluator id back to PartitionIdProvider if it is not master |
| if (!isMaster) |
| { |
| _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(evaluatorId); |
| } |
| } |
| } |
| TryRecovery(); |
| } |
| |
| public void OnNext(IJobCancelled value) |
| { |
| lock (_lock) |
| { |
| _cancelEvent = value; |
| _systemState.MoveNext(SystemStateEvent.NotRecoverable); |
| FailAction(); |
| } |
| } |
| |
| public void OnError(Exception error) |
| { |
| } |
| |
| public void OnCompleted() |
| { |
| } |
| |
| private void UnexpectedState(string id, string eventName) |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, |
| "Received {0} for [{1}], but system status is {2}.", |
| eventName, |
| id, |
| _systemState.CurrentState); |
| Exceptions.Throw(new IMRUSystemException(msg), Logger); |
| } |
| |
| /// <summary> |
| /// If all the tasks are in final state, if the system is recoverable, start recovery |
| /// else, change the system state to Fail then take Fail action |
| /// </summary> |
| private void TryRecovery() |
| { |
| if (_taskManager.AreAllTasksInFinalState()) |
| { |
| if (IsRecoverable()) |
| { |
| _isFirstTry = false; |
| RecoveryAction(); |
| } |
| else |
| { |
| Logger.Log(Level.Warning, "The system is not recoverable, change the state to Fail."); |
| _systemState.MoveNext(SystemStateEvent.NotRecoverable); |
| FailAction(); |
| } |
| } |
| } |
| |
| private string GetMapperTaskIdByEvaluatorId(string evaluatorId) |
| { |
| return string.Format("{0}-{1}-{2}", |
| IMRUConstants.MapTaskPrefix, |
| _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId), |
| _numberOfRetries); |
| } |
| |
| /// <summary> |
| /// This method is called when all the tasks are successfully completed. |
| /// </summary> |
| private void DoneAction() |
| { |
| Logger.Log(Level.Info, "Shutting down Evaluators!!!"); |
| ShutDownAllEvaluators(); |
| Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries); |
| DisposeResources(); |
| } |
| |
| /// <summary> |
| /// This method is called when there are failures and the system is not recoverable. |
| /// </summary> |
| private void FailAction() |
| { |
| ShutDownAllEvaluators(); |
| |
| var failMessage = _cancelEvent != null |
| ? string.Format(CultureInfo.InvariantCulture, |
| "{0} Job cancelled at {1}. cancellation message: {2}", |
| FailActionPrefix, _cancelEvent.Timestamp.ToString("u"), _cancelEvent.Message) |
| : string.Format(CultureInfo.InvariantCulture, |
| "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.", |
| FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers(), _evaluatorManager.IsMasterEvaluatorFailed()); |
| |
| DisposeResources(); |
| Exceptions.Throw(new ApplicationException(failMessage), Logger); |
| } |
| |
| /// <summary> |
| /// Dispose resources |
| /// </summary> |
| private void DisposeResources() |
| { |
| lock (_disposableResources) |
| { |
| _disposableResources.ForEach(handle => |
| { |
| try |
| { |
| handle.Dispose(); |
| } |
| catch (Exception ex) |
| { |
| Logger.Log(Level.Error, "Failed to dispose a resource: {0}", ex); |
| } |
| }); |
| |
| _disposableResources.Clear(); |
| } |
| } |
| |
| /// <summary> |
| /// Shuts down evaluators |
| /// </summary> |
| private void ShutDownAllEvaluators() |
| { |
| foreach (var context in _contextManager.ActiveContexts) |
| { |
| Logger.Log(Level.Verbose, "Disposing active context: {0}", context.Id); |
| context.Dispose(); |
| } |
| } |
| |
| /// <summary> |
| /// This method is called for recovery. It resets Failed Evaluators and changes state to WaitingForEvaluator |
| /// If there is no failed mappers, meaning the recovery is caused by failed tasks, resubmit all the tasks. |
| /// Else, based on the number of failed evaluators, requests missing map evaluators |
| /// </summary> |
| private void RecoveryAction() |
| { |
| lock (_lock) |
| { |
| _numberOfRetries++; |
| var msg = string.Format(CultureInfo.InvariantCulture, |
| "Start recovery with _numberOfRetryForFaultTolerant {0}, NumberofFailedMappersToRequest {1}.", |
| _numberOfRetries, |
| _evaluatorManager.MappersToRequest()); |
| Logger.Log(Level.Info, msg); |
| |
| _systemState.MoveNext(SystemStateEvent.Recover); |
| |
| var mappersToRequest = _evaluatorManager.MappersToRequest(); |
| _evaluatorManager.ResetFailedEvaluators(); |
| |
| if (mappersToRequest == 0) |
| { |
| Logger.Log(Level.Info, "There is no failed Evaluator in this recovery but failed tasks."); |
| if (_contextManager.AreAllContextsReceived) |
| { |
| OnNext(_contextManager.ActiveContexts); |
| } |
| else |
| { |
| Exceptions.Throw(new IMRUSystemException("In recovery, there are no Failed evaluators but not all the contexts are received"), Logger); |
| } |
| } |
| else |
| { |
| Logger.Log(Level.Info, "Requesting {0} map Evaluators.", mappersToRequest); |
| _evaluatorManager.RequestMapEvaluators(mappersToRequest); |
| } |
| } |
| } |
| |
| /// <summary> |
| /// Checks if the system is recoverable. |
| /// </summary> |
| /// <returns></returns> |
| private bool IsRecoverable() |
| { |
| var msg = string.Format(CultureInfo.InvariantCulture, |
| "IsRecoverable: _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}, NumberOfAppErrors {2}, IsMasterEvaluatorFailed {3} AllowedNumberOfEvaluatorFailures {4}, _maxRetryNumberForFaultTolerant {5}.", |
| _numberOfRetries, |
| _evaluatorManager.NumberofFailedMappers(), |
| _taskManager.NumberOfAppErrors(), |
| _evaluatorManager.IsMasterEvaluatorFailed(), |
| _evaluatorManager.AllowedNumberOfEvaluatorFailures, |
| _maxRetryNumberForFaultTolerant); |
| Logger.Log(Level.Info, msg); |
| |
| return !_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() |
| && _taskManager.NumberOfAppErrors() == 0 |
| && !_evaluatorManager.IsMasterEvaluatorFailed() |
| && _numberOfRetries < _maxRetryNumberForFaultTolerant; |
| } |
| |
| /// <summary> |
| /// Generates map task configuration given the active context. |
| /// Merge configurations of all the inputs to the MapTaskHost. |
| /// </summary> |
| /// <param name="activeContext">Active context to which task needs to be submitted</param> |
| /// <param name="taskId">Task Id</param> |
| /// <returns>Map task configuration</returns> |
| private IConfiguration GetMapperTaskConfiguration(IActiveContext activeContext, string taskId) |
| { |
| IConfiguration mapSpecificConfig; |
| |
| if (!_perMapperConfigurationStack.TryPop(out mapSpecificConfig)) |
| { |
| Exceptions.Throw( |
| new IMRUSystemException(string.Format("No per map configuration exist for the active context {0}", |
| activeContext.Id)), |
| Logger); |
| } |
| |
| return TangFactory.GetTang() |
| .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule |
| .Set(TaskConfiguration.Identifier, taskId) |
| .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) |
| .Set(TaskConfiguration.OnClose, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) |
| .Build(), |
| _configurationManager.MapFunctionConfiguration, |
| mapSpecificConfig, |
| GetGroupCommConfiguration()) |
| .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString()) |
| .Build(); |
| } |
| |
| /// <summary> |
| /// Generates the update task configuration. |
| /// Merge configurations of all the inputs to the UpdateTaskHost. |
| /// </summary> |
| /// <returns>Update task configuration</returns> |
| private IConfiguration GetMasterTaskConfiguration(string taskId) |
| { |
| var partialTaskConf = |
| TangFactory.GetTang() |
| .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule |
| .Set(TaskConfiguration.Identifier, |
| taskId) |
| .Set(TaskConfiguration.Task, |
| GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) |
| .Set(TaskConfiguration.OnClose, |
| GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) |
| .Build(), |
| _configurationManager.UpdateFunctionConfiguration, |
| _configurationManager.ResultHandlerConfiguration, |
| GetGroupCommConfiguration()) |
| .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString()) |
| .Build(); |
| |
| // This piece of code basically checks if user has given any implementation |
| // of IIMRUResultHandler. If not then bind it to default implementation which |
| // does nothing. For interfaces with generic type we cannot assign default |
| // implementation. |
| try |
| { |
| TangFactory.GetTang() |
| .NewInjector(partialTaskConf) |
| .GetInstance<IIMRUResultHandler<TResult>>(); |
| } |
| catch (InjectionException) |
| { |
| partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf) |
| .BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class, |
| GenericType<DefaultResultHandler<TResult>>.Class) |
| .Build(); |
| Logger.Log(Level.Info, |
| "User has not given any way to handle IMRU result, defaulting to ignoring it"); |
| } |
| return partialTaskConf; |
| } |
| |
| /// <summary> |
| /// Creates the group communication configuration to be added to the tasks |
| /// </summary> |
| /// <returns>The group communication configuration</returns> |
| private IConfiguration GetGroupCommConfiguration() |
| { |
| var codecConfig = |
| TangFactory.GetTang() |
| .NewConfigurationBuilder( |
| StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set( |
| StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec, |
| GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(), |
| StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(), |
| _configurationManager.UpdateFunctionCodecsConfiguration) |
| .Build(); |
| |
| return Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig); |
| } |
| |
| /// <summary> |
| /// Adds broadcast and reduce operators to the default communication group |
| /// </summary> |
| private ICommunicationGroupDriver AddCommunicationGroupWithOperators() |
| { |
| var reduceFunctionConfig = _configurationManager.ReduceFunctionConfiguration; |
| var mapOutputPipelineDataConverterConfig = _configurationManager.MapOutputPipelineDataConverterConfiguration; |
| var mapInputPipelineDataConverterConfig = _configurationManager.MapInputPipelineDataConverterConfiguration; |
| |
| // TODO check the specific exception type |
| try |
| { |
| TangFactory.GetTang() |
| .NewInjector(mapInputPipelineDataConverterConfig) |
| .GetInstance<IPipelineDataConverter<TMapInput>>(); |
| |
| mapInputPipelineDataConverterConfig = |
| TangFactory.GetTang() |
| .NewConfigurationBuilder(mapInputPipelineDataConverterConfig) |
| .BindImplementation( |
| GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class, |
| GenericType<MapInputwithControlMessagePipelineDataConverter<TMapInput>>.Class) |
| .Build(); |
| } |
| catch (Exception) |
| { |
| mapInputPipelineDataConverterConfig = TangFactory.GetTang() |
| .NewConfigurationBuilder() |
| .BindImplementation( |
| GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class, |
| GenericType<DefaultPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class) |
| .Build(); |
| } |
| |
| try |
| { |
| TangFactory.GetTang() |
| .NewInjector(mapOutputPipelineDataConverterConfig) |
| .GetInstance<IPipelineDataConverter<TMapOutput>>(); |
| } |
| catch (Exception) |
| { |
| mapOutputPipelineDataConverterConfig = |
| TangFactory.GetTang() |
| .NewConfigurationBuilder() |
| .BindImplementation(GenericType<IPipelineDataConverter<TMapOutput>>.Class, |
| GenericType<DefaultPipelineDataConverter<TMapOutput>>.Class) |
| .Build(); |
| } |
| |
| var commGroup = |
| _groupCommDriver.NewCommunicationGroup(IMRUConstants.CommunicationGroupName, _totalMappers + 1) |
| .AddBroadcast<MapInputWithControlMessage<TMapInput>>( |
| IMRUConstants.BroadcastOperatorName, |
| _groupCommDriver.MasterTaskId, |
| TopologyTypes.Tree, |
| mapInputPipelineDataConverterConfig) |
| .AddReduce<TMapOutput>( |
| IMRUConstants.ReduceOperatorName, |
| _groupCommDriver.MasterTaskId, |
| TopologyTypes.Tree, |
| reduceFunctionConfig, |
| mapOutputPipelineDataConverterConfig) |
| .Build(); |
| |
| return commGroup; |
| } |
| |
| /// <summary> |
| /// Construct the stack of map configuration which is specific to each mapper. If user does not |
| /// specify any then its empty configuration |
| /// </summary> |
| /// <param name="totalMappers">Total mappers</param> |
| /// <returns>Stack of configuration</returns> |
| private ConcurrentStack<IConfiguration> ConstructPerMapperConfigStack(int totalMappers) |
| { |
| var perMapperConfiguration = new ConcurrentStack<IConfiguration>(); |
| for (int i = 0; i < totalMappers; i++) |
| { |
| var emptyConfig = TangFactory.GetTang().NewConfigurationBuilder().Build(); |
| IConfiguration config = _perMapperConfigs.Aggregate(emptyConfig, |
| (current, configGenerator) => |
| Configurations.Merge(current, configGenerator.GetMapperConfiguration(i, totalMappers))); |
| perMapperConfiguration.Push(config); |
| } |
| return perMapperConfiguration; |
| } |
| |
| /// <summary> |
| /// look up endpoint for given id |
| /// </summary> |
| /// <param name="taskId">Registered identifier in name server></param> |
| /// <returns></returns> |
| private string GetEndPointFromTaskId(string taskId) |
| { |
| List<string> t = new List<string>(); |
| t.Add(taskId); |
| var ips = _nameServer.Lookup(t); |
| if (ips.Count > 0) |
| { |
| var ip = ips.FirstOrDefault(); |
| if (ip != null) |
| { |
| return ip.Endpoint.ToString(); |
| } |
| } |
| return null; |
| } |
| |
| private string GetEndPoint(IFailedTask failedTask) |
| { |
| return GetEndPointFromTaskId(failedTask.Id) ?? GetEndPointFromContext(failedTask.GetActiveContext()); |
| } |
| |
| private string GetEndPointFromContext(IFailedContext context) |
| { |
| if (context == null || context.EvaluatorDescriptor == null || context.EvaluatorDescriptor.NodeDescriptor == null) |
| { |
| return null; |
| } |
| return context.EvaluatorDescriptor.NodeDescriptor.HostName; |
| } |
| |
| private string GetEndPointFromContext(Optional<IActiveContext> context) |
| { |
| if (!context.IsPresent() || context.Value == null || context.Value.EvaluatorDescriptor == null || context.Value.EvaluatorDescriptor.NodeDescriptor == null) |
| { |
| return null; |
| } |
| return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName; |
| } |
| |
| /// <summary> |
| /// Ensure the Timer is disposed when the driver object is deleted |
| /// </summary> |
| ~IMRUDriver() |
| { |
| if (_timeoutMonitorTimer != null) |
| { |
| _timeoutMonitorTimer.Stop(); |
| _timeoutMonitorTimer.Dispose(); |
| _timeoutMonitorTimer = null; |
| } |
| } |
| } |
| } |