Ensured TaskStatus::source field is set for executor status updates.

A status update originating from executor should have the
TaskStatus::source field set to TaskStatus::SOURCE_EXECUTOR. Set this
field in the slave to be future proof (a future where there will be no
executor driver). Previous code has a bug and updated a copy of the
update that was not forwarded. Add some checks for source and reason for
status updates in existing tests.

Review: https://reviews.apache.org/r/32130
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8b62ce0..4053e5b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2503,7 +2503,7 @@
 // reliable delivery of status updates. Since executor driver caches
 // unacked updates it is important that whoever sent the update gets
 // acknowledgement for it.
-void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
+void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
 {
   LOG(INFO) << "Handling status update " << update << " from " << pid;
 
@@ -2511,9 +2511,9 @@
         state == RUNNING || state == TERMINATING)
     << state;
 
-  TaskStatus status = update.status();
-  status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE
-                                  : TaskStatus::SOURCE_EXECUTOR);
+  // Set the source before forwarding the status update.
+  update.mutable_status()->set_source(
+      pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
 
   Framework* framework = getFramework(update.framework_id());
   if (framework == NULL) {
@@ -2538,6 +2538,8 @@
     return;
   }
 
+  TaskStatus status = update.status();
+
   Executor* executor = framework->getExecutor(status.task_id());
   if (executor == NULL) {
     LOG(WARNING)  << "Could not find the executor for "
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index f3eab7e..3fa1887 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -181,7 +181,9 @@
   // after the update is successfully handled. If pid == UPID()
   // no ACK is sent. The latter is used by the slave to send
   // status updates it generated (e.g., TASK_LOST).
-  void statusUpdate(const StatusUpdate& update, const process::UPID& pid);
+  // NOTE: StatusUpdate is passed by value because it is modified
+  // to ensure source field is set.
+  void statusUpdate(StatusUpdate update, const process::UPID& pid);
 
   // Continue handling the status update after optionally updating the
   // container's resources.
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 60c9f35..593d738 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -177,6 +177,7 @@
 
   AWAIT_READY(status);
   ASSERT_EQ(TASK_FAILED, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
 
   Clock::resume();
 
@@ -248,6 +249,8 @@
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
 
   // We use 'gc.schedule' as a signal for the executor being cleaned
   // up by the slave.
@@ -349,10 +352,10 @@
   // Expect two status updates, one for once the mesos-executor says
   // the task is running and one for after our overridden command
   // above finishes.
-  Future<TaskStatus> status1, status2;
+  Future<TaskStatus> statusRunning, statusFinished;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status1))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished));
 
   Try<Subprocess> executor =
     subprocess(
@@ -364,12 +367,15 @@
 
   ASSERT_SOME(executor);
 
-  // Scheduler should receive the TASK_RUNNING update.
-  AWAIT_READY(status1);
-  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
-  AWAIT_READY(status2);
-  ASSERT_EQ(TASK_FINISHED, status2.get().state());
+  AWAIT_READY(statusFinished);
+  ASSERT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   AWAIT_READY(wait);
 
@@ -462,11 +468,15 @@
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -593,11 +603,15 @@
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -685,11 +699,15 @@
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -1104,9 +1122,12 @@
 
   AWAIT_READY(status3);
   EXPECT_EQ(TASK_KILLED, status3.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
 
   AWAIT_READY(status4);
   EXPECT_EQ(TASK_LOST, status4.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason());
 
   driver.stop();
   driver.join();
@@ -1212,6 +1233,8 @@
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
 
   driver.stop();
   driver.join();