| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <unistd.h> |
| |
| #include <gmock/gmock.h> |
| |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/executor.hpp> |
| #include <mesos/mesos.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/pid.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| |
| #include <stout/stringify.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/master.hpp" |
| |
| #include "slave/isolator.hpp" |
| #include "slave/slave.hpp" |
| |
| #include "tests/isolator.hpp" |
| #include "tests/mesos.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::protobuf; |
| using namespace mesos::internal::tests; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::Isolator; |
| using mesos::internal::slave::Slave; |
| using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL; |
| |
| using process::Clock; |
| using process::Future; |
| using process::Message; |
| using process::PID; |
| using process::UPID; |
| |
| using std::string; |
| using std::map; |
| using std::vector; |
| |
| using testing::_; |
| using testing::AnyOf; |
| using testing::AtMost; |
| using testing::DoAll; |
| using testing::ElementsAre; |
| using testing::Eq; |
| using testing::Not; |
| using testing::Return; |
| using testing::SaveArg; |
| |
| |
// Test fixture for the fault-tolerance suite; inherits the in-process
// master/slave lifecycle helpers (StartMaster, StartSlave, Shutdown,
// ShutdownSlaves, Stop) from MesosTest.
class FaultToleranceTest : public MesosTest {};
| |
| |
| // This test checks that when a slave is lost, |
| // its offer(s) is rescinded. |
| TEST_F(FaultToleranceTest, SlaveLost) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers.get().size()); |
| |
| Future<Nothing> offerRescinded; |
| EXPECT_CALL(sched, offerRescinded(&driver, offers.get()[0].id())) |
| .WillOnce(FutureSatisfy(&offerRescinded)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, offers.get()[0].slave_id())) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| ShutdownSlaves(); |
| |
| AWAIT_READY(offerRescinded); |
| AWAIT_READY(slaveLost); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
// This test checks that a scheduler gets a slave lost
// message for a partitioned slave.
TEST_F(FaultToleranceTest, PartitionedSlave)
{
  // Pause the clock so we control exactly when ping timeouts fire.
  Clock::pause();

  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Set these expectations up before we spawn the slave so that we
  // don't miss the first PING.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);

  // Drop all the PONGs to simulate slave partition.
  DROP_MESSAGES(Eq("PONG"), _, _);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<Nothing> resourceOffers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  // Need to make sure the framework AND slave have registered with
  // master. Waiting for resource offers should accomplish both.
  AWAIT_READY(resourceOffers);

  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .Times(AtMost(1));

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  // Now advance through the PINGs: wait for each ping to be sent,
  // then advance the clock to trigger the next one, until the master
  // has reached its maximum number of unanswered pings.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
  }

  // One more timeout makes the master consider the slave lost.
  Clock::advance(master::SLAVE_PING_TIMEOUT);

  AWAIT_READY(slaveLost);

  driver.stop();
  driver.join();

  Shutdown();

  Clock::resume();
}
| |
| |
// The purpose of this test is to ensure that when slaves are removed
// from the master, and then attempt to re-register, we deny the
// re-registration by sending a ShutdownMessage to the slave.
// Why? Because during a network partition, the master will remove a
// partitioned slave, thus sending its tasks to LOST. At this point,
// when the partition is removed, the slave will attempt to
// re-register with its running tasks. We've already notified
// frameworks that these tasks were LOST, so we have to have the
// slave shut down.
TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Allow the master to PING the slave, but drop all PONG messages
  // from the slave. Note that we don't match on the master / slave
  // PIDs because it's actually the SlaveObserver Process that sends
  // the pings.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
  DROP_MESSAGES(Eq("PONG"), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return());

  driver.start();

  AWAIT_READY(offers);
  ASSERT_NE(0u, offers.get().size());

  // Launch a task. This is to ensure the task is killed by the slave,
  // during shutdown.
  TaskID taskId;
  taskId.set_value("1");

  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->MergeFrom(taskId);
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
  task.mutable_executor()->mutable_command()->set_value("sleep 60");

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  // Set up the expectations for launching the task.
  EXPECT_CALL(exec, registered(_, _, _, _));
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> runningStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&runningStatus));

  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
      slave.get(), &Slave::_statusUpdateAcknowledgement);

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(runningStatus);
  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());

  // Wait for the slave to have handled the acknowledgment prior
  // to pausing the clock.
  AWAIT_READY(statusUpdateAck);

  // Drop the first shutdown message from the master (simulated
  // partition), allow the second shutdown message to pass when
  // the slave re-registers.
  Future<ShutdownMessage> shutdownMessage =
    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());

  Future<TaskStatus> lostStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&lostStatus));

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  Clock::pause();

  // Now, induce a partition of the slave by having the master
  // timeout the slave.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
    Clock::settle();
  }

  Clock::advance(master::SLAVE_PING_TIMEOUT);
  Clock::settle();

  // The master will have notified the framework of the lost task.
  AWAIT_READY(lostStatus);
  EXPECT_EQ(TASK_LOST, lostStatus.get().state());

  // Wait for the master to attempt to shut down the slave.
  AWAIT_READY(shutdownMessage);

  // The master will notify the framework that the slave was lost.
  AWAIT_READY(slaveLost);

  // We now complete the partition on the slave side as well. This
  // is done by simulating a NoMasterDetectedMessage which would
  // normally occur during a network partition.
  process::post(slave.get(), NoMasterDetectedMessage());

  Future<Nothing> shutdown;
  EXPECT_CALL(exec, shutdown(_))
    .WillOnce(FutureSatisfy(&shutdown));

  // This time allow the ShutdownMessage to reach the slave.
  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());

  // Have the slave re-register with the master.
  NewMasterDetectedMessage newMasterDetectedMessage;
  newMasterDetectedMessage.set_pid(master.get());
  process::post(slave.get(), newMasterDetectedMessage);

  // Upon re-registration, the master will shutdown the slave.
  // The slave will then shut down the executor.
  AWAIT_READY(shutdownMessage);
  AWAIT_READY(shutdown);

  Clock::resume();

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// The purpose of this test is to ensure that when slaves are removed
// from the master, and then attempt to send status updates, we send
// a ShutdownMessage to the slave. Why? Because during a network
// partition, the master will remove a partitioned slave, thus sending
// its tasks to LOST. At this point, when the partition is removed,
// the slave may attempt to send updates if it was unaware that the
// master deactivated it. We've already notified frameworks that these
// tasks were LOST, so we have to have the slave shut down.
TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Allow the master to PING the slave, but drop all PONG messages
  // from the slave. Note that we don't match on the master / slave
  // PIDs because it's actually the SlaveObserver Process that sends
  // the pings.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
  DROP_MESSAGES(Eq("PONG"), _, _);

  // Capture the slave's registration so we can learn its SlaveID,
  // which is needed to forge a status update below.
  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);
  SlaveID slaveId = slaveRegisteredMessage.get().slave_id();

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillRepeatedly(Return());

  driver.start();

  AWAIT_READY(frameworkId);

  // Drop the first shutdown message from the master (simulated
  // partition), allow the second shutdown message to pass when
  // the slave sends an update.
  Future<ShutdownMessage> shutdownMessage =
    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());

  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .WillRepeatedly(Return());

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  Clock::pause();

  // Now, induce a partition of the slave by having the master
  // timeout the slave.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
    Clock::settle();
  }

  Clock::advance(master::SLAVE_PING_TIMEOUT);
  Clock::settle();

  // Wait for the master to attempt to shut down the slave.
  AWAIT_READY(shutdownMessage);

  // The master will notify the framework that the slave was lost.
  AWAIT_READY(slaveLost);

  // Expect a second ShutdownMessage, triggered by the forged update.
  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());

  // At this point, the slave still thinks it's registered, so we
  // simulate a status update coming from the slave.
  StatusUpdateMessage statusUpdate;
  statusUpdate.set_pid(stringify(slave.get()));
  statusUpdate.mutable_update()->mutable_framework_id()->set_value(
      frameworkId.get().value());
  statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
  statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
  statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
      "task_id");
  statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
  statusUpdate.mutable_update()->set_timestamp(Clock::now().secs());
  statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
  process::post(master.get(), statusUpdate);

  // The master should shutdown the slave upon receiving the update.
  AWAIT_READY(shutdownMessage);

  Clock::resume();

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // The purpose of this test is to ensure that when slaves are removed |
| // from the master, and then attempt to send exited executor messages, |
| // we send a ShutdownMessage to the slave. Why? Because during a |
| // network partition, the master will remove a partitioned slave, thus |
| // sending its tasks to LOST. At this point, when the partition is |
| // removed, the slave may attempt to send exited executor messages if |
| // it was unaware that the master deactivated it. We've already |
| // notified frameworks that the tasks under the executors were LOST, |
| // so we have to have the slave shut down. |
| TEST_F(FaultToleranceTest, PartitionedSlaveExitedExecutor) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Allow the master to PING the slave, but drop all PONG messages |
| // from the slave. Note that we don't match on the master / slave |
| // PIDs because it's actually the SlaveObserver Process that sends |
| // the pings. |
| Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| DROP_MESSAGES(Eq("PONG"), _, _); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| TestingIsolator isolator(&exec); |
| |
| Try<PID<Slave> > slave = StartSlave(&isolator); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId));\ |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers); |
| ASSERT_NE(0u, offers.get().size()); |
| |
| // Launch a task. This allows us to have the slave send an |
| // ExitedExecutorMessage. |
| TaskID taskId; |
| taskId.set_value("1"); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->MergeFrom(taskId); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| task.mutable_executor()->mutable_command()->set_value("sleep 60"); |
| |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| // Set up the expectations for launching the task. |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| // Drop all the status updates from the slave, so that we can |
| // ensure the ExitedExecutorMessage is what triggers the slave |
| // shutdown. |
| DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Drop the first shutdown message from the master (simulated |
| // partition) and allow the second shutdown message to pass when |
| // triggered by the ExitedExecutorMessage. |
| Future<ShutdownMessage> shutdownMessage = |
| DROP_PROTOBUF(ShutdownMessage(), _, slave.get()); |
| |
| Future<TaskStatus> lostStatus; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&lostStatus)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, _)) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| Clock::pause(); |
| |
| // Now, induce a partition of the slave by having the master |
| // timeout the slave. |
| uint32_t pings = 0; |
| while (true) { |
| AWAIT_READY(ping); |
| pings++; |
| if (pings == master::MAX_SLAVE_PING_TIMEOUTS) { |
| break; |
| } |
| ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| } |
| |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| |
| // The master will have notified the framework of the lost task. |
| AWAIT_READY(lostStatus); |
| EXPECT_EQ(TASK_LOST, lostStatus.get().state()); |
| |
| // Wait for the master to attempt to shut down the slave. |
| AWAIT_READY(shutdownMessage); |
| |
| // The master will notify the framework that the slave was lost. |
| AWAIT_READY(slaveLost); |
| |
| shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()); |
| |
| // Induce an ExitedExecutorMessage from the slave. |
| dispatch(isolator, |
| &Isolator::killExecutor, |
| frameworkId.get(), |
| DEFAULT_EXECUTOR_INFO.executor_id()); |
| |
| // Upon receiving the message, the master will shutdown the slave. |
| AWAIT_READY(shutdownMessage); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test ensures that a framework connecting with a |
| // failed over master gets a registered callback. |
| // Note that this behavior might change in the future and |
| // the scheduler might receive a re-registered callback. |
| TEST_F(FaultToleranceTest, MasterFailover) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| Future<process::Message> frameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| // Simulate failed over master by restarting the master. |
| Stop(master.get()); |
| master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| EXPECT_CALL(sched, disconnected(&driver)); |
| |
| Future<Nothing> registered; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureSatisfy(®istered)); |
| |
| // Simulate a new master detected message to the scheduler. |
| NewMasterDetectedMessage newMasterDetectedMsg; |
| newMasterDetectedMsg.set_pid(master.get()); |
| |
| process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg); |
| |
| // Framework should get a registered callback. |
| AWAIT_READY(registered); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
// This test ensures that a scheduler that fails over (re-connects
// with the same framework id) supersedes the original scheduler:
// the failover instance gets a registered callback and the original
// instance gets a "Framework failed over" error.
TEST_F(FaultToleranceTest, SchedulerFailover)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  // Launch the first (i.e., failing) scheduler and wait until
  // registered gets called to launch the second (i.e., failover)
  // scheduler.

  MockScheduler sched1;
  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master.get());

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillRepeatedly(Return());

  driver1.start();

  AWAIT_READY(frameworkId);

  // Now launch the second (i.e., failover) scheduler using the
  // framework id recorded from the first scheduler and wait until it
  // gets a registered callback.

  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId.get());

  MesosSchedulerDriver driver2(&sched2, framework2, master.get());

  Future<Nothing> sched2Registered;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
    .WillOnce(FutureSatisfy(&sched2Registered));

  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillRepeatedly(Return());

  EXPECT_CALL(sched2, offerRescinded(&driver2, _))
    .Times(AtMost(1));

  // Scheduler1's expectations.
  EXPECT_CALL(sched1, offerRescinded(&driver1, _))
    .Times(AtMost(1));

  Future<Nothing> sched1Error;
  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
    .WillOnce(FutureSatisfy(&sched1Error));

  driver2.start();

  AWAIT_READY(sched2Registered);

  AWAIT_READY(sched1Error);

  // The failover driver stops cleanly...
  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
  EXPECT_EQ(DRIVER_STOPPED, driver2.join());

  // ...whereas the original driver was aborted by the failover.
  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
  EXPECT_EQ(DRIVER_STOPPED, driver1.join());

  Shutdown();
}
| |
| |
| TEST_F(FaultToleranceTest, FrameworkReliableRegistration) |
| { |
| Clock::pause(); |
| |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| Future<Nothing> registered; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureSatisfy(®istered)); |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(sched, offerRescinded(&driver, _)) |
| .Times(AtMost(1)); |
| |
| // Drop the first framework registered message, allow subsequent messages. |
| Future<FrameworkRegisteredMessage> frameworkRegisteredMessage = |
| DROP_PROTOBUF(FrameworkRegisteredMessage(), _, _); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| // TODO(benh): Pull out constant from SchedulerProcess. |
| Clock::advance(Seconds(1)); |
| |
| AWAIT_READY(registered); // Ensures registered message is received. |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| |
| Clock::resume(); |
| } |
| |
| |
| TEST_F(FaultToleranceTest, FrameworkReregister) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| Future<Nothing> registered; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureSatisfy(®istered)); |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillRepeatedly(Return()); |
| |
| Future<process::Message> message = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); |
| |
| driver.start(); |
| |
| AWAIT_READY(message); // Framework registered message, to get the pid. |
| AWAIT_READY(registered); // Framework registered call. |
| |
| Future<Nothing> disconnected; |
| EXPECT_CALL(sched, disconnected(&driver)) |
| .WillOnce(FutureSatisfy(&disconnected)); |
| |
| Future<Nothing> reregistered; |
| EXPECT_CALL(sched, reregistered(&driver, _)) |
| .WillOnce(FutureSatisfy(&reregistered)); |
| |
| EXPECT_CALL(sched, offerRescinded(&driver, _)) |
| .Times(AtMost(1)); |
| |
| // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper |
| // expiration) at the scheduler. |
| NewMasterDetectedMessage newMasterDetectedMsg; |
| newMasterDetectedMsg.set_pid(master.get()); |
| |
| process::post(message.get().to, newMasterDetectedMsg); |
| |
| AWAIT_READY(disconnected); |
| |
| AWAIT_READY(reregistered); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| TEST_F(FaultToleranceTest, TaskLost) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| Future<process::Message> message = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| AWAIT_READY(message); |
| |
| Future<Nothing> disconnected; |
| EXPECT_CALL(sched, disconnected(&driver)) |
| .WillOnce(FutureSatisfy(&disconnected)); |
| |
| // Simulate a spurious noMasterDetected event at the scheduler. |
| process::post(message.get().to, NoMasterDetectedMessage()); |
| |
| AWAIT_READY(disconnected); |
| |
| TaskInfo task; |
| task.set_name("test task"); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_LOST, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
// This test checks that a failover scheduler gets the
// retried status update.
TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
{
  Clock::pause();

  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  // Launch the first (i.e., failing) scheduler.
  MockScheduler sched1;
  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master.get());

  FrameworkID frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return());

  driver1.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  EXPECT_CALL(exec, registered(_, _, _, _))
    .Times(1);

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  // Drop the first status update message
  // between master and the scheduler. The destination matcher
  // ("neither the master nor the slave") singles out the scheduler
  // driver without needing to know its pid.
  Future<StatusUpdateMessage> statusUpdateMessage =
    DROP_PROTOBUF(
        StatusUpdateMessage(),
        _,
        Not(AnyOf(Eq(master.get()), Eq(slave.get()))));

  driver1.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(statusUpdateMessage);

  // Now launch the second (i.e., failover) scheduler using the
  // framework id recorded from the first scheduler and wait until it
  // registers.

  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId);

  MesosSchedulerDriver driver2(&sched2, framework2, master.get());

  Future<Nothing> registered2;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
    .WillOnce(FutureSatisfy(&registered2));

  // Scheduler1 should get an error due to failover.
  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"));

  driver2.start();

  AWAIT_READY(registered2);

  // Now advance time enough for the reliable timeout
  // to kick in and another status update is sent.
  Future<Nothing> statusUpdate;
  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
    .WillOnce(FutureSatisfy(&statusUpdate));

  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);

  AWAIT_READY(statusUpdate);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver1.stop();
  driver2.stop();

  driver1.join();
  driver2.join();

  Shutdown();

  Clock::resume();
}
| |
| |
// This test verifies that the slave forwards a status update to the
// scheduler even when the update comes from an executor the slave
// does not know about (e.g., one that has already exited).
TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  FrameworkID frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());
  Offer offer = offers.get()[0];

  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<Nothing> statusUpdate;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureSatisfy(&statusUpdate)); // TASK_RUNNING of task1.

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver.launchTasks(offer.id(), tasks);

  // Wait until TASK_RUNNING of task1 is received.
  AWAIT_READY(statusUpdate);

  // Simulate the slave receiving status update from an unknown
  // (e.g. exited) executor of the given framework.
  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status)); // TASK_RUNNING of task2.

  // "task2" was never launched, so its (fabricated) update comes
  // from an executor the slave does not know about.
  TaskID taskId;
  taskId.set_value("task2");

  StatusUpdate statusUpdate2 = createStatusUpdate(
      frameworkId, offer.slave_id(), taskId, TASK_RUNNING, "Dummy update");

  // Dispatch the update directly into the slave process.
  process::dispatch(slave.get(), &Slave::statusUpdate, statusUpdate2);

  // Ensure that the scheduler receives task2's update.
  AWAIT_READY(status);
  EXPECT_EQ(taskId, status.get().task_id());
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that after a scheduler failover (a second
// driver registers with the same FrameworkID), a framework message
// sent by the executor is delivered to the new scheduler, while the
// original scheduler receives a "Framework failed over" error.
TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched1;
  MesosSchedulerDriver driver1(&sched1, DEFAULT_FRAMEWORK_INFO, master.get());

  // Capture the framework id so the second scheduler can fail over
  // into the same framework.
  FrameworkID frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver1.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task so that an executor is running and can later send
  // a framework message.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<TaskStatus> status;
  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
    .WillOnce(FutureArg<1>(&status));

  // Save the executor's driver so we can send a framework message
  // from the executor side after the failover.
  ExecutorDriver* execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver1.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  // Set up the failed-over scheduler: same FrameworkInfo but with
  // the id of the already-registered framework.
  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId);

  MesosSchedulerDriver driver2(&sched2, framework2, master.get());

  Future<Nothing> registered;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
    .WillOnce(FutureSatisfy(&registered));

  // The executor's message must arrive at the NEW scheduler.
  Future<Nothing> frameworkMessage;
  EXPECT_CALL(sched2, frameworkMessage(&driver2, _, _, _))
    .WillOnce(FutureSatisfy(&frameworkMessage));

  // The old scheduler should be told it has been replaced.
  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"));

  driver2.start();

  AWAIT_READY(registered);

  execDriver->sendFrameworkMessage("Executor to Framework message");

  AWAIT_READY(frameworkMessage);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver1.stop();
  driver2.stop();

  driver1.join();
  driver2.join();

  Shutdown();
}
| |
| |
| // This test checks that a scheduler exit shuts down the executor. |
| TEST_F(FaultToleranceTest, SchedulerExit) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave> > slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| AWAIT_READY(offers); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| TEST_F(FaultToleranceTest, SlaveReliableRegistration) |
| { |
| Clock::pause(); |
| |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Drop the first slave registered message, allow subsequent messages. |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| DROP_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<Nothing> resourceOffers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureSatisfy(&resourceOffers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| |
| Clock::advance(Seconds(1)); // TODO(benh): Pull out constant from Slave. |
| |
| AWAIT_READY(resourceOffers); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| |
| Clock::resume(); |
| } |
| |
| |
| TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<Nothing> resourceOffers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureSatisfy(&resourceOffers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(resourceOffers); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper |
| // expiration) at the slave. |
| |
| NewMasterDetectedMessage message; |
| message.set_pid(master.get()); |
| |
| process::post(slave.get(), message); |
| |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
// This test verifies that a re-registering slave does not inform
// the master about a terminated executor (and its tasks) when the
// executor has pending status updates. We check this by ensuring
// that it is the master (during re-registration reconciliation),
// and not the slave, that sends the TASK_LOST update for the task
// belonging to the terminated executor.
TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Use a TestingIsolator so we can kill the executor directly below.
  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestingIsolator isolator(&exec);

  Try<PID<Slave> > slave = StartSlave(&isolator);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  // Capture the framework id; it is needed to kill the executor.
  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(1, 1, 512))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  driver.start();

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  // Drop the TASK_LOST status update(s) sent to the master.
  // This ensures that the TASK_LOST received by the scheduler
  // is generated by the master.
  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());

  Future<ExitedExecutorMessage> executorExitedMessage =
    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);

  // Now kill the executor.
  dispatch(isolator,
           &Isolator::killExecutor,
           frameworkId.get(),
           DEFAULT_EXECUTOR_ID);

  AWAIT_READY(executorExitedMessage);

  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  Future<TaskStatus> status2;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status2));

  NewMasterDetectedMessage message;
  message.set_pid(master.get());

  process::post(slave.get(), message);

  // Since the slave's own updates were dropped above, this TASK_LOST
  // must have been generated by the master.
  AWAIT_READY(status2);
  EXPECT_EQ(TASK_LOST, status2.get().state());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that the master sends TASK_LOST updates
