[REEF-1685] Complete the Job properly if update/master task is completed from running state

In stress testing, we have seen the following scenario: master/update task is successfully completed,
and most of mapper tasks are also completed, but then driver receives an IFailedEvaluator event.
This would result in system transitioning to ShuttingDown state and doing an unnecessary retry.

When the driver receives ICompletedTask from master task which was in running state,
that means we have completed the calculation and result has been written to the output.
The driver should execute DoneAction to dispose of all the contexts and shut down the system.
After that, any FailedEvalutor/FailedTask events received should be ignored.

JIRA:
  [REEF-1685](https://issues.apache.org/jira/browse/REEF-1685)

Pull request:
  This closes #1200
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index d35f7c8..7787d1a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -177,6 +177,25 @@
         }
 
         /// <summary>
+        /// Tests RecordCompletedRunningTask
+        /// </summary>
+        [Fact]
+        public void TestIsMasterCompleted()
+        {
+            var taskManager = TaskManagerWithTasksRunning();
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskRunning));
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1));
+            Assert.False(taskManager.IsMasterTaskCompletedRunning());
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+            Assert.True(taskManager.IsMasterTaskCompletedRunning());
+
+            taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2));
+            Assert.True(taskManager.AreAllTasksInState(TaskState.TaskCompleted));
+        }
+
+        /// <summary>
         /// Tests closing running tasks
         /// </summary>
         [Fact]
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index a895a78..78e0c4d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -430,10 +430,13 @@
         /// <summary>
         /// ICompletedTask handler. It is called when a task is completed. The following action will be taken based on the System State:
         /// Case TasksRunning
-        ///     Updates task state to TaskCompleted
+        ///     Check if it is master task, then set master task completed    
+        ///     Then record completed running and updates task state from TaskRunning to TaskCompleted
         ///     If all tasks are completed, sets system state to TasksCompleted and then go to Done action
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the event        
         /// Case ShuttingDown
-        ///     Updates task state to TaskCompleted
+        ///     Record completed running and updates task state to TaskCompleted
         ///     Try to recover
         /// Other cases - not expected 
         /// </summary>
@@ -447,18 +450,24 @@
                 {
                     case SystemState.TasksRunning:
                         _taskManager.RecordCompletedTask(completedTask);
-                        if (_taskManager.AreAllTasksCompleted())
+                        if (_taskManager.IsJobDone())
                         {
                             _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
-                            Logger.Log(Level.Info, "All tasks are completed, systemState {0}", _systemState.CurrentState);
+                            Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState);
                             DoneAction();
                         }
                         break;
+
                     case SystemState.ShuttingDown:
                         // The task might be in running state or waiting for close, record the completed task
                         _taskManager.RecordCompletedTask(completedTask);
                         TryRecovery();
                         break;
+
+                    case SystemState.TasksCompleted:
+                        _taskManager.RecordCompletedTask(completedTask);
+                        break;
+
                     default:
                         UnexpectedState(completedTask.Id, "ICompletedTask");
                         break;
@@ -470,7 +479,6 @@
         #region IFailedEvaluator
         /// <summary>
         /// IFailedEvaluator handler. It specifies what to do when an evaluator fails.
-        /// If we get all completed tasks then ignore the failure. Otherwise, take the following actions based on the system state: 
         /// Case WaitingForEvaluator
         ///     This happens in the middle of submitting contexts. We just need to remove the failed evaluator 
         ///     from EvaluatorManager and remove associated active context, if any, from ActiveContextManager
@@ -483,6 +491,8 @@
         ///     Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure
         ///     Closes all the other running tasks
         ///     Try to recover in case it is the last failure received
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the failure. 
         /// Case ShuttingDown
         ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
         ///     Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager
@@ -506,14 +516,6 @@
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedEvaluator"))
                 {
-                    if (_taskManager != null && _taskManager.AreAllTasksCompleted())
-                    {
-                        Logger.Log(Level.Verbose,
-                            "All IMRU tasks have been completed. So ignoring the Evaluator {0} failure.",
-                            failedEvaluator.Id);
-                        return;
-                    }
-
                     var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id);
                     _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id);
                     _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator);
@@ -557,6 +559,11 @@
                             TryRecovery();
                             break;
 
