| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <stdint.h> |
| #include <unistd.h> |
| |
| #include <gtest/gtest.h> |
| |
| #include <string> |
| |
| #include <mesos/executor.hpp> |
| #include <mesos/resources.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/dispatch.hpp> |
| #include <process/gmock.hpp> |
| #include <process/owned.hpp> |
| #include <process/reap.hpp> |
| |
| #include <stout/none.hpp> |
| #include <stout/numify.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/path.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/detector.hpp" |
| #include "master/master.hpp" |
| |
| #include "slave/gc.hpp" |
| #include "slave/paths.hpp" |
| #include "slave/slave.hpp" |
| #include "slave/state.hpp" |
| |
| #include "slave/containerizer/containerizer.hpp" |
| |
| #include "messages/messages.hpp" |
| |
| #include "tests/containerizer.hpp" |
| #include "tests/mesos.hpp" |
| #include "tests/utils.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::slave; |
| using namespace mesos::internal::tests; |
| |
| using namespace process; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::GarbageCollectorProcess; |
| using mesos::internal::slave::Containerizer; |
| |
| using std::map; |
| using std::string; |
| using std::vector; |
| |
| using testing::_; |
| using testing::AtMost; |
| using testing::DoAll; |
| using testing::Eq; |
| using testing::Return; |
| using testing::SaveArg; |
| |
| |
| class SlaveStateTest : public TemporaryDirectoryTest {}; |
| |
| |
| TEST_F(SlaveStateTest, CheckpointProtobuf) |
| { |
| // Checkpoint slave id. |
| SlaveID expected; |
| expected.set_value("slave1"); |
| |
| const string& file = "slave.id"; |
| slave::state::checkpoint(file, expected); |
| |
| const Result<SlaveID>& actual = ::protobuf::read<SlaveID>(file); |
| ASSERT_SOME(actual); |
| |
| ASSERT_SOME_EQ(expected, actual); |
| } |
| |
| |
| TEST_F(SlaveStateTest, CheckpointString) |
| { |
| // Checkpoint a test string. |
| const string expected = "test"; |
| const string file = "test-file"; |
| slave::state::checkpoint(file, expected); |
| |
| ASSERT_SOME_EQ(expected, os::read(file)); |
| } |
| |
| template <typename T> |
| class SlaveRecoveryTest : public ContainerizerTest<T> |
| { |
| public: |
| virtual slave::Flags CreateSlaveFlags() |
| { |
| slave::Flags flags = ContainerizerTest<T>::CreateSlaveFlags(); |
| |
| // Setup recovery slave flags. |
| flags.checkpoint = true; |
| flags.recover = "reconnect"; |
| flags.strict = true; |
| |
| return flags; |
| } |
| }; |
| |
| |
| // Containerizer types to run the tests. |
| typedef ::testing::Types<slave::MesosContainerizer> ContainerizerTypes; |
| |
| |
| TYPED_TEST_CASE(SlaveRecoveryTest, ContainerizerTypes); |
| |
| // Enable checkpointing on the slave and ensure recovery works. |
| TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| FrameworkID frameworkId; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(SaveArg<1>(&frameworkId)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| Future<Message> registerFrameworkMessage = |
| FUTURE_MESSAGE(Eq(RegisterFrameworkMessage().GetTypeName()), _, _); |
| |
| driver.start(); |
| |
| // Capture the framework pid. |
| AWAIT_READY(registerFrameworkMessage); |
| UPID frameworkPid = registerFrameworkMessage.get().from; |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| SlaveID slaveId = offers.get()[0].slave_id(); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Scheduler expectations. |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillRepeatedly(Return()); |
| |
| // Message expectations. |
| Future<Message> registerExecutorMessage = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<StatusUpdateMessage> update = |
| FUTURE_PROTOBUF(StatusUpdateMessage(), Eq(master.get()), _); |
| |
| Future<StatusUpdateAcknowledgementMessage> ack = |
| FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); |
| |
| Future<Nothing> _ack = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Capture the executor pids. |
| AWAIT_READY(registerExecutorMessage); |
| RegisterExecutorMessage registerExecutor; |
| registerExecutor.ParseFromString(registerExecutorMessage.get().body); |
| ExecutorID executorId = registerExecutor.executor_id(); |
| UPID libprocessPid = registerExecutorMessage.get().from; |
| |
| // Capture the update. |
| AWAIT_READY(update); |
| EXPECT_EQ(TASK_RUNNING, update.get().update().status().state()); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_ack); |
| |
| // Recover the state. |
| Result<slave::state::SlaveState> recover = slave::state::recover( |
| paths::getMetaRootDir(flags.work_dir), true); |
| |
| ASSERT_SOME(recover); |
| |
| slave::state::SlaveState state = recover.get(); |
| |
| // Check slave id. |
| ASSERT_EQ(slaveId, state.id); |
| |
| // Check framework id and pid. |
| ASSERT_TRUE(state.frameworks.contains(frameworkId)); |
| ASSERT_SOME_EQ(frameworkPid, state.frameworks[frameworkId].pid); |
| |
| ASSERT_TRUE(state.frameworks[frameworkId].executors.contains(executorId)); |
| |
| // Check executor id and pids. |
| const Option<ContainerID>& containerId = |
| state.frameworks[frameworkId].executors[executorId].latest; |
| ASSERT_SOME(containerId); |
| |
| ASSERT_TRUE(state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs.contains(containerId.get())); |
| |
| ASSERT_SOME_EQ( |
| libprocessPid, |
| state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .libprocessPid); |
| |
| |
| // Check task id and info. |
| ASSERT_TRUE(state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .tasks.contains(task.task_id())); |
| |
| const Task& t = mesos::internal::protobuf::createTask( |
| task, TASK_STAGING, executorId, frameworkId); |
| |
| ASSERT_SOME_EQ( |
| t, |
| state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .tasks[task.task_id()] |
| .info); |
| |
| // Check status update and ack. |
| ASSERT_EQ( |
| 1U, |
| state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .tasks[task.task_id()] |
| .updates.size()); |
| |
| ASSERT_EQ( |
| update.get().update().uuid(), |
| state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .tasks[task.task_id()] |
| .updates.front().uuid()); |
| |
| ASSERT_TRUE(state |
| .frameworks[frameworkId] |
| .executors[executorId] |
| .runs[containerId.get()] |
| .tasks[task.task_id()] |
| .acks.contains(UUID::fromBytes(ack.get().uuid()))); |
| |
| // Shut down the executor manually so that it doesn't hang around |
| // after the test finishes. |
| process::post(libprocessPid, ShutdownExecutorMessage()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| |
| delete containerizer.get(); |
| } |
| |
| |
| // The slave is killed before the update reaches the scheduler. |
| // When the slave comes back up it resends the unacknowledged update. |
| TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Message expectations. |
| Future<Message> registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| // Drop the first update from the executor. |
| Future<StatusUpdateMessage> update = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Capture the executor pid. |
| AWAIT_READY(registerExecutor); |
| UPID executorPid = registerExecutor.get().from; |
| |
| // Wait for the status update drop. |
| AWAIT_READY(update); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_RUNNING, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped before the first update for a task is received |
| // from the executor. When it comes back up with recovery=reconnect, make |
| // sure the executor re-registers and the slave properly sends the update. |
| TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Drop the first update from the executor. |
| Future<StatusUpdateMessage> statusUpdate = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Stop the slave before the status update is received. |
| AWAIT_READY(statusUpdate); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<Message> reregisterExecutorMessage = |
| FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure the executor re-registers. |
| AWAIT_READY(reregisterExecutorMessage); |
| UPID executorPid = reregisterExecutorMessage.get().from; |
| |
| ReregisterExecutorMessage reregister; |
| reregister.ParseFromString(reregisterExecutorMessage.get().body); |
| |
| // Executor should inform about the unacknowledged update. |
| ASSERT_EQ(1, reregister.updates_size()); |
| const StatusUpdate& update = reregister.updates(0); |
| ASSERT_EQ(task.task_id(), update.status().task_id()); |
| ASSERT_EQ(TASK_RUNNING, update.status().state()); |
| |
| // Scheduler should receive the recovered update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_RUNNING, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped before the (command) executor is registered. |
| // When it comes back up with recovery=reconnect, make sure the |
| // executor is killed and the task is transitioned to FAILED. |
| TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Drop the executor registration message. |
| Future<Message> registerExecutor = |
| DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Stop the slave before the executor is registered. |
| AWAIT_READY(registerExecutor); |
| UPID executorPid = registerExecutor.get().from; |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| // Now advance time until the reaper reaps the executor. |
| while (status.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Scheduler should receive the TASK_FAILED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status.get().state()); |
| |
| // Master should subsequently reoffer the same resources. |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped after a non-terminal update is received. |
| // The command executor terminates when the slave is down. |
| // When it comes back up with recovery=reconnect, make |
| // sure the task is properly transitioned to FAILED. |
| TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| Future<Message> registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> ack = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Capture the executor pid. |
| AWAIT_READY(registerExecutor); |
| UPID executorPid = registerExecutor.get().from; |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(ack); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| // Now shut down the executor, when the slave is down. |
| process::post(executorPid, ShutdownExecutorMessage()); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| // Now advance time until the reaper reaps the executor. |
| while (status.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Scheduler should receive the TASK_FAILED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status.get().state()); |
| |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Master should subsequently reoffer the same resources. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped after a non-terminal update is received. |
| // The command executor is expected to self-terminate while the slave |
| // is down, because the recovery timeout elapses. |
| // When the slave comes back up with recovery=reconnect, make |
| // sure the task is properly transitioned to FAILED. |
| // TODO(bmahler): Disabled for MESOS-685: the exited() event for the |
| // slave will not be delivered to the executor driver. |
| TYPED_TEST(SlaveRecoveryTest, DISABLED_RecoveryTimeout) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Set a short recovery timeout, as we can't control the executor |
| // driver time when using the process / cgroups isolators. |
| slave::Flags flags = this->CreateSlaveFlags(); |
| flags.recovery_timeout = Milliseconds(1); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| // Ensure the executor terminates by causing the recovery timeout |
| // to elapse while disconnected from the slave. |
| os::sleep(Milliseconds(1)); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Scheduler should receive the TASK_FAILED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped after an executor is completed (i.e., it has |
| // terminated and all its updates have been acknowledged). |
| // When it comes back up with recovery=reconnect, make |
| // sure the recovery successfully completes. |
| TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "exit 0"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Short-lived task. |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .Times(2); // TASK_RUNNING and TASK_FINISHED updates. |
| |
| EXPECT_CALL(sched, offerRescinded(_, _)) |
| .Times(AtMost(1)); |
| |
| Future<Nothing> schedule = FUTURE_DISPATCH( |
| _, &GarbageCollectorProcess::schedule); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // We use 'gc.schedule' as a proxy for the cleanup of the executor. |
| AWAIT_READY(schedule); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<Nothing> schedule2 = FUTURE_DISPATCH( |
| _, &GarbageCollectorProcess::schedule); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // We use 'gc.schedule' as a proxy for the cleanup of the executor. |
| AWAIT_READY(schedule2); |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is stopped after a non-terminal update is received. |
| // Slave is restarted in recovery=cleanup mode. It kills the command |
| // executor, and transitions the task to FAILED. |
| TYPED_TEST(SlaveRecoveryTest, CleanupExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> ack = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(ack); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Slave in cleanup mode shouldn't reregister with slave and hence |
| // no offers should be made by the master. |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .Times(0); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| // Restart the slave in 'cleanup' recovery mode with a new isolator. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| flags.recover = "cleanup"; |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| // Now advance time until the reaper reaps the executor. |
| while (status.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Scheduler should receive the TASK_FAILED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status.get().state()); |
| |
| // Wait for recovery to complete. |
| AWAIT_READY(__recover); |
| Clock::settle(); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // This test checks whether a non-checkpointing framework is |
| // properly removed, when a checkpointing slave is disconnected. |
| TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Disable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(false); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch 2 tasks from this offer. |
| vector<TaskInfo> tasks; |
| Offer offer = offers.get()[0]; |
| |
| Offer offer1 = offer; |
| offer1.mutable_resources()->CopyFrom( |
| Resources::parse("cpus:1;mem:512").get()); |
| tasks.push_back(createTask(offer1, "sleep 1000")); // Long-running task |
| |
| Offer offer2 = offer; |
| offer2.mutable_resources()->CopyFrom( |
| Resources::parse("cpus:1;mem:512").get()); |
| tasks.push_back(createTask(offer2, "sleep 1000")); // Long-running task |
| |
| ASSERT_LE(Resources(offer1.resources()) + Resources(offer2.resources()), |
| Resources(offer.resources())); |
| |
| Future<Nothing> update1; |
| Future<Nothing> update2; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&update1)) |
| .WillOnce(FutureSatisfy(&update2)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for TASK_RUNNING updates from the tasks. |
| AWAIT_READY(update1); |
| AWAIT_READY(update2); |
| |
| // The master should generate TASK_LOST updates once the slave is stopped. |
| Future<TaskStatus> status1; |
| Future<TaskStatus> status2; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status1)) |
| .WillOnce(FutureArg<1>(&status2)); |
| |
| this->Stop(slave.get()); |
| |
| // Scheduler should receive the TASK_LOST updates. |
| AWAIT_READY(status1); |
| ASSERT_EQ(TASK_LOST, status1.get().state()); |
| |
| AWAIT_READY(status2); |
| ASSERT_EQ(TASK_LOST, status2.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| // Destroy all the containers before we destroy the containerizer. We need to |
| // do this manually because there are no slaves left in the cluster. |
| Future<hashset<ContainerID> > containers = containerizer.get()->containers(); |
| AWAIT_READY(containers); |
| |
| foreach (const ContainerID& containerId, containers.get()) { |
| Future<containerizer::Termination> wait = |
| containerizer.get()->wait(containerId); |
| |
| containerizer.get()->destroy(containerId); |
| AWAIT_READY(wait); |
| } |
| |
| delete containerizer.get(); |
| |
| this->Shutdown(); |
| } |
| |
| |
| // This test ensures that no checkpointing happens for a |
| // framework that has disabled checkpointing. |
| TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Disable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(false); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| FrameworkID frameworkId; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(SaveArg<1>(&frameworkId)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| Future<Nothing> update; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&update)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for TASK_RUNNING update. |
| AWAIT_READY(update); |
| |
| Clock::pause(); |
| |
| Future<Nothing> updateFramework = FUTURE_DISPATCH(_, &Slave::updateFramework); |
| |
| // Simulate a 'UpdateFrameworkMessage' to ensure framework pid is |
| // not being checkpointed. |
| process::dispatch(slave.get(), &Slave::updateFramework, frameworkId, ""); |
| |
| AWAIT_READY(updateFramework); |
| |
| Clock::settle(); // Wait for the slave to act on the dispatch. |
| |
| // Ensure that the framework info is not being checkpointed. |
| const string& path = paths::getFrameworkPath( |
| paths::getMetaRootDir(flags.work_dir), |
| task.slave_id(), |
| frameworkId); |
| |
| ASSERT_FALSE(os::exists(path)); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer.get(); |
| } |
| |
| |
| // This test ensures that a non-checkpointing slave's resources are not offered |
| // to a framework that requires checkpointing. |
| TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Disable checkpointing for the slave. |
| slave::Flags flags = this->CreateSlaveFlags(); |
| flags.checkpoint = false; |
| |
| |
| Future<RegisterSlaveMessage> registerSlaveMessage = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlaveMessage); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> registered; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureSatisfy(®istered)); |
| |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .Times(0); // No offers should be received! |
| |
| Clock::pause(); |
| |
| driver.start(); |
| |
| // Wait for scheduler to register. We do a Clock::settle() here |
| // to ensure that no offers are received by the scheduler. |
| AWAIT_READY(registered); |
| Clock::settle(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer.get(); |
| } |
| |
| // Scheduler asks a restarted slave to kill a task that has been |
| // running before the slave restarted. This test ensures that a |
| // restarted slave is able to communicate with all components |
| // (scheduler, master, executor). |
| TYPED_TEST(SlaveRecoveryTest, KillTask) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> ack = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(ack); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<ReregisterExecutorMessage> reregisterExecutorMessage = |
| FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new isolator. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| // Wait for the executor to re-register. |
| AWAIT_READY(reregisterExecutorMessage); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Kill the task. |
| driver.killTask(task.task_id()); |
| |
| // Wait for TASK_KILLED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_KILLED, status.get().state()); |
| |
| Clock::pause(); |
| |
| // Advance the clock until the allocator allocates |
| // the recovered resources. |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // When the slave is down we modify the BOOT_ID_FILE to simulate a |
| // reboot. The subsequent run of the slave should not recover. |
| TYPED_TEST(SlaveRecoveryTest, Reboot) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| flags.strict = false; |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| // Capture the slave and framework ids. |
| SlaveID slaveId = offers1.get()[0].slave_id(); |
| FrameworkID frameworkId = offers1.get()[0].framework_id(); |
| |
| Future<Message> registerExecutorMessage = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<Nothing> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Capture the executor ID and PID. |
| AWAIT_READY(registerExecutorMessage); |
| RegisterExecutorMessage registerExecutor; |
| registerExecutor.ParseFromString(registerExecutorMessage.get().body); |
| ExecutorID executorId = registerExecutor.executor_id(); |
| UPID executorPid = registerExecutorMessage.get().from; |
| |
| // Wait for TASK_RUNNING update. |
| AWAIT_READY(status); |
| |
| // Capture the container ID. |
| Future<hashset<ContainerID> > containers = |
| containerizer1.get()->containers(); |
| |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers.get().size()); |
| |
| ContainerID containerId = *containers.get().begin(); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Get the executor's pid so we can reap it to properly simulate a |
| // reboot. |
| string pidPath = paths::getForkedPidPath( |
| paths::getMetaRootDir(flags.work_dir), |
| slaveId, |
| frameworkId, |
| executorId, |
| containerId); |
| |
| Try<string> read = os::read(pidPath); |
| ASSERT_SOME(read); |
| |
| Try<pid_t> pid = numify<pid_t>(read.get()); |
| ASSERT_SOME(pid); |
| |
| Future<Option<int> > executorStatus = process::reap(pid.get()); |
| |
| // Shut down the executor manually and wait until it's been reaped. |
| process::post(executorPid, ShutdownExecutorMessage()); |
| |
| AWAIT_READY(executorStatus); |
| |
| // Modify the boot ID to simulate a reboot. |
| ASSERT_SOME(os::write( |
| paths::getBootIdPath(paths::getMetaRootDir(flags.work_dir)), |
| "rebooted! ;)")); |
| |
| Future<RegisterSlaveMessage> registerSlave = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlave); |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // When the slave is down we remove the "latest" symlink in the |
| // executor's run directory, to simulate a situation where the |
| // recovered slave (--no-strict) cannot recover the executor and |
| // hence schedules it for gc. |
| TYPED_TEST(SlaveRecoveryTest, GCExecutor) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| flags.strict = false; |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| // Capture the slave and framework ids. |
| SlaveID slaveId = offers1.get()[0].slave_id(); |
| FrameworkID frameworkId = offers1.get()[0].framework_id(); |
| |
| Future<Message> registerExecutorMessage = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<Nothing> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Capture the executor id and pid. |
| AWAIT_READY(registerExecutorMessage); |
| RegisterExecutorMessage registerExecutor; |
| registerExecutor.ParseFromString(registerExecutorMessage.get().body); |
| ExecutorID executorId = registerExecutor.executor_id(); |
| UPID executorPid = registerExecutorMessage.get().from; |
| |
| // Wait for TASK_RUNNING update. |
| AWAIT_READY(status); |
| |
| this->Stop(slave.get()); |
| |
| // Destroy all the containers before we destroy the containerizer. |
| Future<hashset<ContainerID> > containers = containerizer1.get()->containers(); |
| AWAIT_READY(containers); |
| |
| foreach (const ContainerID& containerId, containers.get()) { |
| Future<containerizer::Termination> wait = |
| containerizer1.get()->wait(containerId); |
| |
| containerizer1.get()->destroy(containerId); |
| AWAIT_READY(wait); |
| } |
| |
| delete containerizer1.get(); |
| |
| // Remove the symlink "latest" in the executor directory |
| // to simulate a non-recoverable executor. |
| ASSERT_SOME(os::rm(paths::getExecutorLatestRunPath( |
| paths::getMetaRootDir(flags.work_dir), |
| slaveId, |
| frameworkId, |
| executorId))); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new isolator. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(_recover); |
| |
| Clock::pause(); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| Clock::settle(); |
| |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| Clock::advance(flags.gc_delay); |
| |
| Clock::settle(); |
| |
| // Executor's work and meta directories should be gc'ed by now. |
| ASSERT_FALSE(os::exists(paths::getExecutorPath( |
| flags.work_dir, slaveId, frameworkId, executorId))); |
| |
| ASSERT_FALSE(os::exists(paths::getExecutorPath( |
| paths::getMetaRootDir(flags.work_dir), |
| slaveId, |
| frameworkId, |
| executorId))); |
| |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave is asked to shutdown. When it comes back up, it should |
| // register as a new slave. |
| TYPED_TEST(SlaveRecoveryTest, ShutdownSlave) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) // Initial offer. |
| .WillOnce(FutureArg<1>(&offers2)); // Task resources re-offered. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| Future<Nothing> statusUpdate1; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&statusUpdate1)) |
| .WillOnce(Return()); // Ignore TASK_FAILED update. |
| |
| Future<Message> registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Capture the executor pid. |
| AWAIT_READY(registerExecutor); |
| UPID executorPid = registerExecutor.get().from; |
| |
| AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update. |
| |
| EXPECT_CALL(sched, offerRescinded(_, _)) |
| .Times(AtMost(1)); |
| |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| // We shut down the executor here so that a shutting down slave |
| // does not spend too much time waiting for the executor to exit. |
| process::post(executorPid, ShutdownExecutorMessage()); |
| |
| Clock::pause(); |
| |
| // Now advance time until the reaper reaps the executor. |
| while (executorTerminated.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| AWAIT_READY(executorTerminated); |
| AWAIT_READY(offers2); |
| |
| Clock::resume(); |
| |
| EXPECT_CALL(sched, slaveLost(_, _)) |
| .Times(AtMost(1)); |
| |
| this->Stop(slave.get(), true); // Send a "shut down". |
| delete containerizer1.get(); |
| |
| Future<vector<Offer> > offers3; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers3)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Now restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure that the slave registered with a new id. |
| AWAIT_READY(offers3); |
| |
| EXPECT_NE(0u, offers3.get().size()); |
| // Make sure all slave resources are reoffered. |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers3.get()[0].resources())); |
| |
| // Ensure the slave id is different. |
| ASSERT_NE( |
| offers1.get()[0].slave_id().value(), offers3.get()[0].slave_id().value()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The slave should shutdown when it receives a SIGUSR1 signal. |
| TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) // Initial offer. |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_RUNNING, status.get().state()); |
| |
| Future<TaskStatus> status2; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status2)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(_, _)) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| Future<Nothing> signaled = |
| FUTURE_DISPATCH(_, &Slave::signaled); |
| |
| Future<UnregisterSlaveMessage> unregisterSlaveMessage = |
| FUTURE_PROTOBUF(UnregisterSlaveMessage(), slave.get(), master.get()); |
| |
| // Send SIGUSR1 signal to the slave. |
| kill(getpid(), SIGUSR1); |
| |
| AWAIT_READY(signaled); |
| AWAIT_READY(unregisterSlaveMessage); |
| AWAIT_READY(executorTerminated); |
| |
| // The master should send a TASK_LOST and slaveLost. |
| AWAIT_READY(status2); |
| ASSERT_EQ(TASK_LOST, status2.get().state()); |
| AWAIT_READY(slaveLost); |
| |
| // Make sure the slave terminates. |
| ASSERT_TRUE(process::wait(slave.get(), Seconds(10))); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer.get(); |
| } |
| |
| |
| // The checkpointing slave fails to do recovery and tries to register |
| // as a new slave. The master should give it a new id and transition |
| // all the tasks of the old slave to LOST. |
| TYPED_TEST(SlaveRecoveryTest, RegisterDisconnectedSlave) |
| { |
| master::Flags masterFlags = this->CreateMasterFlags(); |
| |
| // Disable authentication so the spoofed re-registration below works. |
| masterFlags.authenticate_slaves = false; |
| |
| Try<PID<Master> > master = this->StartMaster(masterFlags); |
| ASSERT_SOME(master); |
| |
| Future<RegisterSlaveMessage> registerSlaveMessage = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlaveMessage); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| // Capture the slave and framework ids. |
| SlaveID slaveId = offers.get()[0].slave_id(); |
| FrameworkID frameworkId = offers.get()[0].framework_id(); |
| |
| Future<Message> registerExecutorMessage = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<Nothing> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureSatisfy(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Capture the executor pid. |
| AWAIT_READY(registerExecutorMessage); |
| RegisterExecutorMessage registerExecutor; |
| registerExecutor.ParseFromString(registerExecutorMessage.get().body); |
| UPID executorPid = registerExecutorMessage.get().from; |
| |
| // Wait for TASK_RUNNING update. |
| AWAIT_READY(status); |
| |
| EXPECT_CALL(sched, slaveLost(_, _)) |
| .Times(AtMost(1)); |
| |
| this->Stop(slave.get()); |
| |
| Future<TaskStatus> status2; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status2)); |
| |
| // Spoof the registration attempt of a checkpointing slave |
| // that failed recovery. We do this because simply restarting |
| // the slave will result in a slave with a different pid than |
| // the previous one. |
| post(slave.get(), master.get(), registerSlaveMessage.get()); |
| |
| // Scheduler should get a TASK_LOST message. |
| AWAIT_READY(status2); |
| ASSERT_EQ(TASK_LOST, status2.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| // Destroy all the containers before we destroy the containerizer. We need to |
| // do this manually because there are no slaves left in the cluster. |
| Future<hashset<ContainerID> > containers = containerizer.get()->containers(); |
| AWAIT_READY(containers); |
| |
| foreach (const ContainerID& containerId, containers.get()) { |
| Future<containerizer::Termination> wait = |
| containerizer.get()->wait(containerId); |
| |
| containerizer.get()->destroy(containerId); |
| AWAIT_READY(wait); |
| } |
| |
| delete containerizer.get(); |
| |
| this->Shutdown(); |
| } |
| |
| |
| // This test verifies that a KillTask message received by the |
| // master when a checkpointing slave is disconnected is properly |
| // reconciled when the slave reregisters. |
| TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<RegisterSlaveMessage> registerSlaveMessage = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlaveMessage); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| // Capture the slave and framework ids. |
| SlaveID slaveId = offers1.get()[0].slave_id(); |
| FrameworkID frameworkId = offers1.get()[0].framework_id(); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for TASK_RUNNING update to be acknowledged. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Now send a KillTask message to the master. This will not be |
| // received by the slave because it is down. |
| driver.killTask(task.task_id()); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| // Now restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Scheduler should get a TASK_KILLED message. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_KILLED, status.get().state()); |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // This test verifies that when the slave recovers and re-registers |
| // with a framework that was shutdown when the slave was down, it gets |
| // a ShutdownFramework message. |
| TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<RegisterSlaveMessage> registerSlaveMessage = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlaveMessage); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Capture the slave and framework ids. |
| SlaveID slaveId = offers.get()[0].slave_id(); |
| FrameworkID frameworkId = offers.get()[0].framework_id(); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for TASK_RUNNING update to be acknowledged. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<UnregisterFrameworkMessage> unregisterFrameworkMessage = |
| FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _); |
| |
| // Now stop the framework. |
| driver.stop(); |
| driver.join(); |
| |
| // Wait util the framework is removed. |
| AWAIT_READY(unregisterFrameworkMessage); |
| |
| Future<ShutdownFrameworkMessage> shutdownFrameworkMessage = |
| FUTURE_PROTOBUF(ShutdownFrameworkMessage(), _, _); |
| |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| // Now restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Slave should get a ShutdownFrameworkMessage. |
| AWAIT_READY(shutdownFrameworkMessage); |
| |
| // Ensure that the executor is terminated. |
| AWAIT_READY(executorTerminated); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // This ensures that reconciliation properly deals with tasks |
| // present in the master and missing from the slave. Notably: |
| // 1. The tasks are sent to LOST. |
| // 2. The task resources are recovered. |
| // TODO(bmahler): Ensure the executor resources are recovered by |
| // using an explicit executor. |
| TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) |
| { |
| MockAllocatorProcess<master::allocator::HierarchicalDRFAllocatorProcess> |
| allocator; |
| |
| EXPECT_CALL(allocator, initialize(_, _, _)); |
| |
| Try<PID<Master> > master = this->StartMaster(&allocator); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| EXPECT_CALL(allocator, slaveAdded(_, _, _)); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(allocator, frameworkAdded(_, _, _)); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| // Start a task on the slave so that the master has knowledge of it. |
| // We'll ensure the slave does not have this task when it |
| // re-registers by wiping the relevant meta directory. |
| TaskInfo task = createTask(offers1.get()[0], "sleep 10"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| EXPECT_CALL(allocator, slaveDeactivated(_)); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Construct the framework meta directory that needs wiping. |
| string frameworkPath = paths::getFrameworkPath( |
| paths::getMetaRootDir(flags.work_dir), |
| offers1.get()[0].slave_id(), |
| frameworkId.get()); |
| |
| // Kill the forked pid, so that we don't leak a child process. |
| // Construct the executor id from the task id, since this test |
| // uses a command executor. |
| ExecutorID executorId; |
| executorId.set_value(task.task_id().value()); |
| |
| string executorPath = paths::getExecutorLatestRunPath( |
| paths::getMetaRootDir(flags.work_dir), |
| offers1.get()[0].slave_id(), |
| frameworkId.get(), |
| executorId); |
| |
| Try<string> read = os::read( |
| path::join(executorPath, "pids", paths::FORKED_PID_FILE)); |
| ASSERT_SOME(read); |
| |
| Try<pid_t> pid = numify<pid_t>(read.get()); |
| ASSERT_SOME(pid); |
| |
| ASSERT_SOME(os::killtree(pid.get(), SIGKILL)); |
| |
| // Remove the framework meta directory, so that the slave will not |
| // recover the task. |
| ASSERT_SOME(os::rmdir(frameworkPath, true)); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| EXPECT_CALL(allocator, slaveActivated(_)); |
| EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(_recover); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| // Wait for TASK_LOST update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_LOST, status.get().state()); |
| |
| Clock::pause(); |
| |
| // Advance the clock until the allocator allocates |
| // the recovered resources. |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| Clock::resume(); |
| |
| EXPECT_CALL(allocator, frameworkDeactivated(_)) |
| .WillRepeatedly(Return()); |
| EXPECT_CALL(allocator, frameworkRemoved(_)) |
| .WillRepeatedly(Return()); |
| |
| // If there was an outstanding offer, we can get a call to |
| // resourcesRecovered when we stop the scheduler. |
| EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) |
| .WillRepeatedly(Return()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // Scheduler asks a restarted slave to kill a task that has been |
| // running before the slave restarted. A scheduler failover happens |
| // when the slave is down. This test verifies that a scheduler |
| // failover will not affect the slave recovery process. |
| TYPED_TEST(SlaveRecoveryTest, SchedulerFailover) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Launch the first (i.e., failing) scheduler. |
| MockScheduler sched1; |
| |
| FrameworkInfo framework1; |
| framework1.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| framework1.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver1( |
| &sched1, framework1, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched1, registered(&driver1, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched1, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver1.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| // Create a long running task. |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| EXPECT_CALL(sched1, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver1.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Now launch the second (i.e., failover) scheduler using the |
| // framework id recorded from the first scheduler. |
| MockScheduler sched2; |
| |
| FrameworkInfo framework2; |
| framework2.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| framework2.mutable_id()->MergeFrom(frameworkId.get()); |
| framework2.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver2( |
| &sched2, framework2, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> sched2Registered; |
| EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _)) |
| .WillOnce(FutureSatisfy(&sched2Registered)); |
| |
| Future<Nothing> sched1Error; |
| EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) |
| .WillOnce(FutureSatisfy(&sched1Error)); |
| |
| driver2.start(); |
| |
| AWAIT_READY(sched2Registered); |
| AWAIT_READY(sched1Error); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<ReregisterExecutorMessage> reregisterExecutorMessage = |
| FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| // Wait for the executor to re-register. |
| AWAIT_READY(reregisterExecutorMessage); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched2, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched2, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Kill the task. |
| driver2.killTask(task.task_id()); |
| |
| // Wait for TASK_KILLED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_KILLED, status.get().state()); |
| |
| Clock::pause(); |
| |
| // Advance the clock until the allocator allocates |
| // the recovered resources. |
| while (offers2.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| Clock::resume(); |
| |
| driver2.stop(); |
| driver2.join(); |
| |
| driver1.stop(); |
| driver1.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // The purpose of this test is to ensure that during a network |
| // partition, the master will remove a partitioned slave. When the |
| // partition is removed, the slave will receive a ShutdownMessage. |
| // When the slave starts again on the same host, we verify that the |
| // slave will not try to reregister itself with the master. It will |
| // register itself with the master and get a new slave id. |
| TYPED_TEST(SlaveRecoveryTest, PartitionedSlave) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Set these expectations up before we spawn the slave so that we |
| // don't miss the first PING. |
| Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| |
| // Drop all the PONGs to simulate slave partition. |
| DROP_MESSAGES(Eq("PONG"), _, _); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_NE(0u, offers.get().size()); |
| |
| // Long running task. |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| Future<ShutdownMessage> shutdownMessage = |
| FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, _)) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| Clock::pause(); |
| |
| // Now, induce a partition of the slave by having the master |
| // timeout the slave. |
| uint32_t pings = 0; |
| while (true) { |
| AWAIT_READY(ping); |
| pings++; |
| if (pings == master::MAX_SLAVE_PING_TIMEOUTS) { |
| break; |
| } |
| ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| } |
| |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| |
| // The master will notify the framework that the slave was lost. |
| AWAIT_READY(slaveLost); |
| |
| // The master will have notified the framework of the lost task. |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_LOST, status.get().state()); |
| |
| // Wait for the master to attempt to shut down the slave. |
| AWAIT_READY(shutdownMessage); |
| |
| // Wait for the executor to be terminated. |
| while (executorTerminated.isPending()) { |
| Clock::advance(Seconds(1)); |
| Clock::settle(); |
| } |
| |
| AWAIT_READY(executorTerminated); |
| Clock::settle(); |
| |
| Clock::resume(); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<RegisterSlaveMessage> registerSlaveMessage = |
| FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new isolator. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(registerSlaveMessage); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // This test verifies that if the master changes when the slave is |
| // down, the slave can still recover the task when it restarts. We |
| // verify its correctness by killing the task from the scheduler. |
| TYPED_TEST(SlaveRecoveryTest, MasterFailover) |
| { |
| // Step 1. Run a task. |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| Owned<StandaloneMasterDetector> detector( |
| new StandaloneMasterDetector(master.get())); |
| TestingMesosSchedulerDriver driver( |
| &sched, frameworkInfo, DEFAULT_CREDENTIAL, detector.get()); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| Future<process::Message> frameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkRegisteredMessage); |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Step 2. Simulate failed over master by restarting the master. |
| this->Stop(master.get()); |
| master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| EXPECT_CALL(sched, disconnected(_)); |
| |
| Future<Nothing> registered; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureSatisfy(®istered)); |
| |
| // Simulate a new master detected event to the scheduler. |
| detector->appoint(master.get()); |
| |
| // Framework should get a registered callback. |
| AWAIT_READY(registered); |
| |
| // Step 3. Restart the slave and kill the task. |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<ReregisterExecutorMessage> reregisterExecutorMessage = |
| FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new isolator. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| // Wait for the executor to re-register. |
| AWAIT_READY(reregisterExecutorMessage); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Wait for the executor to terminate before shutting down the |
| // slave in order to give cgroups (if applicable) time to clean up. |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| // Kill the task. |
| driver.killTask(task.task_id()); |
| |
| // Wait for TASK_KILLED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_KILLED, status.get().state()); |
| |
| // Make sure all slave resources are reoffered. |
| AWAIT_READY(offers2); |
| ASSERT_EQ(Resources(offers1.get()[0].resources()), |
| Resources(offers2.get()[0].resources())); |
| |
| AWAIT_READY(executorTerminated); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // In this test there are two frameworks and one slave. Each |
| // framework launches a task before the slave goes down. We verify |
| // that the two frameworks and their tasks are recovered after the |
| // slave restarts. |
| TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Framework 1. |
| MockScheduler sched1; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo1; |
| frameworkInfo1.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo1.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver1( |
| &sched1, frameworkInfo1, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched1, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched1, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(DeclineOffers()); // Ignore subsequent offers. |
| |
| driver1.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| // Use part of the resources in the offer so that the rest can be |
| // offered to framework 2. |
| Offer offer1 = offers1.get()[0]; |
| offer1.mutable_resources()->CopyFrom( |
| Resources::parse("cpus:1;mem:512").get()); |
| |
| // Framework 1 launches a task. |
| TaskInfo task1 = createTask(offer1, "sleep 1000"); |
| vector<TaskInfo> tasks1; |
| tasks1.push_back(task1); // Long-running task |
| |
| EXPECT_CALL(sched1, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement1 = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver1.launchTasks(offer1.id(), tasks1); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement1); |
| |
| // Framework 2. |
| MockScheduler sched2; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo2; |
| frameworkInfo2.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo2.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver2( |
| &sched2, frameworkInfo2, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched2, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched2, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(DeclineOffers()); // Ignore subsequent offers. |
| |
| driver2.start(); |
| |
| AWAIT_READY(offers2); |
| EXPECT_NE(0u, offers2.get().size()); |
| |
| // Framework 2 launches a task. |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| |
| vector<TaskInfo> tasks2; |
| tasks2.push_back(task2); // Long-running task |
| |
| EXPECT_CALL(sched2, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement2 = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| driver2.launchTasks(offers2.get()[0].id(), tasks2); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement2); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<ReregisterExecutorMessage> reregisterExecutorMessage2 = |
| FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); |
| |
| Future<ReregisterExecutorMessage> reregisterExecutorMessage1 = |
| FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| AWAIT_READY(_recover); |
| |
| // Wait for the executors to re-register. |
| AWAIT_READY(reregisterExecutorMessage1); |
| AWAIT_READY(reregisterExecutorMessage2); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| // Expectations for the status changes as a result of killing the |
| // tasks. |
| Future<TaskStatus> status1; |
| EXPECT_CALL(sched1, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status1)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<TaskStatus> status2; |
| EXPECT_CALL(sched2, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status2)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // Wait for the executors to terminate before shutting down the |
| // slave in order to give cgroups (if applicable) time to clean up. |
| Future<Nothing> executorTerminated1 = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| Future<Nothing> executorTerminated2 = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| // Kill task 1. |
| driver1.killTask(task1.task_id()); |
| |
| // Wait for TASK_KILLED update. |
| AWAIT_READY(status1); |
| ASSERT_EQ(TASK_KILLED, status1.get().state()); |
| |
| // Kill task 2. |
| driver2.killTask(task2.task_id()); |
| |
| // Wait for TASK_KILLED update. |
| AWAIT_READY(status2); |
| ASSERT_EQ(TASK_KILLED, status2.get().state()); |
| |
| AWAIT_READY(executorTerminated1); |
| AWAIT_READY(executorTerminated2); |
| |
| driver1.stop(); |
| driver1.join(); |
| driver2.stop(); |
| driver2.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| |
| |
| // This test verifies that slave recovery works properly even if |
| // multiple slaves are co-located on the same host. |
| TYPED_TEST(SlaveRecoveryTest, MultipleSlaves) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| driver.start(); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| // Start the first slave. |
| slave::Flags flags1 = this->CreateSlaveFlags(); |
| |
| #ifdef __linux__ |
| // Disable putting slave into cgroup(s) because this is a multi-slave test. |
| flags1.slave_subsystems = None(); |
| #endif |
| |
| Try<TypeParam*> containerizer1 = TypeParam::create(flags1, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave1 = this->StartSlave(containerizer1.get(), flags1); |
| ASSERT_SOME(slave1); |
| |
| AWAIT_READY(offers1); |
| ASSERT_EQ(1u, offers1.get().size()); |
| |
| // Launch a long running task in the first slave. |
| TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks1; |
| tasks1.push_back(task1); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .Times(1); |
| |
| Future<Nothing> _statusUpdateAcknowledgement1 = |
| FUTURE_DISPATCH(slave1.get(), &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks1); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement1); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers2)); |
| |
| // Start the second slave. |
| slave::Flags flags2 = this->CreateSlaveFlags(); |
| |
| #ifdef __linux__ |
| // Disable putting slave into cgroup(s) because this is a multi-slave test. |
| flags2.slave_subsystems = None(); |
| #endif |
| |
| Try<TypeParam*> containerizer2 = TypeParam::create(flags2, true); |
| ASSERT_SOME(containerizer2); |
| |
| Try<PID<Slave> > slave2 = this->StartSlave(containerizer2.get(), flags2); |
| ASSERT_SOME(slave2); |
| |
| AWAIT_READY(offers2); |
| ASSERT_EQ(1u, offers2.get().size()); |
| |
| // Launch a long running task in each slave. |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks2; |
| tasks2.push_back(task2); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .Times(1); |
| |
| Future<Nothing> _statusUpdateAcknowledgement2 = |
| FUTURE_DISPATCH(slave2.get(), &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers2.get()[0].id(), tasks2); |
| |
| // Wait for the ACKs to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement2); |
| |
| this->Stop(slave1.get()); |
| delete containerizer1.get(); |
| this->Stop(slave2.get()); |
| delete containerizer2.get(); |
| |
| Future<Nothing> _recover2 = FUTURE_DISPATCH(_, &Slave::_recover); |
| Future<Nothing> _recover1 = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage2 = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage1 = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart both slaves using the same flags with new containerizers. |
| Try<TypeParam*> containerizer3 = TypeParam::create(flags1, true); |
| ASSERT_SOME(containerizer3); |
| |
| Clock::pause(); |
| |
| slave1 = this->StartSlave(containerizer3.get(), flags1); |
| ASSERT_SOME(slave1); |
| |
| Try<TypeParam*> containerizer4 = TypeParam::create(flags2, true); |
| ASSERT_SOME(containerizer4); |
| |
| slave2 = this->StartSlave(containerizer4.get(), flags2); |
| ASSERT_SOME(slave2); |
| |
| AWAIT_READY(_recover1); |
| AWAIT_READY(_recover2); |
| |
| // Wait for slaves to schedule reregister timeout. |
| Clock::settle(); |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| |
| // Make sure all pending timeouts are fired. |
| Clock::settle(); |
| |
| Clock::resume(); |
| |
| // Wait for the slaves to re-register. |
| AWAIT_READY(slaveReregisteredMessage1); |
| AWAIT_READY(slaveReregisteredMessage2); |
| |
| Future<TaskStatus> status1; |
| Future<TaskStatus> status2; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status1)) |
| .WillOnce(FutureArg<1>(&status2)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| Future<Nothing> executorTerminated2 = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| Future<Nothing> executorTerminated1 = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Kill both tasks. |
| driver.killTask(task1.task_id()); |
| driver.killTask(task2.task_id()); |
| |
| AWAIT_READY(status1); |
| ASSERT_EQ(TASK_KILLED, status1.get().state()); |
| |
| AWAIT_READY(status2); |
| ASSERT_EQ(TASK_KILLED, status2.get().state()); |
| |
| AWAIT_READY(executorTerminated1); |
| AWAIT_READY(executorTerminated2); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer3.get(); |
| delete containerizer4.get(); |
| } |
| |
| |
| // The slave is stopped after it dispatched Containerizer::launch but |
| // before the containerizer has processed the launch. When the slave |
| // comes back up it should send a TASK_FAILED for the task. |
| // NOTE: This is a 'TYPED_TEST' but we don't use 'TypeParam'. |
| TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| TestContainerizer* containerizer1 = new TestContainerizer(); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Expect the launch but don't do anything. |
| Future<Nothing> launch; |
| EXPECT_CALL(*containerizer1, launch(_, _, _, _, _, _, _)) |
| .WillOnce(DoAll(FutureSatisfy(&launch), |
| Return(Future<bool>()))); |
| |
| // No status update should be sent for now. |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .Times(0); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Once we see the call to launch, restart the slave. |
| AWAIT_READY(launch); |
| |
| this->Stop(slave.get()); |
| delete containerizer1; |
| |
| Future<TaskStatus> status; |
| // There is a race here where the Slave may reregister before we |
| // shut down. If it does, it causes the StatusUpdateManager to |
| // flush which will cause a duplicate status update to be sent. As |
| // such, we ignore any subsequent updates. |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| TestContainerizer containerizer2; |
| |
| Clock::pause(); |
| |
| slave = this->StartSlave(&containerizer2, flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(_recover); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Scheduler should receive the TASK_FAILED update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| } |
| |
| |
| // We explicitly instantiate a SlaveRecoveryTest for test cases where |
| // we assume we'll only have the MesosContainerizer. |
| class MesosContainerizerSlaveRecoveryTest |
| : public SlaveRecoveryTest<MesosContainerizer> {}; |
| |
| |
| TEST_F(MesosContainerizerSlaveRecoveryTest, ResourceStatistics) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Try<MesosContainerizer*> containerizer1 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); // Long-running task. |
| |
| // Message expectations. |
| Future<Message> registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| AWAIT_READY(registerExecutor); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Set up so we can wait until the new slave updates the container's |
| // resources (this occurs after the executor has re-registered). |
| Future<Nothing> update = |
| FUTURE_DISPATCH(_, &MesosContainerizerProcess::update); |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<MesosContainerizer*> containerizer2 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until the containerizer is updated. |
| AWAIT_READY(update); |
| |
| Future<hashset<ContainerID> > containers = containerizer2.get()->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers.get().size()); |
| |
| ContainerID containerId = *(containers.get().begin()); |
| |
| Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId); |
| AWAIT_READY(usage); |
| |
| // Check the resource limits are set. |
| EXPECT_TRUE(usage.get().has_cpus_limit()); |
| EXPECT_TRUE(usage.get().has_mem_limit_bytes()); |
| |
| Future<containerizer::Termination> wait = |
| containerizer2.get()->wait(containerId); |
| |
| containerizer2.get()->destroy(containerId); |
| |
| AWAIT_READY(wait); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| |
| delete containerizer2.get(); |
| } |
| |
| #ifdef __linux__ |
| // Test that the perf event isolator can be enabled on a new slave. |
| // Previously created containers will not report perf statistics but |
| // newly created containers will. |
| TEST_F(MesosContainerizerSlaveRecoveryTest, CGROUPS_ROOT_PerfRollForward) |
| { |
| Try<PID<Master> > master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Start a slave using a containerizer without a perf event |
| // isolator. |
| slave::Flags flags = this->CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| flags.slave_subsystems = ""; |
| |
| Try<MesosContainerizer*> containerizer1 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Scheduler expectations. |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillRepeatedly(Return()); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| SlaveID slaveId = offers1.get()[0].slave_id(); |
| |
| TaskInfo task1 = createTask( |
| slaveId, Resources::parse("cpus:0.5;mem:128").get(), "sleep 1000"); |
| vector<TaskInfo> tasks1; |
| tasks1.push_back(task1); |
| |
| // Message expectations. |
| Future<Message> registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks1); |
| |
| AWAIT_READY(registerExecutor); |
| |
| Future<hashset<ContainerID> > containers = containerizer1.get()->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers.get().size()); |
| |
| ContainerID containerId1 = *(containers.get().begin()); |
| |
| Future<ResourceStatistics> usage = containerizer1.get()->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| // There should not be any perf statistics. |
| EXPECT_FALSE(usage.get().has_perf()); |
| |
| this->Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| // Set up so we can wait until the new slave updates the container's |
| // resources (this occurs after the executor has re-registered). |
| Future<Nothing> update = |
| FUTURE_DISPATCH(_, &MesosContainerizerProcess::update); |
| |
| // Start a slave using a containerizer with a perf event isolator. |
| flags.isolation = "cgroups/cpu,cgroups/mem,cgroups/perf_event"; |
| flags.perf_events = "cycles,task-clock"; |
| flags.perf_duration = Milliseconds(250); |
| flags.perf_interval = Milliseconds(500); |
| |
| Try<MesosContainerizer*> containerizer2 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| slave = this->StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(offers2); |
| EXPECT_NE(0u, offers2.get().size()); |
| |
| // Wait until the containerizer is updated. |
| AWAIT_READY(update); |
| |
| // The first container should not report perf statistics. |
| usage = containerizer2.get()->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| EXPECT_FALSE(usage.get().has_perf()); |
| |
| // Start a new container which will start reporting perf statistics. |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks2; |
| tasks2.push_back(task2); |
| |
| // Message expectations. |
| registerExecutor = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers2.get()[0].id(), tasks2); |
| |
| AWAIT_READY(registerExecutor); |
| |
| containers = containerizer2.get()->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(2u, containers.get().size()); |
| EXPECT_TRUE(containers.get().contains(containerId1)); |
| |
| ContainerID containerId2; |
| foreach (const ContainerID containerId, containers.get()) { |
| if (containerId != containerId1) { |
| containerId2.CopyFrom(containerId); |
| } |
| } |
| |
| usage = containerizer2.get()->usage(containerId2); |
| AWAIT_READY(usage); |
| |
| EXPECT_TRUE(usage.get().has_perf()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| delete containerizer2.get(); |
| } |
| #endif // __linux__ |