// for tasks that are known to the master but absent from the
// re-registered slave. We arrange this by dropping the
// RunTaskMessage from the master to the slave, so that only the
// master knows about the task.
TEST_F(FaultToleranceTest, ReconcileLostTasks)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  EXPECT_NE(0u, offers.get().size());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  // We now launch a task and drop the corresponding RunTaskMessage on
  // the slave, to ensure that only the master knows about this task.
  Future<RunTaskMessage> runTaskMessage =
    DROP_PROTOBUF(RunTaskMessage(), _, _);

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(runTaskMessage);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.

  NewMasterDetectedMessage message;
  message.set_pid(master.get());

  process::post(slave.get(), message);

  AWAIT_READY(slaveReregisteredMessage);

  // The re-registered slave never ran the task, so reconciliation in
  // the master should produce a TASK_LOST for it.
  AWAIT_READY(status);

  ASSERT_EQ(task.task_id(), status.get().task_id());
  ASSERT_EQ(TASK_LOST, status.get().state());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // This test verifies that when the slave re-registers, the master |
| // does not send TASK_LOST update for a task that has reached terminal |
| // state but is waiting for an acknowledgement. |
TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  EXPECT_CALL(exec, registered(_, _, _, _));

  // Send a terminal update right away.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  // Drop the status update from slave to the master, so that
  // the slave has a pending terminal update when it re-registers.
  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());

  // Wait for the slave to have processed the update internally, even
  // though the message to the master was dropped.
  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Pause the clock so status update retries happen only when we
  // explicitly advance time below.
  Clock::pause();

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(_statusUpdate);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  NewMasterDetectedMessage message;
  message.set_pid(master.get());

  process::post(slave.get(), message);

  AWAIT_READY(slaveReregisteredMessage);

  // The master should not send a TASK_LOST after the slave
  // re-registers. We check this by advancing the clock so that
  // the only update the scheduler receives is the retried
  // TASK_FINISHED update.
  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);

  AWAIT_READY(status);
  ASSERT_EQ(TASK_FINISHED, status.get().state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();

  Shutdown();
}