+                        case SystemState.TasksCompleted:
+                            _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Evaluator {0} failure.", failedEvaluator.Id);
+                            break;
+
                         case SystemState.ShuttingDown:
                             _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator);
 
@@ -591,30 +598,38 @@
         /// <param name="failedContext"></param>
         public void OnNext(IFailedContext failedContext)
         {
+            Logger.Log(Level.Warning, "Received IFailedContext with Id: {0} from endpoint {1} with systemState {2} in retry#: {3}.", failedContext.Id, GetEndPointFromContext(failedContext), _systemState.CurrentState, _numberOfRetries);
             lock (_lock)
             {
-                if (_taskManager.AreAllTasksCompleted())
+                using (Logger.LogFunction("IMRUDriver::IFailedContext"))
                 {
-                    Logger.Log(Level.Info, "Context with Id: {0} failed but IMRU tasks are completed. So ignoring.", failedContext.Id);
-                    return;
+                    switch (_systemState.CurrentState)
+                    {
+                        case SystemState.TasksCompleted:
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Context {0} failure.", failedContext.Id);
+                            break;
+                        case SystemState.ShuttingDown:
+                        case SystemState.Fail:
+                            break;
+                        default:
+                            var msg = string.Format(CultureInfo.InvariantCulture, "Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
+                            throw new NotImplementedException(msg);
+                    }
                 }
-
-                var msg = string.Format("Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId);
-                Exceptions.Throw(new Exception(msg), Logger);
             }
         }
         #endregion IFailedContext
 
         #region IFailedTask
         /// <summary>
-        /// IFailedTask handler. It specifies what to do when task fails.
-        /// If we get all completed tasks then ignore the failure. Otherwise take the following actions based on the System state:
         /// Case SubmittingTasks/TasksRunning
         ///     This is the first failure received
         ///     Changes the system state to ShuttingDown
         ///     Record failed task in TaskManager
         ///     Closes all the other running tasks and set their state to TaskWaitingForClose
         ///     Try to recover
+        /// Case TasksCompleted:
+        ///     Record, log and then ignore the failure. 
         /// Case ShuttingDown
         ///     This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing.
         ///     Record failed task in TaskManager.
@@ -629,14 +644,6 @@
             {
                 using (Logger.LogFunction("IMRUDriver::IFailedTask"))
                 {
-                    if (_taskManager.AreAllTasksCompleted())
-                    {
-                        Logger.Log(Level.Info,
-                            "Task with Id: {0} failed but all IMRU tasks are completed. So ignoring.",
-                            failedTask.Id);
-                        return;
-                    }
-
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.SubmittingTasks:
@@ -648,6 +655,11 @@
                             TryRecovery();
                             break;
 
+                        case SystemState.TasksCompleted:
+                            _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask);
+                            Logger.Log(Level.Info, "The Job has been completed. So ignoring the Task {0} failure.", failedTask.Id);
+                            break;
+
                         case SystemState.ShuttingDown:
                             _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask);
                             TryRecovery();
@@ -725,6 +737,7 @@
         /// </summary>
         private void DoneAction()
         {
+            Logger.Log(Level.Info, "Shutting down Evaluators!!!");
             ShutDownAllEvaluators();
             Logger.Log(Level.Info, "{0} done in retry {1}!!!", DoneActionPrefix, _numberOfRetries);
             DisposeResources();
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index a37fa3b..b2fe281 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -89,6 +89,11 @@
         private int _numberOfAppErrors = 0;
 
         /// <summary>
+        /// Indicate if master task is completed running properly
+        /// </summary>
+        private bool _masterTaskCompletedRunning = false;
+
+        /// <summary>
         /// Creates a TaskManager with specified total number of tasks and master task id.
         /// Throws IMRUSystemException if numTasks is smaller than or equals to 0 or masterTaskId is null.
         /// </summary>
@@ -204,6 +209,8 @@
 
         /// <summary>
         /// This method is called when receiving ICompletedTask event during task running or system shutting down.
+        /// If it is master task and if the master task was running, mark _masterTaskCompletedRunning true. That indicates 
+        /// master task has successfully completed, which means the system has got the result from master task. 
         /// Removes the task from running tasks if it was running
         /// Changes the task state from RunningTask to CompletedTask if the task was running
         /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state
@@ -211,6 +218,13 @@
         /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
         {
+            if (completedTask.Id.Equals(_masterTaskId))
+            {
+                if (GetTaskInfo(completedTask.Id).TaskState.CurrentState.Equals(TaskState.TaskRunning))
+                {
+                    _masterTaskCompletedRunning = true;
+                }
+            }
             _runningTasks.Remove(completedTask.Id);
             UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
         }
@@ -303,6 +317,15 @@
         }
 
         /// <summary>
+        /// Returns true if master task has completed and produced result
+        /// </summary>
+        /// <returns></returns>
+        internal bool IsMasterTaskCompletedRunning()
+        {
+            return _masterTaskCompletedRunning;
+        }
+
+        /// <summary>
         /// Checks if all the tasks are running.
         /// </summary>
         /// <returns></returns>
@@ -313,12 +336,13 @@
         }
 
         /// <summary>
-        /// Checks if all the tasks are completed.
+        /// When master task is completed, that means the system has got the result expected 
+        /// regardless of other mapper tasks returned or not. 
         /// </summary>
         /// <returns></returns>
-        internal bool AreAllTasksCompleted()
+        internal bool IsJobDone()
         {
-            return AreAllTasksInState(StateMachine.TaskState.TaskCompleted) && _tasks.Count == _totalExpectedTasks && _runningTasks.Count == 0;
+            return IsMasterTaskCompletedRunning();
         }
 
         /// <summary>
@@ -528,8 +552,8 @@
                     }
                     SubmitTask(taskId);
                 }
