[REEF-1511] Add timeout for Task shutdown during IMRU recovery

During IMRU fault-tolerant recovery, tasks that are supposed to be
closed by the driver sometimes never report back, causing the system
to hang. This change adds a timeout for tasks being closed by the
driver, so that the evaluators of unresponsive tasks are shut down
once the timeout expires.

The TaskManager records the average task closing time, which is used
as a reference to derive the timeout on the fly. Because the average
may be queried before any data has accumulated, or may be too low in
some scenarios, a configurable MinTaskWaitingForCloseTimeout ensures
that the driver waits long enough before killing the evaluators.
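
As a minimal sketch of that rule (illustrative names only, not the
exact driver code), the effective timeout is the larger of the
configured minimum and a multiple of the observed average closing
time:

    using System;

    // Sketch only: mirrors the timeout rule described above. The actual
    // driver reads the average from TaskManager.AverageClosingTime() and
    // the minimum from the MinTaskWaitingForCloseTimeout parameter.
    internal static class ClosingTimeoutSketch
    {
        private const int TaskWaitingForCloseTimeFactor = 3;

        internal static int ComputeTimeoutMs(int minTimeoutMs, int averageClosingTimeMs)
        {
            // The average is 0 until at least one driver-closed task has
            // completed, in which case the configured minimum applies.
            return Math.Max(minTimeoutMs, averageClosingTimeMs * TaskWaitingForCloseTimeFactor);
        }
    }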

JIRA:
  [REEF-1511](https://issues.apache.org/jira/browse/REEF-1511)

Pull request:
  This closes #1201
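
For reference, the periodic check in IMRUDriver is wired up with a
standard System.Timers.Timer; the sketch below only illustrates that
wiring (class, field and parameter names are illustrative), assuming a
non-positive monitoring interval means monitoring is disabled:

    using System;
    using System.Timers;

    // Sketch only: a watchdog that invokes a callback at a fixed interval.
    // In the driver, the callback inspects task states under a lock and
    // kills the evaluators of tasks that timed out waiting for close.
    internal sealed class TimeoutWatchdogSketch : IDisposable
    {
        private readonly Timer _timer = new Timer();

        internal TimeoutWatchdogSketch(double intervalMs, Action onTick)
        {
            _timer.Elapsed += (sender, args) => onTick();
            if (intervalMs > 0)
            {
                _timer.Interval = intervalMs;
                _timer.Enabled = true;
            }
        }

        public void Dispose()
        {
            _timer.Stop();
            _timer.Dispose();
        }
    }
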
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index 7787d1a..d0cdda8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Threading;
 using NSubstitute;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
@@ -177,6 +178,22 @@
         }
 
         /// <summary>
+        /// Tests AverageClosingTime
+        /// </summary>
+        [Fact]
+        public void TestTasksClosingTime()
+        {
+            var taskManager = TaskManagerWithTasksRunning();
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Thread.Sleep(100);
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+
+            Assert.True(taskManager.AverageClosingTime() > 0);
+        }
+
+        /// <summary>
         /// Tests RecordCompletedRunningTask
         /// </summary>
         [Fact]
@@ -214,6 +231,32 @@
         }
 
         /// <summary>
+        /// Tests closing running tasks and killing the ones that time out waiting for close
+        /// </summary>
+        [Fact]
+        public void TestTasksWaitingForClose()
+        {
+            var taskManager = TaskManagerWithTasksSubmitted();
+
+            taskManager.RecordRunningTask(CreateMockRunningTask(MasterTaskId));
+            taskManager.RecordRunningTask(CreateMockRunningTask(MapperTaskIdPrefix + 1));
+
+            var runningTask2 = CreateMockRunningTask(MapperTaskIdPrefix + 2);
+            taskManager.RecordRunningTaskDuringSystemFailure(runningTask2, TaskManager.CloseTaskByDriver);
+
+            taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver);
+            Thread.Sleep(100);
+            var tasks = taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, 50);
+            Assert.Equal(3, tasks.Count);
+
+            foreach (var t in tasks)
+            {
+                taskManager.RecordKillClosingTask(t.Key);
+            }
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskClosedByDriver));
+        }
+
+        /// <summary>
         /// Tests record failed tasks after all the tasks are running
         /// </summary>
         [Fact]
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 78e0c4d..d0cd537 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -20,6 +20,7 @@
 using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
+using System.Timers;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
@@ -91,6 +92,11 @@
         private readonly object _lock = new object();
 
         /// <summary>
+        /// Factor to multiply the average closing time by, to give tasks room to close by themselves.
+        /// </summary>
+        private const int TaskWaitingForCloseTimeFactor = 3;
+
+        /// <summary>
         /// Manages Tasks, maintains task states and responsible for task submission for the driver.
         /// </summary>
         private TaskManager _taskManager;
@@ -127,10 +133,26 @@
         private int _numberOfRetries;
 
         /// <summary>
+        /// Minimum timeout in milliseconds for TaskWaitingForClose
+        /// </summary>
+        private readonly int _minTaskWaitingForCloseTimeout;
+
+        /// <summary>
         /// Manages lifecycle events for driver, like JobCancelled event.
         /// </summary>
         private readonly List<IDisposable> _disposableResources = new List<IDisposable>();
 
+        /// <summary>
+        /// An internal timer that monitors the timeout for driver events
+        /// </summary>
+        private Timer _timeoutMonitorTimer;
+
+        /// <summary>
+        /// Records the ids of evaluators that are closed after a timeout.
+        /// The ICompletedTask and IFailedEvaluator events from those tasks should be ignored to avoid double counting.
+        /// </summary>
+        private readonly IList<string> _evaluatorsForceClosed = new List<string>();
+
         [Inject]
         private IMRUDriver(IPartitionedInputDataSet dataSet,
             [Parameter(typeof(PerMapConfigGeneratorSet))] ISet<IPerMapperConfigGenerator> perMapperConfigs,
@@ -142,6 +164,8 @@
             [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
             [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction,
             [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
+            [Parameter(typeof(MinTaskWaitingForCloseTimeout))] int minTaskWaitingForCloseTimeout,
+            [Parameter(typeof(TimeoutMonitoringInterval))] int timeoutMonitoringInterval,
             [Parameter(typeof(InvokeGC))] bool invokeGC,
             IGroupCommDriver groupCommDriver,
             INameServer nameServer,
@@ -154,6 +178,7 @@
             _totalMappers = dataSet.Count;
             _invokeGC = invokeGC;
             _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery;
+            _minTaskWaitingForCloseTimeout = minTaskWaitingForCloseTimeout;
 
             _contextManager = new ActiveContextManager(_totalMappers + 1);
             _contextManager.Subscribe(this);
@@ -172,15 +197,25 @@
                 var handle = lifecycleManager.Subscribe(this as IObserver<IJobCancelled>);
                 _disposableResources.Add(handle);
             }
-            
+
+            _timeoutMonitorTimer = new Timer();
+            _timeoutMonitorTimer.Elapsed += TimeoutMonitor;
+            if (timeoutMonitoringInterval > 0)
+            {
+                _timeoutMonitorTimer.Interval = timeoutMonitoringInterval;
+                _timeoutMonitorTimer.Enabled = true;
+            }
+
             var msg =
-                string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.",
+                string.Format(CultureInfo.InvariantCulture, "map task memory: {0}, update task memory: {1}, map task cores: {2}, update task cores: {3}, maxRetry: {4}, allowedFailedEvaluators: {5}, minTaskWaitingForCloseTimeout: {6}, timeoutMonitoringInterval: {7}.",
                     memoryPerMapper,
                     memoryForUpdateTask,
                     coresPerMapper,
                     coresForUpdateTask,
                     _maxRetryNumberForFaultTolerant,
-                    allowedFailedEvaluators);
+                    allowedFailedEvaluators,
+                    minTaskWaitingForCloseTimeout,
+                    timeoutMonitoringInterval);
             Logger.Log(Level.Info, msg);
         }
 
@@ -446,6 +481,11 @@
             Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries);
             lock (_lock)
             {
+                if (_evaluatorsForceClosed.Contains(completedTask.ActiveContext.EvaluatorId))
+                {
+                    Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring ICompletedTask event.", completedTask.ActiveContext.EvaluatorId, completedTask.Id);
+                    return;
+                }
                 switch (_systemState.CurrentState)
                 {
                     case SystemState.TasksRunning:
@@ -516,6 +556,12 @@
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
                 {
+                    if (_evaluatorsForceClosed.Contains(failedEvaluator.Id))
+                    {
+                        Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedEvaluator event.", failedEvaluator.Id, failedEvaluator.FailedTask.IsPresent() ? failedEvaluator.FailedTask.Value.Id : "NoTaskId");
+                        return;
+                    }
+
                     var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
                     _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
                     _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
@@ -644,6 +690,11 @@
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedTask"))
                 {
+                    if (_evaluatorsForceClosed.Contains(failedTask.GetActiveContext().Value.EvaluatorId))
+                    {
+                        Logger.Log(Level.Info, "Evaluator {0} has been closed after task {1} timeout, ignoring IFailedTask event.", failedTask.GetActiveContext().Value.EvaluatorId, failedTask.Id);
+                        return;
+                    }
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.SubmittingTasks:
@@ -674,6 +725,82 @@
         }
         #endregion IFailedTask
 
+        private void TimeoutMonitor(object source, ElapsedEventArgs e)
+        {
+            Logger.Log(Level.Info, "Entering TimeoutMonitor at {0}", DateTime.Now);
+            lock (_lock)
+            {
+                switch (_systemState.CurrentState)
+                {
+                    // TODO: Handle time out if ActiveContexts are not received in timeout limit
+                    case SystemState.WaitingForEvaluator:
+                        break;
+
+                    // TODO: Handle time out if RunningTasks are not received in timeout limit
+                    case SystemState.SubmittingTasks:
+                        break;
+
+                    // TODO: Handle time out if CompletedTasks are not received in timeout limit
+                    case SystemState.TasksRunning:
+                        break;
+
+                    // Handle timeout for closing tasks
+                    case SystemState.ShuttingDown:
+                        Logger.Log(Level.Info, "_taskManager.AverageClosingTime {0}, _minTaskWaitingForCloseTimeout: {1}", _taskManager.AverageClosingTime(), _minTaskWaitingForCloseTimeout);
+                        int taskClosingTimeout = Math.Max(_minTaskWaitingForCloseTimeout, _taskManager.AverageClosingTime() * TaskWaitingForCloseTimeFactor);
+                        var waitingTasks = _taskManager.TasksTimeoutInState(TaskState.TaskWaitingForClose, taskClosingTimeout);
+
+                        if (waitingTasks.Any())
+                        {
+                            WaitingForCloseTaskNoResponseAction(waitingTasks);
+                        }
+                        break;
+
+                    case SystemState.TasksCompleted:
+                        break;
+
+                    case SystemState.Fail:
+                        break;
+                }
+            }
+        }
+
+        /// <summary>
+        /// For tasks that are in the TaskWaitingForClose state and have not responded within the specified timeout,
+        /// kill the evaluator and update the other states as if a FailedEvaluator event had been received,
+        /// then try recovery.
+        /// </summary>
+        /// <param name="tasks">Tasks that have timed out in the TaskWaitingForClose state</param>
+        private void WaitingForCloseTaskNoResponseAction(IList<KeyValuePair<string, TaskInfo>> tasks)
+        {
+            foreach (var t in tasks)
+            {
+                string evaluatorId = t.Value.ActiveContext.EvaluatorId;
+                if (!_evaluatorsForceClosed.Contains(evaluatorId))
+                {
+                    _evaluatorsForceClosed.Add(evaluatorId);
+                    Logger.Log(Level.Info,
+                        "WaitingForCloseTask [{0}] has no response after timeout. Kill the evaluator: [{1}] and dispose the context: [{2}].",
+                        t.Key,
+                        evaluatorId,
+                        t.Value.ActiveContext.Id);
+
+                    t.Value.ActiveContext.Dispose();
+                    var isMaster = _evaluatorManager.IsMasterEvaluatorId(evaluatorId);
+                    _evaluatorManager.RecordFailedEvaluator(evaluatorId);
+                    _contextManager.Remove(t.Value.ActiveContext.Id);
+                    _taskManager.RecordKillClosingTask(t.Key);
+
+                    // Push evaluator id back to PartitionIdProvider if it is not master
+                    if (!isMaster)
+                    {
+                        _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(evaluatorId);
+                    }
+                }
+            }
+            TryRecovery();
+        }
+
         public void OnNext(IJobCancelled value)
         {
             lock (_lock)
@@ -1087,5 +1214,18 @@
             } 
             return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName; 
         }
+
+        /// <summary>
+        /// Ensures the Timer is disposed when the driver object is garbage collected
+        /// </summary>
+        ~IMRUDriver()
+        {
+            if (_timeoutMonitorTimer != null)
+            {
+                _timeoutMonitorTimer.Stop();
+                _timeoutMonitorTimer.Dispose();
+                _timeoutMonitorTimer = null;
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
index 6ae992d..deb027d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskInfo.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
 using Org.Apache.REEF.Tang.Interface;
@@ -38,6 +39,7 @@
             _taskState = taskState;
             _taskConfiguration = config;
             _activeContext = context;
+            TimeStateUpdated = DateTime.Now;
         }
 
         internal TaskStateMachine TaskState
@@ -54,5 +56,10 @@
         {
             get { return _activeContext; }
         }
+
+        /// <summary>
+        /// Time at which the task state was last updated
+        /// </summary>
+        internal DateTime TimeStateUpdated { get; set; }
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index b2fe281..4ba9745 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -89,6 +89,16 @@
         private int _numberOfAppErrors = 0;
 
         /// <summary>
+        /// Total Task closing time span. It is used to calculate the average closing time.
+        /// </summary>
+        private TimeSpan _totalTaskClosingTimeSpan;
+
+        /// <summary>
+        /// Total number of tasks that were closed by the driver and then completed.
+        /// </summary>
+        private int _totalNumberOfClosedTasksByDriver;
+
+        /// <summary>
         /// Indicate if master task is completed running properly
         /// </summary>
         private bool _masterTaskCompletedRunning = false;
@@ -124,9 +134,6 @@
         ///   trying to add extra tasks
         ///   No Master Task is added in the collection
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <param name="taskConfiguration"></param>
-        /// <param name="activeContext"></param>
         internal void AddTask(string taskId, IConfiguration taskConfiguration, IActiveContext activeContext)
         {
             if (taskId == null)
@@ -177,7 +184,6 @@
         /// Adds the IRunningTask to the running tasks collection and update the task state to TaskRunning.
         /// Throws IMRUSystemException if running tasks already contains this task or tasks collection doesn't contain this task.
         /// </summary>
-        /// <param name="runningTask"></param>
         internal void RecordRunningTask(IRunningTask runningTask)
         {
             if (_runningTasks.ContainsKey(runningTask.Id))
@@ -215,7 +221,6 @@
         /// Changes the task state from RunningTask to CompletedTask if the task was running
         /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
         /// </summary>
-        /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
         {
             if (completedTask.Id.Equals(_masterTaskId))
@@ -234,7 +239,6 @@
         /// Removes the task from running tasks if the task was running
         /// Updates the task state to fail based on the error message in the failed task
         /// </summary>
-        /// <param name="failedTask"></param>
         internal void RecordFailedTaskDuringRunningOrSubmissionState(IFailedTask failedTask)
         {
             //// Remove the task from running tasks if it exists there
@@ -248,7 +252,6 @@
         /// Task could fail by communication error or any other application or system error during this time, as long as it is not 
         /// TaskFailedByEvaluatorFailure, update the task state based on the error received. 
         /// </summary>
-        /// <param name="failedTask"></param>
         internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask)
         {
             Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString());
@@ -269,7 +272,6 @@
         /// Removes the task from RunningTasks if the task associated with the FailedEvaluator is present and running. 
         /// Sets the task state to TaskFailedByEvaluatorFailure 
         /// </summary>
-        /// <param name="failedEvaluator"></param>
         internal void RecordTaskFailWhenReceivingFailedEvaluator(IFailedEvaluator failedEvaluator)
         {
             if (failedEvaluator.FailedTask.IsPresent())
@@ -301,6 +303,29 @@
             }
         }
 
+        /// <summary>
+        /// Called when a task waiting for close has not responded within the given time.
+        /// The driver kills its evaluator and moves the task to the TaskFailedByEvaluatorFailure state.
+        /// </summary>
+        internal void RecordKillClosingTask(string taskId)
+        {
+            var taskInfo = GetTaskInfo(taskId);
+            if (!taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose))
+            {
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                           "The task [{0}] is in [{1}] state, but it is expected to be in the TaskWaitingForClose state.",
+                           taskId, taskInfo.TaskState.CurrentState);
+                Logger.Log(Level.Error, msg);
+                throw new IMRUSystemException(msg);
+            }
+            UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
+        }
+
+        /// <summary>
+        /// Finds the task that is associated with the given evaluator
+        /// </summary>
+        /// <param name="evaluatorId">Id of the evaluator</param>
+        /// <returns>The associated task id, or null if there is none</returns>
         private string FindTaskAssociatedWithTheEvalutor(string evaluatorId)
         {
             return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault();
@@ -309,17 +334,46 @@
         /// <summary>
         /// Updates task state for a given taskId based on the task event
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <param name="taskEvent"></param>
         private void UpdateState(string taskId, TaskStateEvent taskEvent)
         {
-            GetTaskInfo(taskId).TaskState.MoveNext(taskEvent);
+            var taskInfo = GetTaskInfo(taskId);
+            RecordingTime(taskId, taskInfo, taskEvent);
+            taskInfo.TaskState.MoveNext(taskEvent);
+            taskInfo.TimeStateUpdated = DateTime.Now;
+        }
+
+        /// <summary>
+        /// Records the time spent going from one task state to another.
+        /// The method can be extended to record the time for other task state transitions.
+        /// The log level should be changed to Verbose once testing is complete.
+        /// </summary>
+        private void RecordingTime(string taskId, TaskInfo taskInfo, TaskStateEvent taskEvent)
+        {
+            if (taskInfo.TaskState.CurrentState.Equals(TaskState.TaskWaitingForClose) && taskEvent.Equals(TaskStateEvent.CompletedTask))
+            {
+                var timeSpan = DateTime.Now - taskInfo.TimeStateUpdated;
+                _totalNumberOfClosedTasksByDriver++;
+                _totalTaskClosingTimeSpan = _totalTaskClosingTimeSpan.Add(timeSpan);
+                Logger.Log(Level.Info, "RecordClosingTime for task id {0}, closing time: {1}, average closing time: {2}.", taskId, timeSpan.TotalMilliseconds, _totalTaskClosingTimeSpan.TotalMilliseconds / _totalNumberOfClosedTasksByDriver);
+            }
+        }
+
+        /// <summary>
+        /// Gets the average closing time in milliseconds
+        /// </summary>
+        /// <returns>Average closing time in milliseconds, or 0 if no driver-closed task has completed yet</returns>
+        internal int AverageClosingTime()
+        {
+            if (_totalNumberOfClosedTasksByDriver != 0)
+            {
+                return (int)(_totalTaskClosingTimeSpan.TotalMilliseconds / _totalNumberOfClosedTasksByDriver);
+            }
+            return 0;
         }
 
         /// <summary>
         /// Returns true if master task has completed and produced result
         /// </summary>
-        /// <returns></returns>
         internal bool IsMasterTaskCompletedRunning()
         {
             return _masterTaskCompletedRunning;
@@ -328,7 +382,6 @@
         /// <summary>
         /// Checks if all the tasks are running.
         /// </summary>
-        /// <returns></returns>
         internal bool AreAllTasksRunning()
         {
             return AreAllTasksInState(StateMachine.TaskState.TaskRunning) &&
@@ -339,13 +392,30 @@
         /// When master task is completed, that means the system has got the result expected 
         /// regardless of other mapper tasks returned or not. 
         /// </summary>
-        /// <returns></returns>
         internal bool IsJobDone()
         {
             return IsMasterTaskCompletedRunning();
         }
 
         /// <summary>
+        /// Finds all the tasks that are in the given state and have stayed in it longer than the given timeout
+        /// </summary>
+        internal IList<KeyValuePair<string, TaskInfo>> TasksTimeoutInState(TaskState state, int timeoutMilliseconds)
+        {
+            return _tasks.Where(t => t.Value.TaskState.CurrentState.Equals(state) && Timeout(t.Value.TimeStateUpdated, timeoutMilliseconds))
+                .ToList();
+        }
+
+        /// <summary>
+        /// Checks whether more than timeoutMilliseconds have elapsed since the given DateTime
+        /// </summary>
+        private static bool Timeout(DateTime time, int timeoutMilliseconds)
+        {
+            TimeSpan span = DateTime.Now - time;
+            return span.TotalMilliseconds > timeoutMilliseconds;
+        }
+
+        /// <summary>
         /// This method is called when receiving either IFailedEvaluator or IFailedTask event
         /// Driver tries to close all the running tasks and clean the running task collection in the end.
         /// If all the tasks are running, the total number of running tasks should be _totalExpectedTasks -1
@@ -371,8 +441,6 @@
         /// Then move the task state to WaitingTaskToClose
         /// Throw IMRUSystemException if runningTask is null or the running task is already added in the running task collection
         /// </summary>
-        /// <param name="runningTask"></param>
-        /// <param name="closeMessage"></param>
         internal void RecordRunningTaskDuringSystemFailure(IRunningTask runningTask, string closeMessage)
         {
             if (runningTask == null)
@@ -396,8 +464,6 @@
         /// For unknown exceptions or exceptions that doesn't belong to defined IMRU task exceptions
         /// treat then as application error.
         /// </summary>
-        /// <param name="failedTask"></param>
-        /// <returns></returns>
         private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask)
         {
             var exception = failedTask.AsError();
@@ -457,7 +523,6 @@
         /// <summary>
         /// Returns the number of application error caused by FailedTask
         /// </summary>
-        /// <returns></returns>
         internal int NumberOfAppErrors()
         {
             return _numberOfAppErrors;
@@ -466,7 +531,6 @@
         /// <summary>
         /// Checks if all the tasks are in final states
         /// </summary>
-        /// <returns></returns>
         internal bool AreAllTasksInFinalState()
         {
             var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList();
@@ -504,8 +568,6 @@
         /// <summary>
         /// Gets current state of the task
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <returns></returns>
         internal TaskState GetTaskState(string taskId)
         {
             var taskInfo = GetTaskInfo(taskId);
@@ -516,8 +578,6 @@
         /// Checks if all the tasks are in the state specified. 
         /// For example, passing TaskState.TaskRunning to check if all the tasks are in TaskRunning state
         /// </summary>
-        /// <param name="taskState"></param>
-        /// <returns></returns>
         internal bool AreAllTasksInState(TaskState taskState)
         {
             return _tasks.All(t => t.Value.TaskState.CurrentState == taskState);
@@ -566,7 +626,6 @@
         /// <summary>
         /// Checks if master task has been added
         /// </summary>
-        /// <returns></returns>
         private bool MasterTaskExists()
         {
             return _tasks.ContainsKey(_masterTaskId);
@@ -576,8 +635,6 @@
         /// Gets task Tuple based on the given taskId. 
         /// Throws IMRUSystemException if the task Tuple is not in the task collection.
         /// </summary>
-        /// <param name="taskId"></param>
-        /// <returns></returns>
         private TaskInfo GetTaskInfo(string taskId)
         {
             TaskInfo taskInfo;
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs
new file mode 100644
index 0000000..2124eef
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MinTaskWaitingForCloseTimeout.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("Minimum timeout in milliseconds after which unresponsive tasks that are supposed to be closed are killed together with their evaluators.", "TaskWaitingForCloseTimeout", "30000")]
+    public sealed class MinTaskWaitingForCloseTimeout : Name<int>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs
new file mode 100644
index 0000000..bf5247d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/TimeoutMonitoringInterval.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("Interval at which checks for timeout are done, in milliseconds.", "TimeoutMonitoringInterval", "50000")]
+    public sealed class TimeoutMonitoringInterval : Name<int>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 330f4a0..9dc3e9a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -128,6 +128,8 @@
     <Compile Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedUpdateTaskStateConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SleepIntervalParameter.cs" />
+    <Compile Include="OnREEF\Parameters\MinTaskWaitingForCloseTimeout.cs" />
+    <Compile Include="OnREEF\Parameters\TimeoutMonitoringInterval.cs" />
     <Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
     <Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
     <Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />