[REEF-1430] Validate Task Message Send failure => FailedEvaluator Event

This change:
 * updates HeartBeatManager.OnNext(Alarm) to properly handle exception
   thrown during getting or sending evaluator heartbeat.
 * adds test to verify that task message send failure in heartbeat causes
   FailedEvaluator event on driver.

JIRA:
  [REEF-1430](https://issues.apache.org/jira/browse/REEF-1430)

Pull request:
  This closes #1202
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index 0144573..0d26a64 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -257,7 +257,7 @@
             }
         }
 
-        private void OnException(Exception e)
+        internal void OnException(Exception e)
         {
             lock (_heartBeatManager)
             {
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index 889c67c..5800081 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -287,9 +287,18 @@
                 
                 if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && EvaluatorRuntime.State == State.RUNNING)
                 {
-                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
-                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
-                    Send(evaluatorHeartbeatProto);
+                    try
+                    {
+                        EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                        LOGGER.Log(Level.Verbose,
+                            string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus));
+                        Send(evaluatorHeartbeatProto);
+                    }
+                    catch (Exception e)
+                    {
+                        Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                        EvaluatorRuntime.OnException(e);
+                    }
                 }
                 else
                 {
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs
new file mode 100644
index 0000000..8d6bf6e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/SendTaskMessageExceptionTest.cs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions;
+using Org.Apache.REEF.Tests.Functional.Common;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    /// <summary>
+    /// This class contains a test that tests the behavior upon throwing an Exception when
+    /// sending a Context Message from the Evaluator's IContextMessageSource.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class SendTaskMessageExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(SendTaskMessageExceptionTest));
+
+        private static readonly string TaskId = "TaskId";
+        private static readonly string ExpectedExceptionMessage = "ExpectedExceptionMessage";
+        private static readonly string ReceivedFailedEvaluator = "ReceivedFailedEvaluator";
+
+        /// <summary>
+        /// This test validates that a failure in the ITaskMessageSource results in a FailedEvaluator.
+        /// </summary>
+        [Fact]
+        public void TestSendTaskMessageException()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<TestSendTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TestSendTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TestSendTaskMessageExceptionDriver>.Class)
+                .Set(DriverConfiguration.OnContextFailed, GenericType<TestSendTaskMessageExceptionDriver>.Class)
+                .Build(),
+                typeof(TestSendTaskMessageExceptionDriver),
+                1,
+                "SendTaskMessageExceptionTest",
+                "local",
+                testFolder);
+
+            ValidateSuccessForLocalRuntime(0, 0, 1, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(ReceivedFailedEvaluator, testFolder);
+            CleanUp(testFolder);
+        }
+
+        private sealed class TestSendTaskMessageExceptionDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IFailedContext>,
+            IObserver<IFailedEvaluator>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TestSendTaskMessageExceptionDriver(IEvaluatorRequestor requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitTask(
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, TaskId)
+                        .Set(TaskConfiguration.Task, GenericType<TestTask>.Class)
+                        .Set(TaskConfiguration.OnSendMessage, GenericType<SendTaskMessageExceptionHandler>.Class)
+                        .Build());
+            }
+
+            /// <summary>
+            /// Throwing an Exception in a task message handler will result in a Failed Evaluator.
+            /// </summary>
+            public void OnNext(IFailedEvaluator value)
+            {
+                Assert.Equal(1, value.FailedContexts.Count);
+                Assert.NotNull(value.EvaluatorException.InnerException);
+                Assert.True(value.EvaluatorException.InnerException is TestSerializableException,
+                    "Unexpected type of evaluator exception: " + value.EvaluatorException.InnerException.GetType());
+                Assert.Equal(ExpectedExceptionMessage, value.EvaluatorException.InnerException.Message);
+
+                Logger.Log(Level.Info, ReceivedFailedEvaluator);
+            }
+
+            public void OnNext(IFailedContext value)
+            {
+                throw new Exception("The Driver does not expect a Failed Context message.");
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// A Context message source that throws an Exception.
+        /// </summary>
+        private sealed class SendTaskMessageExceptionHandler : ITaskMessageSource
+        {
+            private int counter;
+
+            [Inject]
+            private SendTaskMessageExceptionHandler()
+            {
+            }
+
+            public Optional<TaskMessage> Message
+            {
+                get
+                {
+                    counter++;
+                    if (counter == 2)
+                    {
+                        throw new TestSerializableException(ExpectedExceptionMessage);
+                    }
+                    return Optional<TaskMessage>.Empty();
+                }
+            }
+        }
+
+        private sealed class TestTask : WaitingTask
+        {
+            [Inject]
+            private TestTask(EventMonitor eventMonitor) : base(eventMonitor, "WaitingTask started")
+            {
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 0f57571..d483c7f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -87,6 +87,7 @@
     <Compile Include="Functional\Common\Task\NullTask.cs" />
     <Compile Include="Functional\Failure\User\ContextStopExceptionTest.cs" />
     <Compile Include="Functional\Common\EventMonitor.cs" />
+    <Compile Include="Functional\Failure\User\SendTaskMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\SendContextMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" />