+            }
         }
-    }
 
         private void SubmitTask(string taskId)
         {
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
index d1841e8..1db3d80 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
@@ -51,7 +51,8 @@
             var runningTaskCount = GetMessageCount(lines, "Received IRunningTask");
             var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator");
             var failedTaskCount = GetMessageCount(lines, "Received IFailedTask");
-            Assert.Equal((NumOfRetry + 1) * NumNodes, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True((NumOfRetry + 1) * NumNodes >= completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True(NumOfRetry * NumNodes < completedTaskCount + failedEvaluatorCount + failedTaskCount);
             Assert.Equal((NumOfRetry + 1) * NumNodes, runningTaskCount);
             CleanUp(testFolder);
         }
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index 1898066..cd94f43 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -67,8 +67,10 @@
 
             // on each try each task should fail or complete or disappear with failed evaluator
             // and on each try all tasks should start successfully
-            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True((NumberOfRetry + 1) * numTasks >= completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.True(NumberOfRetry * numTasks < completedTaskCount + failedEvaluatorCount + failedTaskCount);
             Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);
             CleanUp(testFolder);
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index b94d699..71c14d4 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -64,7 +64,8 @@
             // Rest of the tasks should be canceled and send completed task event to the driver. 
             Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
-            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+            Assert.True(((NumberOfRetry + 1) * numTasks) - failedEvaluatorCount >= completedTaskCount);
+            Assert.True((NumberOfRetry * numTasks) - failedEvaluatorCount < completedTaskCount);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
index 1d1176c..d027b8f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnDispose.cs
@@ -62,7 +62,8 @@
             // No failed evaluators or tasks.
             Assert.Equal(0, failedEvaluatorCount);
             Assert.Equal(0, failedTaskCount);
-            Assert.Equal(numTasks, completedTaskCount);
+            Assert.True(numTasks >= completedTaskCount);
+            Assert.True(completedTaskCount >= 1);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
index cd2d6b3..b695d8a 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasksOnInit.cs
@@ -63,7 +63,8 @@
             // Rest of the tasks should be canceled and send completed task event to the driver. 
             Assert.Equal(0, failedEvaluatorCount);
             Assert.Equal(NumberOfRetry * 2, failedTaskCount);
-            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount);
+            Assert.True(((NumberOfRetry + 1) * numTasks) - failedTaskCount >= completedTaskCount);
+            Assert.True((NumberOfRetry * numTasks) - failedTaskCount < completedTaskCount);
 
             // eventually job succeeds
             Assert.Equal(1, jobSuccess);