Fixed slave to remove a queued task when the task is terminated.
Review: https://reviews.apache.org/r/13207
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c40a9ac..20d76e2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2954,6 +2954,7 @@
if (queuedTasks.contains(taskId)) {
task = new Task(
protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+ queuedTasks.erase(taskId);
} else if (launchedTasks.contains(taskId)) {
// Update the resources if it's been launched.
task = launchedTasks[taskId];
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5ac4d5f..c503842 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -40,6 +40,7 @@
#include "master/master.hpp"
#include "slave/constants.hpp"
+#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/process_isolator.hpp"
#include "slave/slave.hpp"
@@ -53,6 +54,7 @@
using mesos::internal::master::Master;
+using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Isolator;
using mesos::internal::slave::ProcessIsolator;
using mesos::internal::slave::Slave;
@@ -755,6 +757,83 @@
}
+// This test verifies that when an executor terminates before registering
+// with the slave, its task is reported as TASK_LOST and it is cleaned up.
+TEST_F(MasterTest, RemoveUnregisteredTerminatedExecutor)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestingIsolator isolator(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&isolator);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size()); // Fatal: offers.get()[0] is used below.
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Drop the registration message from the executor to the slave.
+ Future<process::Message> registerExecutorMessage =
+ DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(registerExecutorMessage);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> schedule =
+ FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+ // Now kill the executor.
+ dispatch(isolator,
+ &Isolator::killExecutor,
+ offers.get()[0].framework_id(),
+ DEFAULT_EXECUTOR_ID);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ // We use 'gc.schedule' as a signal for the executor being cleaned
+ // up by the slave.
+ AWAIT_READY(schedule);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
+
+
TEST_F(MasterTest, MasterInfo)
{
Try<PID<Master> > master = StartMaster();