blob: 308a079f5cdc7aa105897193ae366d9b649f396d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Timers;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.IMRU.API;
using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Pipelining;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IMRU.OnREEF.Driver
{
/// <summary>
/// Implements the IMRU driver on REEF with fault tolerance.
/// </summary>
/// <typeparam name="TMapInput">Map Input</typeparam>
/// <typeparam name="TMapOutput">Map output</typeparam>
/// <typeparam name="TResult">Result</typeparam>
/// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam>
internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> :
IObserver<IDriverStarted>,
IObserver<IAllocatedEvaluator>,
IObserver<IActiveContext>,
IObserver<ICompletedTask>,
IObserver<IFailedEvaluator>,
IObserver<IFailedContext>,
IObserver<IFailedTask>,
IObserver<IRunningTask>,
IObserver<IEnumerable<IActiveContext>>,
IObserver<IJobCancelled>
{
/// <summary>
/// Logger shared by all driver event handlers.
/// </summary>
private static readonly Logger Logger =
Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>));
/// <summary>
/// Prefix of the message logged by DoneAction when the job completes.
/// </summary>
internal const string DoneActionPrefix = "DoneAction:";
/// <summary>
/// Prefix of the failure message built by FailAction.
/// </summary>
internal const string FailActionPrefix = "FailAction:";
/// <summary>
/// Leading text of the log entry written when an ICompletedTask event is received.
/// </summary>
internal const string CompletedTaskMessage = "Received ICompletedTask";
/// <summary>
/// Leading text of the log entry written when an IRunningTask event is received.
/// </summary>
internal const string RunningTaskMessage = "Received IRunningTask";
/// <summary>
/// Leading text of the log entry written when an IFailedTask event is received.
/// </summary>
internal const string FailedTaskMessage = "Received IFailedTask";
/// <summary>
/// Leading text of the log entry written when an IFailedEvaluator event is received.
/// </summary>
internal const string FailedEvaluatorMessage = "Received IFailedEvaluator";
// Injected configuration manager; also handed to the service/context configuration provider.
private readonly ConfigurationManager _configurationManager;
// Number of mappers, taken from the partition count of the input data set.
private readonly int _totalMappers;
// Group communication driver; owns the master task id and the communication groups.
private readonly IGroupCommDriver _groupCommDriver;
private readonly INameServer _nameServer;
// Rebuilt on every call to SubmitTasks.
private ConcurrentStack<IConfiguration> _perMapperConfigurationStack;
private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
// Value of the InvokeGC named parameter; its consumer is outside this part of the file.
private readonly bool _invokeGC;
private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider;
// Set when a job-cancelled event arrives; FailAction uses it to build the failure message.
private IJobCancelled _cancelEvent;
/// <summary>
/// The lock for the driver.
/// </summary>
private readonly object _lock = new object();
/// <summary>
/// Factor multiplied by the average task closing time to give tasks room to close by themselves
/// before the close timeout is applied.
/// </summary>
private const int TaskWaitingForCloseTimeFactor = 3;
/// <summary>
/// Manages tasks, maintains task states and is responsible for task submission for the driver.
/// </summary>
private TaskManager _taskManager;
/// <summary>
/// Manages Active Contexts for the driver.
/// </summary>
private readonly ActiveContextManager _contextManager;
/// <summary>
/// Manages allocated and failed Evaluators for the driver.
/// </summary>
private readonly EvaluatorManager _evaluatorManager;
/// <summary>
/// Defines the maximum number of retries for recoveries. It is configurable for the driver.
/// </summary>
private readonly int _maxRetryNumberForFaultTolerant;
/// <summary>
/// System State of the driver.
/// <see href="https://issues.apache.org/jira/browse/REEF-1223"></see>
/// </summary>
private SystemStateMachine _systemState;
/// <summary>
/// Shows whether the driver is in its first try. Once the system enters recovery, it is set to false.
/// </summary>
private bool _isFirstTry = true;
/// <summary>
/// Records the number of retries performed for recoveries.
/// </summary>
private int _numberOfRetries;
/// <summary>
/// Minimum timeout in milliseconds for TaskWaitingForClose
/// </summary>
private readonly int _minTaskWaitingForCloseTimeout;
/// <summary>
/// Disposable handles owned by the driver, such as the JobCancelled subscription.
/// They are released by DisposeResources.
/// </summary>
private readonly List<IDisposable> _disposableResources = new List<IDisposable>();
/// <summary>
/// An internal timer that monitors the timeout for driver events
/// </summary>
private Timer _timeoutMonitorTimer;
/// <summary>
/// Records the ids of evaluators that were closed after a timeout.
/// The CompletedTask, FailedTask and FailedEvaluator events from those evaluators are ignored to avoid double counting.
/// </summary>
private readonly IList<string> _evaluatorsForceClosed = new List<string>();
/// <summary>
/// Creates the driver from injected parameters: stores the configuration, creates the
/// context and evaluator managers, subscribes to job cancellation when a lifecycle
/// manager is supplied, and sets up the timeout monitoring timer.
/// </summary>
/// <param name="dataSet">Partitioned input; its partition count is the number of mappers</param>
/// <param name="perMapperConfigs">Per-mapper configuration generators</param>
/// <param name="configurationManager">Configuration manager for the job</param>
/// <param name="evaluatorRequestor">Used by the evaluator manager to request evaluators</param>
/// <param name="coresPerMapper">Cores requested for each map evaluator</param>
/// <param name="coresForUpdateTask">Cores requested for the update evaluator</param>
/// <param name="memoryPerMapper">Memory requested for each map evaluator</param>
/// <param name="memoryForUpdateTask">Memory requested for the update evaluator</param>
/// <param name="failedEvaluatorsFraction">Fraction of mappers that are allowed to fail</param>
/// <param name="maxRetryNumberInRecovery">Maximum number of recovery retries</param>
/// <param name="minTaskWaitingForCloseTimeout">Minimum timeout, in milliseconds, for tasks waiting for close</param>
/// <param name="timeoutMonitoringInterval">Interval of the timeout monitor timer; a value of zero or less leaves the timer disabled</param>
/// <param name="invokeGC">Value of the InvokeGC parameter</param>
/// <param name="groupCommDriver">Group communication driver</param>
/// <param name="nameServer">Name server</param>
/// <param name="lifecycleManager">Optional source of job-cancelled events; may be null</param>
[Inject]
private IMRUDriver(IPartitionedInputDataSet dataSet,
    [Parameter(typeof(PerMapConfigGeneratorSet))] ISet<IPerMapperConfigGenerator> perMapperConfigs,
    ConfigurationManager configurationManager,
    IEvaluatorRequestor evaluatorRequestor,
    [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
    [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
    [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
    [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
    [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
    [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
    [Parameter(typeof(MinTaskWaitingForCloseTimeout))] int minTaskWaitingForCloseTimeout,
    [Parameter(typeof(TimeoutMonitoringInterval))] int timeoutMonitoringInterval,
    [Parameter(typeof(InvokeGC))] bool invokeGC,
    IGroupCommDriver groupCommDriver,
    INameServer nameServer,
    IJobLifecycleManager lifecycleManager)
{
    _configurationManager = configurationManager;
    _groupCommDriver = groupCommDriver;
    _nameServer = nameServer;
    _perMapperConfigs = perMapperConfigs;
    _totalMappers = dataSet.Count;
    _invokeGC = invokeGC;
    _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery;
    _minTaskWaitingForCloseTimeout = minTaskWaitingForCloseTimeout;

    // One context per mapper plus one for the update (master) evaluator.
    _contextManager = new ActiveContextManager(_totalMappers + 1);
    _contextManager.Subscribe(this);

    var updateSpec = new EvaluatorSpecification(memoryForUpdateTask, coresForUpdateTask);
    var mapperSpec = new EvaluatorSpecification(memoryPerMapper, coresPerMapper);
    var allowedFailedEvaluators = (int)(failedEvaluatorsFraction * _totalMappers);
    _evaluatorManager = new EvaluatorManager(_totalMappers + 1, allowedFailedEvaluators, evaluatorRequestor, updateSpec, mapperSpec);

    _systemState = new SystemStateMachine();
    _serviceAndContextConfigurationProvider =
        new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet, configurationManager);

    if (lifecycleManager != null)
    {
        var handle = lifecycleManager.Subscribe(this as IObserver<IJobCancelled>);
        _disposableResources.Add(handle);
    }

    _timeoutMonitorTimer = new Timer();
    _timeoutMonitorTimer.Elapsed += TimeoutMonitor;

    // Register the timer so that DisposeResources() stops and releases it once the job is done or failed.
    _disposableResources.Add(_timeoutMonitorTimer);

    // Timer.Interval throws ArgumentException for values <= 0, so only assign it when
    // monitoring is actually enabled; a non-positive interval leaves the timer off.
    if (timeoutMonitoringInterval > 0)
    {
        _timeoutMonitorTimer.Interval = timeoutMonitoringInterval;
        _timeoutMonitorTimer.Enabled = true;
    }

    var msg =
        string.Format(CultureInfo.InvariantCulture, "map task memory: {0}, update task memory: {1}, map task cores: {2}, update task cores: {3}, maxRetry: {4}, allowedFailedEvaluators: {5}, minTaskWaitingForCloseTimeout: {6}, timeoutMonitoringInterval: {7}.",
            memoryPerMapper,
            memoryForUpdateTask,
            coresPerMapper,
            coresForUpdateTask,
            _maxRetryNumberForFaultTolerant,
            allowedFailedEvaluators,
            minTaskWaitingForCloseTimeout,
            timeoutMonitoringInterval);
    Logger.Log(Level.Info, msg);
}
#region IDriverStarted
/// <summary>
/// Handles the driver-started event by asking the evaluator manager for the update evaluator.
/// Map evaluators are requested later, once the master evaluator has been allocated.
/// </summary>
/// <param name="value">Event fired when driver started</param>
public void OnNext(IDriverStarted value)
{
//// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver.
_evaluatorManager.RequestUpdateEvaluator();
}
#endregion IDriverStarted
#region IAllocatedEvaluator
/// <summary>
/// IAllocatedEvaluator handler. It will take the following action based on the system state:
/// Case WaitingForEvaluator
/// If no master evaluator has been allocated yet, register this one as master and request the map evaluators;
/// otherwise add it to the Evaluator Manager as an allocated evaluator
/// Submit Context and Services
/// Case Fail
/// Dispose the evaluator without submitting anything, so that it is returned.
/// The code that set the system to Fail has already executed the Fail action.
/// Other cases - not expected
/// </summary>
/// <param name="allocatedEvaluator">The allocated evaluator</param>
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
Logger.Log(Level.Info, "AllocatedEvaluator memory [{0}], systemState {1}.", allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState);
lock (_lock)
{
using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator"))
{
switch (_systemState.CurrentState)
{
case SystemState.WaitingForEvaluator:
// The first evaluator received becomes the master; only then are the mappers requested.
if (!_evaluatorManager.IsMasterEvaluatorAllocated())
{
_evaluatorManager.AddMasterEvaluator(allocatedEvaluator);
_evaluatorManager.RequestMapEvaluators(_totalMappers);
}
else
{
_evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator);
}
SubmitContextAndService(allocatedEvaluator);
break;
case SystemState.Fail:
Logger.Log(Level.Info,
"Receiving IAllocatedEvaluator event, but system is in FAIL state, ignore it.");
allocatedEvaluator.Dispose();
break;
default:
UnexpectedState(allocatedEvaluator.Id, "IAllocatedEvaluator");
break;
}
}
}
}
/// <summary>
/// Looks up the context and service configuration that matches the role of the
/// evaluator (master/update or mapper/data loading) and submits both to it.
/// </summary>
/// <param name="allocatedEvaluator">The evaluator that receives the context and service</param>
private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator)
{
    var evaluatorId = allocatedEvaluator.Id;

    // The master evaluator gets the update-side configuration, every other evaluator loads a data partition.
    ContextAndServiceConfiguration configs = _evaluatorManager.IsMasterEvaluatorId(evaluatorId)
        ? _serviceAndContextConfigurationProvider.GetContextConfigurationForMasterEvaluatorById(evaluatorId)
        : _serviceAndContextConfigurationProvider.GetDataLoadingConfigurationForEvaluatorById(evaluatorId);

    allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service);
}
#endregion IAllocatedEvaluator
#region IActiveContext
/// <summary>
/// IActiveContext handler. It will take the following actions based on the system state:
/// Case WaitingForEvaluator:
/// Adds Active Context to Active Context Manager
/// Case Fail:
/// Closes the ActiveContext
/// Other cases - not expected
/// </summary>
/// <param name="activeContext">The active context that was received</param>
public void OnNext(IActiveContext activeContext)
{
Logger.Log(Level.Info, "Received Active Context {0}, systemState {1}.", activeContext.Id, _systemState.CurrentState);
lock (_lock)
{
using (Logger.LogFunction("IMRUDriver::IActiveContext"))
{
switch (_systemState.CurrentState)
{
case SystemState.WaitingForEvaluator:
_contextManager.Add(activeContext);
break;
case SystemState.Fail:
Logger.Log(Level.Info,
"Received IActiveContext event, but system is in FAIL state. Closing the context.");
activeContext.Dispose();
break;
default:
UnexpectedState(activeContext.Id, "IActiveContext");
break;
}
}
}
}
#endregion IActiveContext
#region submit tasks
/// <summary>
/// Invoked with the collection of active contexts (the ActiveContextManager is subscribed
/// to this driver). Fires the AllContextsAreReady event on the system state machine and
/// then submits the tasks for those contexts.
/// </summary>
/// <param name="value">The active contexts to submit tasks on</param>
public void OnNext(IEnumerable<IActiveContext> value)
{
    var contextCount = value == null ? 0 : value.Count();
    Logger.Log(Level.Info, "Received event from ActiveContextManager with NumberOfActiveContexts:" + contextCount);

    lock (_lock)
    {
        // AllContextsAreReady moves the system from WaitingForEvaluator to SubmittingTasks.
        _systemState.MoveNext(SystemStateEvent.AllContextsAreReady);
        SubmitTasks(value);
    }
}
/// <summary>
/// This method is responsible for preparing the task submission and then calling SubmitTasks in TaskManager.
/// It is called in both first time and recovery scenarios.
/// In recovery, the previous communication group is removed first.
/// Creates a new Communication Group and adds Group Communication Operators.
/// For each context, adds a task to the communication group.
/// After all the tasks are added to the group, for each task, gets GroupCommTaskConfiguration from IGroupCommDriver
/// and merges it with the task configuration.
/// When all the tasks are added, calls TaskManager to SubmitTasks().
/// </summary>
/// <param name="activeContexts">The active contexts, one per task to be submitted</param>
private void SubmitTasks(IEnumerable<IActiveContext> activeContexts)
{
Logger.Log(Level.Info, "SubmitTasks with system state : {0} at time: {1}.", _systemState.CurrentState, DateTime.Now);
using (Logger.LogFunction("IMRUDriver::SubmitTasksConfiguration"))
{
// On a retry the group from the previous attempt still exists and must be removed before it is re-created.
if (!_isFirstTry)
{
_groupCommDriver.RemoveCommunicationGroup(IMRUConstants.CommunicationGroupName);
}
UpdateMaterTaskId();
// A fresh TaskManager is created for every attempt.
_taskManager = new TaskManager(_totalMappers + 1, _groupCommDriver.MasterTaskId);
var commGroup = AddCommunicationGroupWithOperators();
_perMapperConfigurationStack = ConstructPerMapperConfigStack(_totalMappers);
var taskIdAndContextMapping = new Dictionary<string, IActiveContext>();
// First pass: register every task id with the communication group.
foreach (var activeContext in activeContexts)
{
var taskId = _evaluatorManager.IsMasterEvaluatorId(activeContext.EvaluatorId)
? _groupCommDriver.MasterTaskId
: GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId);
commGroup.AddTask(taskId);
taskIdAndContextMapping.Add(taskId, activeContext);
}
// Second pass: the group communication configuration is requested only after all tasks were added to the group.
foreach (var mapping in taskIdAndContextMapping)
{
var taskConfig = _evaluatorManager.IsMasterEvaluatorId(mapping.Value.EvaluatorId)
? GetMasterTaskConfiguration(mapping.Key)
: GetMapperTaskConfiguration(mapping.Value, mapping.Key);
var groupCommTaskConfiguration = _groupCommDriver.GetGroupCommTaskConfiguration(mapping.Key);
var mergedTaskConf = Configurations.Merge(taskConfig, groupCommTaskConfiguration);
_taskManager.AddTask(mapping.Key, mergedTaskConf, mapping.Value);
}
}
_taskManager.SubmitTasks();
}
/// <summary>
/// Gives the master task id a "-{retry number}" suffix for the current attempt.
/// On the first try the suffix is appended; on retries the suffix of the previous
/// attempt is replaced.
/// </summary>
private void UpdateMaterTaskId()
{
    var masterTaskId = _groupCommDriver.MasterTaskId;

    if (!_isFirstTry)
    {
        // Strip the whole previous "-N" suffix. Cutting a fixed single character would
        // corrupt the id once the retry number has more than one digit.
        var separatorIndex = masterTaskId.LastIndexOf('-');
        if (separatorIndex >= 0)
        {
            masterTaskId = masterTaskId.Substring(0, separatorIndex);
        }
    }

    _groupCommDriver.MasterTaskId = masterTaskId + "-" + _numberOfRetries;
}
#endregion submit tasks
#region IRunningTask
/// <summary>
/// IRunningTask handler. The method is called when a task is running. The following action will be taken based on the system state:
/// Case SubmittingTasks
/// Record the running task in TaskManager
/// When all the tasks are running, fire AllTasksAreRunning on the system state machine
/// Case ShuttingDown/Fail
/// Call TaskManager to record RunningTask during SystemFailure, passing the close-by-driver message
/// Other cases - not expected
/// </summary>
/// <param name="runningTask">The task that started running</param>
public void OnNext(IRunningTask runningTask)
{
Logger.Log(Level.Info, "{0} {1} from endpoint {2} at SystemState {3} retry # {4}.", RunningTaskMessage, runningTask.Id, GetEndPointFromTaskId(runningTask.Id), _systemState.CurrentState, _numberOfRetries);
lock (_lock)
{
using (Logger.LogFunction("IMRUDriver::IRunningTask"))
{
switch (_systemState.CurrentState)
{
case SystemState.SubmittingTasks:
_taskManager.RecordRunningTask(runningTask);
if (_taskManager.AreAllTasksRunning())
{
_systemState.MoveNext(SystemStateEvent.AllTasksAreRunning);
Logger.Log(Level.Info,
"All tasks are running, SystemState {0}",
_systemState.CurrentState);
}
break;
case SystemState.ShuttingDown:
case SystemState.Fail:
// The task started after the system began failing.
_taskManager.RecordRunningTaskDuringSystemFailure(runningTask, TaskManager.CloseTaskByDriver);
break;
default:
UnexpectedState(runningTask.Id, "IRunningTask");
break;
}
}
}
}
#endregion IRunningTask
#region ICompletedTask
/// <summary>
/// ICompletedTask handler. Events from evaluators that were force closed after a timeout are ignored.
/// Otherwise the action depends on the system state:
/// Case TasksRunning
/// Record the completed task; if the job is done, fire AllTasksAreCompleted and run the Done action
/// Case ShuttingDown
/// Record the completed task; if the job is done, fire AllTasksAreCompleted and run the Done action,
/// otherwise try to recover
/// Case TasksCompleted
/// Record the completed task and take no further action
/// Other cases - not expected
/// </summary>
/// <param name="completedTask">The link to the completed task</param>
public void OnNext(ICompletedTask completedTask)
{
    Logger.Log(Level.Info, "{0} {1}, with systemState {2} in retry# {3}.", CompletedTaskMessage, completedTask.Id, _systemState.CurrentState, _numberOfRetries);
    lock (_lock)
    {
        var evaluatorId = completedTask.ActiveContext.EvaluatorId;
        if (_evaluatorsForceClosed.Contains(evaluatorId))
        {
            Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring ICompletedTask event.", evaluatorId, completedTask.Id);
            return;
        }

        var stateOnArrival = _systemState.CurrentState;
        switch (stateOnArrival)
        {
            case SystemState.TasksRunning:
            case SystemState.ShuttingDown:
                // While shutting down the task might be running or waiting for close; it is recorded either way.
                _taskManager.RecordCompletedTask(completedTask);
                if (_taskManager.IsJobDone())
                {
                    _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
                    Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState);
                    DoneAction();
                }
                else if (stateOnArrival == SystemState.ShuttingDown)
                {
                    // Only a shutting-down system attempts recovery on a completion that does not finish the job.
                    TryRecovery();
                }
                break;

            case SystemState.TasksCompleted:
                _taskManager.RecordCompletedTask(completedTask);
                break;

            default:
                UnexpectedState(completedTask.Id, "ICompletedTask");
                break;
        }
    }
}
#endregion ICompletedTask
#region IFailedEvaluator
/// <summary>
/// IFailedEvaluator handler. It specifies what to do when an evaluator fails.
/// Failures of evaluators that were force closed after a timeout are ignored.
/// Otherwise the failed evaluator is recorded in EvaluatorManager and its failed contexts are removed
/// from ActiveContextManager, then:
/// Case WaitingForEvaluator
/// This happens in the middle of submitting contexts.
/// If the maximum number of evaluator failures is not exceeded and the failed evaluator is not the master,
/// release its partition and request another map Evaluator
/// If not recoverable, set system state to Fail then execute Fail action
/// Case SubmittingTasks/TasksRunning
/// This happens either in the middle of Task submitting or all the tasks are running
/// Changes the system state to ShuttingDown
/// Records the task failure caused by the failed evaluator in TaskManager
/// Closes all the other running tasks
/// Try to recover in case it is the last failure received
/// Case TasksCompleted:
/// Record, log and then ignore the failure.
/// Case ShuttingDown
/// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
/// Records the task failure caused by the failed evaluator in TaskManager
/// Try to recover in case it is the last failure received
/// Case Fail
/// Execute Fail action again
/// Other cases - not expected
/// </summary>
/// <param name="failedEvaluator">The failed evaluator</param>
public void OnNext(IFailedEvaluator failedEvaluator)
{
// Resolve an endpoint for logging only: prefer the failed task, then the first failed context.
var endpoint = failedEvaluator.FailedTask.IsPresent()
? GetEndPoint(failedEvaluator.FailedTask.Value)
: failedEvaluator.FailedContexts.Any()
? GetEndPointFromContext(failedEvaluator.FailedContexts.First())
: "unknown_endpoint";
Logger.Log(Level.Warning, "{0} {1} from endpoint {2} with systemState {3} in retry# {4} with Exception: {5}.", FailedEvaluatorMessage, failedEvaluator.Id, endpoint, _systemState.CurrentState, _numberOfRetries, failedEvaluator.EvaluatorException);
lock (_lock)
{
using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
{
// This evaluator was killed by the driver after a close timeout and has already been accounted for.
if (_evaluatorsForceClosed.Contains(failedEvaluator.Id))
{
Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedEvaluator event.", failedEvaluator.Id, failedEvaluator.FailedTask.IsPresent() ? failedEvaluator.FailedTask.Value.Id : "NoTaskId");
return;
}
// Capture the master flag before the failure is recorded in the evaluator manager.
var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
_evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
_contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
switch (_systemState.CurrentState)
{
case SystemState.WaitingForEvaluator:
if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() && !isMaster)
{
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
failedEvaluator.Id);
Logger.Log(Level.Info, "Requesting mapper Evaluators.");
_evaluatorManager.RequestMapEvaluators(1);
}
else
{
var reason1 = _evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()
? "it exceeded MaximumNumberOfEvaluatorFailures, "
: string.Empty;
var reason2 = isMaster ? "master evaluator failed, " : string.Empty;
Logger.Log(Level.Error, "The system is not recoverable because " + reason1 + reason2 + " changing the system state to Fail.");
_systemState.MoveNext(SystemStateEvent.NotRecoverable);
FailAction();
}
break;
case SystemState.SubmittingTasks:
case SystemState.TasksRunning:
// When the event FailedNode happens, change the system state to ShuttingDown
_systemState.MoveNext(SystemStateEvent.FailedNode);
_taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
_taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
// Push evaluator id back to PartitionIdProvider if it is not master
if (!isMaster)
{
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
failedEvaluator.Id);
}
TryRecovery();
break;
case SystemState.TasksCompleted:
_taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
Logger.Log(Level.Info, "The Job has been completed. So ignoring the Evaluator {0} failure.", failedEvaluator.Id);
break;
case SystemState.ShuttingDown:
_taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
// Push evaluator id back to PartitionIdProvider if it is not master
if (!isMaster)
{
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
failedEvaluator.Id);
}
TryRecovery();
break;
case SystemState.Fail:
FailAction();
break;
default:
UnexpectedState(failedEvaluator.Id, "IFailedEvaluator");
break;
}
}
}
}
#endregion IFailedEvaluator
#region IFailedContext
/// <summary>
/// IFailedContext handler. It specifies what to do if Failed Context is received.
/// Case TasksCompleted: log and ignore the failure.
/// Case ShuttingDown/Fail: ignore the failure.
/// Other cases: throw NotImplementedException, as fault tolerance for failed contexts is not implemented.
/// It would be similar to FailedEvaluator.
/// </summary>
/// <param name="failedContext">The failed context</param>
public void OnNext(IFailedContext failedContext)
{
Logger.Log(Level.Warning, "Received IFailedContext with Id: {0} from endpoint {1} with systemState {2} in retry#: {3}.", failedContext.Id, GetEndPointFromContext(failedContext), _systemState.CurrentState, _numberOfRetries);
lock (_lock)
{
using (Logger.LogFunction("IMRUDriver::IFailedContext"))
{
switch (_systemState.CurrentState)
{
case SystemState.TasksCompleted:
Logger.Log(Level.Info, "The Job has been completed. So ignoring the Context {0} failure.", failedContext.Id);
break;
case SystemState.ShuttingDown:
case SystemState.Fail:
// Nothing to do for a context failure in these states.
break;
default:
var msg = string.Format(CultureInfo.InvariantCulture, "Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
throw new NotImplementedException(msg);
}
}
}
}
#endregion IFailedContext
#region IFailedTask
/// <summary>
/// IFailedTask handler. Failures from evaluators that were force closed after a timeout are ignored.
/// Otherwise the action depends on the system state:
/// Case SubmittingTasks/TasksRunning
/// This is the first failure received
/// Changes the system state to ShuttingDown
/// Record failed task in TaskManager
/// Closes all the other running tasks
/// Try to recover
/// Case TasksCompleted:
/// Record, log and then ignore the failure.
/// Case ShuttingDown
/// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
/// Record failed task in TaskManager.
/// Try to recover
/// Other cases - not expected
/// </summary>
/// <param name="failedTask">The failed task</param>
public void OnNext(IFailedTask failedTask)
{
    Logger.Log(Level.Warning, "{0}: {1} and message: {2} from endpoint {3} with systemState {4} in retry#: {5}.", FailedTaskMessage, failedTask.Id, failedTask.Message, GetEndPointFromContext(failedTask.GetActiveContext()), _systemState.CurrentState, _numberOfRetries);
    lock (_lock)
    {
        using (Logger.LogFunction("IMRUDriver::IFailedTask"))
        {
            // The active context is optional. Check that it is present before dereferencing it,
            // as the IFailedEvaluator handler does for its optional failed task; without a
            // context the task cannot belong to a force-closed evaluator we know about.
            var activeContext = failedTask.GetActiveContext();
            if (activeContext.IsPresent() && _evaluatorsForceClosed.Contains(activeContext.Value.EvaluatorId))
            {
                Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedTask event..", activeContext.Value.EvaluatorId, failedTask.Id);
                return;
            }

            switch (_systemState.CurrentState)
            {
                case SystemState.SubmittingTasks:
                case SystemState.TasksRunning:
                    // When the event FailedNode happens, change the system state to ShuttingDown
                    _systemState.MoveNext(SystemStateEvent.FailedNode);
                    _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
                    _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
                    TryRecovery();
                    break;

                case SystemState.TasksCompleted:
                    _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
                    Logger.Log(Level.Info, "The Job has been completed. So ignoring the Task {0} failure.", failedTask.Id);
                    break;

                case SystemState.ShuttingDown:
                    _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask);
                    TryRecovery();
                    break;

                default:
                    UnexpectedState(failedTask.Id, "IFailedTask");
                    break;
            }
        }
    }
}
#endregion IFailedTask
/// <summary>
/// Elapsed handler of the timeout monitoring timer. Only the ShuttingDown state is acted on:
/// tasks that stayed in TaskWaitingForClose longer than the closing timeout are handed to
/// WaitingForCloseTaskNoResponseAction. The timeout is the larger of the configured minimum
/// and the average closing time multiplied by TaskWaitingForCloseTimeFactor.
/// </summary>
/// <param name="source">Sender of the event (not used)</param>
/// <param name="e">Elapsed event arguments (not used)</param>
private void TimeoutMonitor(object source, ElapsedEventArgs e)
{
Logger.Log(Level.Info, "Entering TimeoutMonitor at {0}", DateTime.Now);
lock (_lock)
{
switch (_systemState.CurrentState)
{
// TODO: Handle time out if ActiveContexts are not received in timeout limit
case SystemState.WaitingForEvaluator:
break;
// TODO: Handle time out if RunningTasks are not received in timeout limit
case SystemState.SubmittingTasks:
break;
// TODO: Handle time out if CompletedTasks are not received in timeout limit
case SystemState.TasksRunning:
break;
// Handle timeout for closing tasks
case SystemState.ShuttingDown:
Logger.Log(Level.Info, "_taskManager.AverageClosingTime {0}, _minTaskWaitingForCloseTimeout: {1}", _taskManager.AverageClosingTime(), _minTaskWaitingForCloseTimeout);
int taskClosingTimeout = Math.Max(_minTaskWaitingForCloseTimeout, _taskManager.AverageClosingTime() * TaskWaitingForCloseTimeFactor);
var waitingTasks = _taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, taskClosingTimeout);
if (waitingTasks.Any())
{
Logger.Log(Level.Info, "There are {0} tasks that timed out", waitingTasks.Count);
WaitingForCloseTaskNoResponseAction(waitingTasks);
}
break;
case SystemState.TasksCompleted:
break;
case SystemState.Fail:
break;
}
}
}
/// <summary>
/// For tasks that are in the WaitingForClose state and have not responded within the specified timeout:
/// kill the evaluator by disposing its active context and update the other states as if
/// a FailedEvaluator event had been received. Then try recovery.
/// Each evaluator is handled at most once; its id is remembered in _evaluatorsForceClosed.
/// </summary>
/// <param name="tasks">Task id and task info pairs of the tasks that timed out</param>
private void WaitingForCloseTaskNoResponseAction(IList<KeyValuePair<string, TaskInfo>> tasks)
{
foreach (var t in tasks)
{
string evaluatorId = t.Value.ActiveContext.EvaluatorId;
if (!_evaluatorsForceClosed.Contains(evaluatorId))
{
// Remember the evaluator first so that later events from it are ignored by the handlers.
_evaluatorsForceClosed.Add(evaluatorId);
Logger.Log(Level.Info,
"WaitingForCloseTask [{0}] has no response after timeout. Kill the evaluator: [{1}] and dispose the context: [{2}].",
t.Key,
evaluatorId,
t.Value.ActiveContext.Id);
t.Value.ActiveContext.Dispose();
// Capture the master flag before the failure is recorded in the evaluator manager.
var isMaster = _evaluatorManager.IsMasterEvaluatorId(evaluatorId);
_evaluatorManager.RecordFailedEvaluator(evaluatorId);
_contextManager.Remove(t.Value.ActiveContext.Id);
_taskManager.RecordKillClosingTask(t.Key);
// Push evaluator id back to PartitionIdProvider if it is not master
if (!isMaster)
{
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(evaluatorId);
}
}
}
TryRecovery();
}
/// <summary>
/// IJobCancelled handler. Stores the cancel event so that FailAction can report it,
/// fires NotRecoverable on the system state machine and executes the Fail action.
/// </summary>
/// <param name="value">The job cancelled event</param>
public void OnNext(IJobCancelled value)
{
lock (_lock)
{
_cancelEvent = value;
_systemState.MoveNext(SystemStateEvent.NotRecoverable);
FailAction();
}
}
/// <summary>
/// Observer error callback. Intentionally does nothing.
/// </summary>
/// <param name="error">The reported error (ignored)</param>
public void OnError(Exception error)
{
}
/// <summary>
/// Observer completion callback. Intentionally does nothing.
/// </summary>
public void OnCompleted()
{
}
/// <summary>
/// Reports an event that arrived in a system state where it is not expected by
/// throwing an IMRUSystemException through the Exceptions helper.
/// </summary>
/// <param name="id">Id of the object the event refers to</param>
/// <param name="eventName">Name of the event that was received</param>
private void UnexpectedState(string id, string eventName)
{
    var currentState = _systemState.CurrentState;
    var unexpected = new IMRUSystemException(
        string.Format(
            CultureInfo.InvariantCulture,
            "Received {0} for [{1}], but system status is {2}.",
            eventName,
            id,
            currentState));
    Exceptions.Throw(unexpected, Logger);
}
/// <summary>
/// Does nothing until all the tasks are in a final state. Once they are, starts
/// recovery if the system is recoverable; otherwise changes the system state to
/// Fail and takes the Fail action.
/// </summary>
private void TryRecovery()
{
    if (!_taskManager.AreAllTasksInFinalState())
    {
        // Some tasks are still pending; a later event will trigger this check again.
        return;
    }

    if (!IsRecoverable())
    {
        Logger.Log(Level.Warning, "The system is not recoverable, change the state to Fail.");
        _systemState.MoveNext(SystemStateEvent.NotRecoverable);
        FailAction();
        return;
    }

    _isFirstTry = false;
    RecoveryAction();
}
/// <summary>
/// Builds the mapper task id for an evaluator in the form
/// "{map task prefix}-{partition id}-{retry number}".
/// </summary>
/// <param name="evaluatorId">Id of the evaluator the mapper task runs on</param>
/// <returns>The mapper task id for the current retry</returns>
private string GetMapperTaskIdByEvaluatorId(string evaluatorId)
{
    // Task ids are machine-readable identifiers, so format them culture-independently,
    // consistent with the other string.Format calls in this class.
    return string.Format(CultureInfo.InvariantCulture,
        "{0}-{1}-{2}",
        IMRUConstants.MapTaskPrefix,
        _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId),
        _numberOfRetries);
}
/// <summary>
/// This method is called when the job is done. It disposes all active contexts,
/// logs the done message with the retry number and releases the driver's disposable resources.
/// </summary>
private void DoneAction()
{
Logger.Log(Level.Info, "Shutting down Evaluators!!!");
ShutDownAllEvaluators();
Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
DisposeResources();
}
/// <summary>
/// This method is called when the job was cancelled or when there are failures and the
/// system is not recoverable. It disposes all active contexts, releases the driver's
/// disposable resources and throws an ApplicationException describing the reason.
/// </summary>
private void FailAction()
{
    ShutDownAllEvaluators();

    string failMessage;
    if (_cancelEvent == null)
    {
        failMessage = string.Format(CultureInfo.InvariantCulture,
            "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.",
            FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers(), _evaluatorManager.IsMasterEvaluatorFailed());
    }
    else
    {
        // A cancellation was requested; report it instead of the failure statistics.
        failMessage = string.Format(CultureInfo.InvariantCulture,
            "{0} Job cancelled at {1}. cancellation message: {2}",
            FailActionPrefix, _cancelEvent.Timestamp.ToString("u"), _cancelEvent.Message);
    }

    DisposeResources();
    Exceptions.Throw(new ApplicationException(failMessage), Logger);
}
/// <summary>
/// Disposes every registered disposable resource and then empties the list.
/// A failure to dispose one resource is logged and does not prevent the
/// remaining resources from being disposed.
/// </summary>
private void DisposeResources()
{
    lock (_disposableResources)
    {
        foreach (var resource in _disposableResources)
        {
            try
            {
                resource.Dispose();
            }
            catch (Exception ex)
            {
                Logger.Log(Level.Error, "Failed to dispose a resource: {0}", ex);
            }
        }

        _disposableResources.Clear();
    }
}
/// <summary>
/// Shuts down evaluators by disposing every active context held by the context manager.
/// Each disposal is logged at Verbose level with the context id.
/// </summary>
private void ShutDownAllEvaluators()
{
    var contextsToClose = _contextManager.ActiveContexts;
    foreach (var activeContext in contextsToClose)
    {
        Logger.Log(Level.Verbose, "Disposing active context: {0}", activeContext.Id);
        activeContext.Dispose();
    }
}
/// <summary>
/// Performs one recovery attempt while holding the driver lock.
/// Increments the retry counter, moves the system state with the Recover event and
/// resets the failed evaluators. If map evaluators are missing, that many are requested.
/// If none are missing (the recovery was caused by failed tasks only), the active contexts
/// are passed to OnNext again, provided all contexts have been received; otherwise an
/// IMRUSystemException is thrown through <c>Exceptions.Throw</c>.
/// </summary>
private void RecoveryAction()
{
    lock (_lock)
    {
        _numberOfRetries++;

        Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture,
            "Start recovery with _numberOfRetryForFaultTolerant {0}, NumberofFailedMappersToRequest {1}.",
            _numberOfRetries,
            _evaluatorManager.MappersToRequest()));

        _systemState.MoveNext(SystemStateEvent.Recover);

        // The number of missing mappers must be captured before the failed evaluators are reset.
        var missingMappers = _evaluatorManager.MappersToRequest();
        _evaluatorManager.ResetFailedEvaluators();

        if (missingMappers != 0)
        {
            Logger.Log(Level.Info, "Requesting {0} map Evaluators.", missingMappers);
            _evaluatorManager.RequestMapEvaluators(missingMappers);
        }
        else
        {
            Logger.Log(Level.Info, "There is no failed Evaluator in this recovery but failed tasks.");
            if (!_contextManager.AreAllContextsReceived)
            {
                Exceptions.Throw(new IMRUSystemException("In recovery, there are no Failed evaluators but not all the contexts are received"), Logger);
            }
            else
            {
                // All contexts are still available, so hand them to OnNext again.
                OnNext(_contextManager.ActiveContexts);
            }
        }
    }
}
/// <summary>
/// Checks if the system is recoverable, after logging the values the decision is based on.
/// </summary>
/// <returns>
/// True only when the maximum number of evaluator failures has not been exceeded, there are
/// no application errors, the master evaluator has not failed, and the number of retries is
/// still below the maximum retry number; false otherwise.
/// </returns>
private bool IsRecoverable()
{
    var statusLine = string.Format(CultureInfo.InvariantCulture,
        "IsRecoverable: _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}, NumberOfAppErrors {2}, IsMasterEvaluatorFailed {3} AllowedNumberOfEvaluatorFailures {4}, _maxRetryNumberForFaultTolerant {5}.",
        _numberOfRetries,
        _evaluatorManager.NumberofFailedMappers(),
        _taskManager.NumberOfAppErrors(),
        _evaluatorManager.IsMasterEvaluatorFailed(),
        _evaluatorManager.AllowedNumberOfEvaluatorFailures,
        _maxRetryNumberForFaultTolerant);
    Logger.Log(Level.Info, statusLine);

    // The checks run in a fixed order and stop at the first one that rules out recovery.
    if (_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures())
    {
        return false;
    }

    if (_taskManager.NumberOfAppErrors() != 0)
    {
        return false;
    }

    if (_evaluatorManager.IsMasterEvaluatorFailed())
    {
        return false;
    }

    return _numberOfRetries < _maxRetryNumberForFaultTolerant;
}
/// <summary>
/// Builds the map task configuration for the given active context.
/// Pops one per-mapper configuration from the stack and merges it with the task
/// configuration for MapTaskHost, the map function configuration and the group
/// communication configuration, then binds the InvokeGC named parameter.
/// </summary>
/// <param name="activeContext">Active context to which the task will be submitted; its id is used in the error message</param>
/// <param name="taskId">Task Id</param>
/// <returns>Map task configuration</returns>
private IConfiguration GetMapperTaskConfiguration(IActiveContext activeContext, string taskId)
{
    IConfiguration perMapperConfig;
    var popped = _perMapperConfigurationStack.TryPop(out perMapperConfig);
    if (!popped)
    {
        // The stack is empty, so there is no per-mapper configuration left for this context.
        var error = string.Format("No per map configuration exist for the active context {0}",
            activeContext.Id);
        Exceptions.Throw(new IMRUSystemException(error), Logger);
    }

    // MapTaskHost serves both as the task and as its close handler.
    var mapTaskConfig = TaskConfiguration.ConfigurationModule
        .Set(TaskConfiguration.Identifier, taskId)
        .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
        .Set(TaskConfiguration.OnClose, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
        .Build();

    var mergedBuilder = TangFactory.GetTang().NewConfigurationBuilder(
        mapTaskConfig,
        _configurationManager.MapFunctionConfiguration,
        perMapperConfig,
        GetGroupCommConfiguration());

    return mergedBuilder
        .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
        .Build();
}
/// <summary>
/// Builds the update (master) task configuration.
/// Merges the task configuration for UpdateTaskHost with the update function, result handler
/// and group communication configurations and binds the InvokeGC named parameter.
/// If no IIMRUResultHandler implementation can be injected from that configuration,
/// DefaultResultHandler is bound instead.
/// </summary>
/// <param name="taskId">Task Id</param>
/// <returns>Update task configuration</returns>
private IConfiguration GetMasterTaskConfiguration(string taskId)
{
    // UpdateTaskHost serves both as the task and as its close handler.
    var updateTaskConfig = TaskConfiguration.ConfigurationModule
        .Set(TaskConfiguration.Identifier, taskId)
        .Set(TaskConfiguration.Task, GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
        .Set(TaskConfiguration.OnClose, GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class)
        .Build();

    var masterTaskConf = TangFactory.GetTang()
        .NewConfigurationBuilder(
            updateTaskConfig,
            _configurationManager.UpdateFunctionConfiguration,
            _configurationManager.ResultHandlerConfiguration,
            GetGroupCommConfiguration())
        .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
        .Build();

    // Probe whether the user supplied an IIMRUResultHandler implementation.
    // An interface with a generic type cannot be given a default implementation,
    // so the default handler is bound here when the probe injection fails.
    try
    {
        var probeInjector = TangFactory.GetTang().NewInjector(masterTaskConf);
        probeInjector.GetInstance<IIMRUResultHandler<TResult>>();
    }
    catch (InjectionException)
    {
        var fallbackBuilder = TangFactory.GetTang().NewConfigurationBuilder(masterTaskConf);
        masterTaskConf = fallbackBuilder
            .BindImplementation(
                GenericType<IIMRUResultHandler<TResult>>.Class,
                GenericType<DefaultResultHandler<TResult>>.Class)
            .Build();
        Logger.Log(Level.Info,
            "User has not given any way to handle IMRU result, defaulting to ignoring it");
    }

    return masterTaskConf;
}
/// <summary>
/// Creates the group communication configuration to be added to the tasks.
/// It merges the group communication driver's service configuration with the codec
/// configurations for the map input (wrapped with a control message), the map output
/// and the update function codecs.
/// </summary>
/// <returns>The group communication configuration</returns>
private IConfiguration GetGroupCommConfiguration()
{
    var mapInputCodecConfig = StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf
        .Set(
            StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
            GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class)
        .Build();
    var mapOutputCodecConfig = StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build();

    var allCodecsConfig = TangFactory.GetTang()
        .NewConfigurationBuilder(
            mapInputCodecConfig,
            mapOutputCodecConfig,
            _configurationManager.UpdateFunctionCodecsConfiguration)
        .Build();

    var serviceConfig = _groupCommDriver.GetServiceConfiguration();
    return Configurations.Merge(serviceConfig, allCodecsConfig);
}
/// <summary>
/// Creates the IMRU communication group and adds the broadcast and reduce operators to it,
/// both rooted at the master task and using a tree topology.
/// Before that, the pipeline data converter configurations are prepared:
/// if a converter for the map input can be injected, the control-message wrapper converter
/// is bound on top of the user configuration, otherwise the default converter is used;
/// if no converter for the map output can be injected, the default converter is used.
/// </summary>
/// <returns>The built communication group driver</returns>
private ICommunicationGroupDriver AddCommunicationGroupWithOperators()
{
    var reduceConfig = _configurationManager.ReduceFunctionConfiguration;
    var outputConverterConfig = _configurationManager.MapOutputPipelineDataConverterConfiguration;
    var inputConverterConfig = _configurationManager.MapInputPipelineDataConverterConfiguration;

    // TODO check the specific exception type
    try
    {
        // Probe: succeeds only when a map input converter can be injected from the user configuration.
        var inputProbe = TangFactory.GetTang().NewInjector(inputConverterConfig);
        inputProbe.GetInstance<IPipelineDataConverter<TMapInput>>();

        var wrapperBuilder = TangFactory.GetTang().NewConfigurationBuilder(inputConverterConfig);
        inputConverterConfig = wrapperBuilder
            .BindImplementation(
                GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
                GenericType<MapInputwithControlMessagePipelineDataConverter<TMapInput>>.Class)
            .Build();
    }
    catch (Exception)
    {
        // Any failure above falls back to the default converter for the wrapped map input.
        var defaultInputBuilder = TangFactory.GetTang().NewConfigurationBuilder();
        inputConverterConfig = defaultInputBuilder
            .BindImplementation(
                GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
                GenericType<DefaultPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class)
            .Build();
    }

    try
    {
        // Probe: the user configuration is kept as is when a map output converter can be injected.
        var outputProbe = TangFactory.GetTang().NewInjector(outputConverterConfig);
        outputProbe.GetInstance<IPipelineDataConverter<TMapOutput>>();
    }
    catch (Exception)
    {
        var defaultOutputBuilder = TangFactory.GetTang().NewConfigurationBuilder();
        outputConverterConfig = defaultOutputBuilder
            .BindImplementation(
                GenericType<IPipelineDataConverter<TMapOutput>>.Class,
                GenericType<DefaultPipelineDataConverter<TMapOutput>>.Class)
            .Build();
    }

    // NOTE(review): the group size is the mapper count plus one, presumably for the master task; confirm.
    return _groupCommDriver
        .NewCommunicationGroup(IMRUConstants.CommunicationGroupName, _totalMappers + 1)
        .AddBroadcast<MapInputWithControlMessage<TMapInput>>(
            IMRUConstants.BroadcastOperatorName,
            _groupCommDriver.MasterTaskId,
            TopologyTypes.Tree,
            inputConverterConfig)
        .AddReduce<TMapOutput>(
            IMRUConstants.ReduceOperatorName,
            _groupCommDriver.MasterTaskId,
            TopologyTypes.Tree,
            reduceConfig,
            outputConverterConfig)
        .Build();
}
/// <summary>
/// Constructs the stack of configurations that are specific to each mapper.
/// For every mapper index the configurations produced by all per-mapper configuration
/// generators are merged, starting from an empty configuration, so a mapper gets an
/// empty configuration when there are no generators.
/// </summary>
/// <param name="totalMappers">Total mappers</param>
/// <returns>Stack of configurations, one entry per mapper, pushed in mapper index order</returns>
private ConcurrentStack<IConfiguration> ConstructPerMapperConfigStack(int totalMappers)
{
    var configStack = new ConcurrentStack<IConfiguration>();

    for (var mapperIndex = 0; mapperIndex < totalMappers; mapperIndex++)
    {
        // Start from an empty configuration and fold in each generator's contribution.
        IConfiguration merged = TangFactory.GetTang().NewConfigurationBuilder().Build();
        foreach (var generator in _perMapperConfigs)
        {
            merged = Configurations.Merge(
                merged,
                generator.GetMapperConfiguration(mapperIndex, totalMappers));
        }

        configStack.Push(merged);
    }

    return configStack;
}
/// <summary>
/// Looks up the endpoint registered in the name server for the given id.
/// </summary>
/// <param name="taskId">Registered identifier in the name server</param>
/// <returns>
/// The string form of the endpoint of the first lookup result, or null when the
/// lookup returns no entries or the first entry is null.
/// </returns>
private string GetEndPointFromTaskId(string taskId)
{
    var idsToLookup = new List<string> { taskId };
    var assignments = _nameServer.Lookup(idsToLookup);
    if (assignments.Count <= 0)
    {
        return null;
    }

    var firstAssignment = assignments.FirstOrDefault();
    return firstAssignment == null ? null : firstAssignment.Endpoint.ToString();
}
/// <summary>
/// Resolves an endpoint for a failed task: first through the name server lookup by
/// task id, and, when that yields null, from the failed task's active context.
/// </summary>
/// <param name="failedTask">The failed task</param>
/// <returns>The endpoint or host name found, or null when neither source provides one</returns>
private string GetEndPoint(IFailedTask failedTask)
{
    var registeredEndPoint = GetEndPointFromTaskId(failedTask.Id);
    if (registeredEndPoint != null)
    {
        return registeredEndPoint;
    }

    // The context is only consulted when the name server lookup gave nothing.
    return GetEndPointFromContext(failedTask.GetActiveContext());
}
/// <summary>
/// Gets the host name of the node on which the given failed context's evaluator ran.
/// </summary>
/// <param name="context">The failed context; may be null</param>
/// <returns>
/// The node's host name, or null when the context, its evaluator descriptor or the
/// node descriptor is null.
/// </returns>
private string GetEndPointFromContext(IFailedContext context)
{
    if (context == null)
    {
        return null;
    }

    if (context.EvaluatorDescriptor == null)
    {
        return null;
    }

    if (context.EvaluatorDescriptor.NodeDescriptor == null)
    {
        return null;
    }

    return context.EvaluatorDescriptor.NodeDescriptor.HostName;
}
/// <summary>
/// Gets the host name of the node on which the given optional active context's evaluator runs.
/// </summary>
/// <param name="context">The optional active context</param>
/// <returns>
/// The node's host name, or null when the optional is not present, or its value, the
/// evaluator descriptor or the node descriptor is null.
/// </returns>
private string GetEndPointFromContext(Optional<IActiveContext> context)
{
    if (!context.IsPresent())
    {
        return null;
    }

    if (context.Value == null)
    {
        return null;
    }

    if (context.Value.EvaluatorDescriptor == null)
    {
        return null;
    }

    if (context.Value.EvaluatorDescriptor.NodeDescriptor == null)
    {
        return null;
    }

    return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName;
}
/// <summary>
/// Finalizer that makes sure the timeout monitor timer is stopped and disposed
/// when the driver object is finalized. Does nothing when the timer is not set.
/// </summary>
/// <remarks>
/// NOTE(review): this touches another managed object from a finalizer; confirm that the
/// timer cannot already have been finalized when this runs.
/// </remarks>
~IMRUDriver()
{
    if (_timeoutMonitorTimer == null)
    {
        return;
    }

    _timeoutMonitorTimer.Stop();
    _timeoutMonitorTimer.Dispose();
    _timeoutMonitorTimer = null;
}
}
}