Ensured TaskStatus::source field is set for executor status updates.
A status update originating from an executor should have its
TaskStatus::source field set to TaskStatus::SOURCE_EXECUTOR. Set this
field in the slave so that it remains correct in a future where there is
no executor driver. The previous code had a bug: it set the source on a
local copy of the TaskStatus, so the update that was actually forwarded
never carried it. Also add checks for source and reason on status
updates in existing tests.
Review: https://reviews.apache.org/r/32130
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8b62ce0..4053e5b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2503,7 +2503,7 @@
// reliable delivery of status updates. Since executor driver caches
// unacked updates it is important that whoever sent the update gets
// acknowledgement for it.
-void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
+void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
{
LOG(INFO) << "Handling status update " << update << " from " << pid;
@@ -2511,9 +2511,9 @@
state == RUNNING || state == TERMINATING)
<< state;
- TaskStatus status = update.status();
- status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE
- : TaskStatus::SOURCE_EXECUTOR);
+ // Set the source before forwarding the status update.
+ update.mutable_status()->set_source(
+ pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
@@ -2538,6 +2538,8 @@
return;
}
+ TaskStatus status = update.status();
+
Executor* executor = framework->getExecutor(status.task_id());
if (executor == NULL) {
LOG(WARNING) << "Could not find the executor for "
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index f3eab7e..3fa1887 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -181,7 +181,9 @@
// after the update is successfully handled. If pid == UPID()
// no ACK is sent. The latter is used by the slave to send
// status updates it generated (e.g., TASK_LOST).
- void statusUpdate(const StatusUpdate& update, const process::UPID& pid);
+ // NOTE: StatusUpdate is passed by value because it is modified
+ // to ensure source field is set.
+ void statusUpdate(StatusUpdate update, const process::UPID& pid);
// Continue handling the status update after optionally updating the
// container's resources.
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 60c9f35..593d738 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -177,6 +177,7 @@
AWAIT_READY(status);
ASSERT_EQ(TASK_FAILED, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
Clock::resume();
@@ -248,6 +249,8 @@
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
// We use 'gc.schedule' as a signal for the executor being cleaned
// up by the slave.
@@ -349,10 +352,10 @@
// Expect two status updates, one for once the mesos-executor says
// the task is running and one for after our overridden command
// above finishes.
- Future<TaskStatus> status1, status2;
+ Future<TaskStatus> statusRunning, statusFinished;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status1))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished));
Try<Subprocess> executor =
subprocess(
@@ -364,12 +367,15 @@
ASSERT_SOME(executor);
- // Scheduler should receive the TASK_RUNNING update.
- AWAIT_READY(status1);
- ASSERT_EQ(TASK_RUNNING, status1.get().state());
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
- AWAIT_READY(status2);
- ASSERT_EQ(TASK_FINISHED, status2.get().state());
+ AWAIT_READY(statusFinished);
+ ASSERT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
AWAIT_READY(wait);
@@ -462,11 +468,15 @@
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -593,11 +603,15 @@
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -685,11 +699,15 @@
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -1104,9 +1122,12 @@
AWAIT_READY(status3);
EXPECT_EQ(TASK_KILLED, status3.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
AWAIT_READY(status4);
EXPECT_EQ(TASK_LOST, status4.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason());
driver.stop();
driver.join();
@@ -1212,6 +1233,8 @@
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
driver.stop();
driver.join();