| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <stdint.h> |
| #include <unistd.h> |
| |
| #include <gmock/gmock.h> |
| |
| #include <map> |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/executor.hpp> |
| #include <mesos/mesos.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/http.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| |
| #include <stout/json.hpp> |
| #include <stout/stringify.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/master.hpp" |
| |
| #include "slave/slave.hpp" |
| |
| #include "tests/containerizer.hpp" |
| #include "tests/mesos.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::protobuf; |
| using namespace mesos::internal::tests; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::Slave; |
| using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_MIN; |
| |
| using process::Clock; |
| using process::Future; |
| using process::Message; |
| using process::Owned; |
| using process::PID; |
| using process::UPID; |
| using process::http::OK; |
| using process::http::Response; |
| |
| using std::string; |
| using std::map; |
| using std::vector; |
| |
| using testing::_; |
| using testing::AnyOf; |
| using testing::AtMost; |
| using testing::DoAll; |
| using testing::ElementsAre; |
| using testing::Eq; |
| using testing::Not; |
| using testing::Return; |
| using testing::SaveArg; |
| |
| |
// Fixture for the fault tolerance tests. MesosTest supplies the
// in-process cluster helpers used throughout this file
// (StartMaster / StartSlave / ShutdownSlaves / Shutdown).
class FaultToleranceTest : public MesosTest {};
| |
| |
| // This test checks that when a slave is lost, |
| // its offer(s) is rescinded. |
| TEST_F(FaultToleranceTest, SlaveLost) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers.get().size()); |
| |
| Future<Nothing> offerRescinded; |
| EXPECT_CALL(sched, offerRescinded(&driver, offers.get()[0].id())) |
| .WillOnce(FutureSatisfy(&offerRescinded)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, offers.get()[0].slave_id())) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| ShutdownSlaves(); |
| |
| AWAIT_READY(offerRescinded); |
| AWAIT_READY(slaveLost); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test checks that a scheduler gets a slave lost |
// message for a partitioned slave.
TEST_F(FaultToleranceTest, PartitionedSlave)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Set these expectations up before we spawn the slave so that we
  // don't miss the first PING.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);

  // Drop all the PONGs to simulate slave partition.
  DROP_MESSAGES(Eq("PONG"), _, _);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<Nothing> resourceOffers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  // Need to make sure the framework AND slave have registered with
  // master. Waiting for resource offers should accomplish both.
  AWAIT_READY(resourceOffers);

  Clock::pause();

  // The slave's offer may or may not be rescinded before the
  // slave-lost notification, so tolerate at most one rescind.
  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .Times(AtMost(1));

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  // Now advance through the PINGs.
  // Each iteration waits for the master's PING (whose PONG reply is
  // dropped above), then advances the clock to fire the next health
  // check, until MAX_SLAVE_PING_TIMEOUTS pings have been observed.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
  }

  // One final advance times out the last outstanding PING, at which
  // point the master deems the slave lost.
  Clock::advance(master::SLAVE_PING_TIMEOUT);

  AWAIT_READY(slaveLost);

  driver.stop();
  driver.join();

  Shutdown();

  Clock::resume();
}
| |
| |
| // The purpose of this test is to ensure that when slaves are removed |
| // from the master, and then attempt to re-register, we deny the |
| // re-registration by sending a ShutdownMessage to the slave. |
| // Why? Because during a network partition, the master will remove a |
| // partitioned slave, thus sending its tasks to LOST. At this point, |
| // when the partition is removed, the slave will attempt to |
| // re-register with its running tasks. We've already notified |
// frameworks that these tasks were LOST, so we have to have the
// slave shut down.
TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Allow the master to PING the slave, but drop all PONG messages
  // from the slave. Note that we don't match on the master / slave
  // PIDs because it's actually the SlaveObserver Process that sends
  // the pings.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
  DROP_MESSAGES(Eq("PONG"), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  // A standalone detector lets us simulate master loss / re-detection
  // on the slave side later in the test.
  StandaloneMasterDetector detector(master.get());

  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return());

  driver.start();

  AWAIT_READY(offers);
  ASSERT_NE(0u, offers.get().size());

  // Launch a task. This is to ensure the task is killed by the slave,
  // during shutdown.
  TaskID taskId;
  taskId.set_value("1");

  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->MergeFrom(taskId);
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
  task.mutable_executor()->mutable_command()->set_value("sleep 60");

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  // Set up the expectations for launching the task.
  EXPECT_CALL(exec, registered(_, _, _, _));
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> runningStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&runningStatus));

  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
      slave.get(), &Slave::_statusUpdateAcknowledgement);

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(runningStatus);
  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());

  // Wait for the slave to have handled the acknowledgment prior
  // to pausing the clock.
  AWAIT_READY(statusUpdateAck);

  // Drop the first shutdown message from the master (simulated
  // partition), allow the second shutdown message to pass when
  // the slave re-registers.
  Future<ShutdownMessage> shutdownMessage =
    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());

  Future<TaskStatus> lostStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&lostStatus));

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  Clock::pause();

  // Now, induce a partition of the slave by having the master
  // timeout the slave. Each iteration waits for a PING (whose PONG
  // reply was dropped above) and then advances the clock to trigger
  // the next health check.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
    Clock::settle();
  }

  Clock::advance(master::SLAVE_PING_TIMEOUT);
  Clock::settle();

  // The master will have notified the framework of the lost task.
  AWAIT_READY(lostStatus);
  EXPECT_EQ(TASK_LOST, lostStatus.get().state());

  // Wait for the master to attempt to shut down the slave.
  AWAIT_READY(shutdownMessage);

  // The master will notify the framework that the slave was lost.
  AWAIT_READY(slaveLost);

  Clock::resume();

  // We now complete the partition on the slave side as well. This
  // is done by simulating a master loss event which would normally
  // occur during a network partition.
  detector.appoint(None());

  Future<Nothing> shutdown;
  EXPECT_CALL(exec, shutdown(_))
    .WillOnce(FutureSatisfy(&shutdown));

  // This time, let the ShutdownMessage reach the slave.
  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());

  // Have the slave re-register with the master.
  detector.appoint(master.get());

  // Upon re-registration, the master will shutdown the slave.
  // The slave will then shut down the executor.
  AWAIT_READY(shutdownMessage);
  AWAIT_READY(shutdown);

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // The purpose of this test is to ensure that when slaves are removed |
| // from the master, and then attempt to send status updates, we send |
| // a ShutdownMessage to the slave. Why? Because during a network |
| // partition, the master will remove a partitioned slave, thus sending |
| // its tasks to LOST. At this point, when the partition is removed, |
| // the slave may attempt to send updates if it was unaware that the |
| // master removed it. We've already notified frameworks that these |
| // tasks were LOST, so we have to have the slave shut down. |
TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Allow the master to PING the slave, but drop all PONG messages
  // from the slave. Note that we don't match on the master / slave
  // PIDs because it's actually the SlaveObserver Process that sends
  // the pings.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
  DROP_MESSAGES(Eq("PONG"), _, _);

  // Capture the slave's id so that a status update can be forged
  // from it after the master has removed the slave.
  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);
  SlaveID slaveId = slaveRegisteredMessage.get().slave_id();

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillRepeatedly(Return());

  driver.start();

  AWAIT_READY(frameworkId);

  // Drop the first shutdown message from the master (simulated
  // partition), allow the second shutdown message to pass when
  // the slave sends an update.
  Future<ShutdownMessage> shutdownMessage =
    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());

  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .WillRepeatedly(Return());

  Future<Nothing> slaveLost;
  EXPECT_CALL(sched, slaveLost(&driver, _))
    .WillOnce(FutureSatisfy(&slaveLost));

  Clock::pause();

  // Now, induce a partition of the slave by having the master
  // timeout the slave. Each iteration waits for a PING (whose PONG
  // reply was dropped above) and then advances the clock to trigger
  // the next health check.
  uint32_t pings = 0;
  while (true) {
    AWAIT_READY(ping);
    pings++;
    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
      break;
    }
    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
    Clock::advance(master::SLAVE_PING_TIMEOUT);
    Clock::settle();
  }

  Clock::advance(master::SLAVE_PING_TIMEOUT);
  Clock::settle();

  // Wait for the master to attempt to shut down the slave.
  AWAIT_READY(shutdownMessage);

  // The master will notify the framework that the slave was lost.
  AWAIT_READY(slaveLost);

  // This time, expect the ShutdownMessage to be delivered.
  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());

  // At this point, the slave still thinks it's registered, so we
  // simulate a status update coming from the slave.
  TaskID taskId;
  taskId.set_value("task_id");
  const StatusUpdate& update = createStatusUpdate(
      frameworkId.get(), slaveId, taskId, TASK_RUNNING);

  StatusUpdateMessage message;
  message.mutable_update()->CopyFrom(update);
  message.set_pid(stringify(slave.get()));

  process::post(master.get(), message);

  // The master should shutdown the slave upon receiving the update.
  AWAIT_READY(shutdownMessage);

  Clock::resume();

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // The purpose of this test is to ensure that when slaves are removed |
| // from the master, and then attempt to send exited executor messages, |
| // we send a ShutdownMessage to the slave. Why? Because during a |
| // network partition, the master will remove a partitioned slave, thus |
| // sending its tasks to LOST. At this point, when the partition is |
| // removed, the slave may attempt to send exited executor messages if |
| // it was unaware that the master removed it. We've already |
| // notified frameworks that the tasks under the executors were LOST, |
| // so we have to have the slave shut down. |
| TEST_F(FaultToleranceTest, PartitionedSlaveExitedExecutor) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Allow the master to PING the slave, but drop all PONG messages |
| // from the slave. Note that we don't match on the master / slave |
| // PIDs because it's actually the SlaveObserver Process that sends |
| // the pings. |
| Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| DROP_MESSAGES(Eq("PONG"), _, _); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| TestContainerizer containerizer(&exec); |
| |
| Try<PID<Slave> > slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId));\ |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers); |
| ASSERT_NE(0u, offers.get().size()); |
| |
| // Launch a task. This allows us to have the slave send an |
| // ExitedExecutorMessage. |
| TaskID taskId; |
| taskId.set_value("1"); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->MergeFrom(taskId); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| task.mutable_executor()->mutable_command()->set_value("sleep 60"); |
| |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| // Set up the expectations for launching the task. |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| // Drop all the status updates from the slave, so that we can |
| // ensure the ExitedExecutorMessage is what triggers the slave |
| // shutdown. |
| DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| // Drop the first shutdown message from the master (simulated |
| // partition) and allow the second shutdown message to pass when |
| // triggered by the ExitedExecutorMessage. |
| Future<ShutdownMessage> shutdownMessage = |
| DROP_PROTOBUF(ShutdownMessage(), _, slave.get()); |
| |
| Future<TaskStatus> lostStatus; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&lostStatus)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, _)) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| Clock::pause(); |
| |
| // Now, induce a partition of the slave by having the master |
| // timeout the slave. |
| uint32_t pings = 0; |
| while (true) { |
| AWAIT_READY(ping); |
| pings++; |
| if (pings == master::MAX_SLAVE_PING_TIMEOUTS) { |
| break; |
| } |
| ping = FUTURE_MESSAGE(Eq("PING"), _, _); |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| } |
| |
| Clock::advance(master::SLAVE_PING_TIMEOUT); |
| Clock::settle(); |
| |
| // The master will have notified the framework of the lost task. |
| AWAIT_READY(lostStatus); |
| EXPECT_EQ(TASK_LOST, lostStatus.get().state()); |
| |
| // Wait for the master to attempt to shut down the slave. |
| AWAIT_READY(shutdownMessage); |
| |
| // The master will notify the framework that the slave was lost. |
| AWAIT_READY(slaveLost); |
| |
| shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get()); |
| |
| // Induce an ExitedExecutorMessage from the slave. |
| containerizer.destroy( |
| frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id()); |
| |
| // Upon receiving the message, the master will shutdown the slave. |
| AWAIT_READY(shutdownMessage); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test ensures that a framework connecting with a |
| // failed over master gets a registered callback. |
| // Note that this behavior might change in the future and |
| // the scheduler might receive a re-registered callback. |
TEST_F(FaultToleranceTest, MasterFailover)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockScheduler sched;
  // A standalone detector lets us manually "elect" the restarted
  // master for the scheduler below.
  StandaloneMasterDetector detector(master.get());
  TestingMesosSchedulerDriver driver(&sched, &detector);

  Future<process::Message> frameworkRegisteredMessage =
    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);

  Future<Nothing> registered1;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered1));

  driver.start();

  AWAIT_READY(frameworkRegisteredMessage);

  AWAIT_READY(registered1);

  // Simulate failed over master by restarting the master.
  Stop(master.get());
  master = StartMaster();
  ASSERT_SOME(master);

  EXPECT_CALL(sched, disconnected(&driver));

  Future<AuthenticateMessage> authenticateMessage =
    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);

  // Expect a (fresh) registered callback, not re-registered; see the
  // test's header comment.
  Future<Nothing> registered2;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered2));

  // Simulate a new master detected message to the scheduler.
  detector.appoint(master.get());

  // Scheduler should retry authentication.
  AWAIT_READY(authenticateMessage);

  // Framework should get a registered callback.
  AWAIT_READY(registered2);

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // This test ensures that a failed over master recovers completed tasks |
| // from a slave's re-registration when the slave thinks the framework has |
| // completed (but the framework has not actually completed yet from master's |
// point of view).
TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
{
  // Step 1. Start Master and Slave.
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor executor(DEFAULT_EXECUTOR_ID);

  TestContainerizer containerizer(&executor);

  StandaloneMasterDetector slaveDetector(master.get());

  Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
  ASSERT_SOME(slave);

  // Verify master/slave have 0 completed/running frameworks.
  Future<Response> masterState = process::http::get(master.get(), "state.json");
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, masterState);

  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
      "application/json",
      "Content-Type",
      masterState);

  Try<JSON::Object> parse = JSON::parse<JSON::Object>(masterState.get().body);
  ASSERT_SOME(parse);
  JSON::Object masterJSON = parse.get();

  EXPECT_EQ(0u,
      masterJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(0u,
      masterJSON.values["frameworks"].as<JSON::Array>().values.size());

  // Step 2. Create/start framework.
  // The scheduler gets its own detector so master failover can be
  // signaled to the scheduler and the slave independently.
  StandaloneMasterDetector schedDetector(master.get());

  MockScheduler sched;
  TestingMesosSchedulerDriver driver(&sched, &schedDetector);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(frameworkId);
  EXPECT_NE("", frameworkId.get().value());
  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Step 3. Create/launch a task.
  TaskInfo task =
    createTask(offers.get()[0], "sleep 10000", DEFAULT_EXECUTOR_ID);
  vector<TaskInfo> tasks;
  tasks.push_back(task); // Long lasting task

  EXPECT_CALL(executor, registered(_, _, _, _));
  EXPECT_CALL(executor, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning));

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());

  // Verify master and slave recognize the running task/framework.
  // NOTE(review): subsequent state.json fetches rely on Future::get()
  // blocking until the response is ready (no AWAIT here) — confirm
  // this matches the libprocess Future semantics in this tree.
  masterState = process::http::get(master.get(), "state.json");

  parse = JSON::parse<JSON::Object>(masterState.get().body);
  ASSERT_SOME(parse);
  masterJSON = parse.get();

  EXPECT_EQ(0u,
      masterJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(1u,
      masterJSON.values["frameworks"].as<JSON::Array>().values.size());

  Future<Response> slaveState = process::http::get(slave.get(), "state.json");

  parse = JSON::parse<JSON::Object>(slaveState.get().body);
  ASSERT_SOME(parse);
  JSON::Object slaveJSON = parse.get();

  EXPECT_EQ(0u,
      slaveJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(1u,
      slaveJSON.values["frameworks"].as<JSON::Array>().values.size());

  // Step 4. Kill task.
  EXPECT_CALL(executor, killTask(_, _))
    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));

  Future<TaskStatus> statusKilled;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusKilled));

  driver.killTask(task.task_id());

  AWAIT_READY(statusKilled);
  ASSERT_EQ(TASK_KILLED, statusKilled.get().state());

  // At this point, the task is killed, but the framework is still
  // running. This is because the executor has to time-out before
  // it exits.
  masterState = process::http::get(master.get(), "state.json");
  parse = JSON::parse<JSON::Object>(masterState.get().body);
  ASSERT_SOME(parse);
  masterJSON = parse.get();

  EXPECT_EQ(0u,
      masterJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(1u,
      masterJSON.values["frameworks"].as<JSON::Array>().values.size());

  slaveState = process::http::get(slave.get(), "state.json");
  parse = JSON::parse<JSON::Object>(slaveState.get().body);
  ASSERT_SOME(parse);
  slaveJSON = parse.get();

  EXPECT_EQ(0u,
      slaveJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(1u,
      slaveJSON.values["frameworks"].as<JSON::Array>().values.size());

  // Step 5. Kill the executor.
  Future<Nothing> executorTerminated =
    FUTURE_DISPATCH(_, &Slave::executorTerminated);

  // Induce an ExitedExecutorMessage from the slave.
  containerizer.destroy(
      frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());

  AWAIT_READY(executorTerminated);

  // Slave should consider the framework completed after it executes
  // "executorTerminated". Pause/settle drains any pending dispatches
  // before we query the slave's state.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  // Verify slave sees completed framework.
  slaveState = process::http::get(slave.get(), "state.json");
  parse = JSON::parse<JSON::Object>(slaveState.get().body);
  ASSERT_SOME(parse);
  slaveJSON = parse.get();

  EXPECT_EQ(1u,
      slaveJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(0u,
      slaveJSON.values["frameworks"].as<JSON::Array>().values.size());

  // Step 6. Simulate failed over master by restarting the master.
  Stop(master.get());
  master = StartMaster();
  ASSERT_SOME(master);

  // Step 7. Simulate a framework re-registration with a failed over master.
  EXPECT_CALL(sched, disconnected(&driver));

  Future<Nothing> registered;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered));

  schedDetector.appoint(master.get());

  AWAIT_READY(registered);

  // Step 8. Simulate a slave re-registration with a failed over master.
  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a new master detected message to the slave.
  slaveDetector.appoint(master.get());

  AWAIT_READY(slaveReregisteredMessage);

  // Verify that the master doesn't add the completed framework
  // reported by the slave to "frameworks.completed".
  Clock::pause();
  Clock::settle();
  Clock::resume();

  masterState = process::http::get(master.get(), "state.json");
  parse = JSON::parse<JSON::Object>(masterState.get().body);
  ASSERT_SOME(parse);
  masterJSON = parse.get();

  EXPECT_EQ(0u,
      masterJSON.values["completed_frameworks"].as<JSON::Array>().values.size());
  EXPECT_EQ(1u,
      masterJSON.values["frameworks"].as<JSON::Array>().values.size());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| TEST_F(FaultToleranceTest, SchedulerFailover) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave> > slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| // Launch the first (i.e., failing) scheduler and wait until |
| // registered gets called to launch the second (i.e., failover) |
| // scheduler. |
| |
| MockScheduler sched1; |
| MesosSchedulerDriver driver1( |
| &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched1, registered(&driver1, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| EXPECT_CALL(sched1, resourceOffers(&driver1, _)) |
| .WillRepeatedly(Return()); |
| |
| driver1.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| // Now launch the second (i.e., failover) scheduler using the |
| // framework id recorded from the first scheduler and wait until it |
| // gets a registered callback.. |
| |
| MockScheduler sched2; |
| |
| FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line. |
| framework2 = DEFAULT_FRAMEWORK_INFO; |
| framework2.mutable_id()->MergeFrom(frameworkId.get()); |
| |
| MesosSchedulerDriver driver2( |
| &sched2, framework2, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> sched2Registered; |
| EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _)) |
| .WillOnce(FutureSatisfy(&sched2Registered)); |
| |
| EXPECT_CALL(sched2, resourceOffers(&driver2, _)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(sched2, offerRescinded(&driver2, _)) |
| .Times(AtMost(1)); |
| |
| // Scheduler1's expectations. |
| EXPECT_CALL(sched1, offerRescinded(&driver1, _)) |
| .Times(AtMost(1)); |
| |
| Future<Nothing> sched1Error; |
| EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) |
| .WillOnce(FutureSatisfy(&sched1Error)); |
| |
| driver2.start(); |
| |
| AWAIT_READY(sched2Registered); |
| |
| AWAIT_READY(sched1Error); |
| |
| EXPECT_EQ(DRIVER_STOPPED, driver2.stop()); |
| EXPECT_EQ(DRIVER_STOPPED, driver2.join()); |
| |
| EXPECT_EQ(DRIVER_ABORTED, driver1.stop()); |
| EXPECT_EQ(DRIVER_STOPPED, driver1.join()); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test was added to cover a fix for MESOS-659. |
| // Here, we drop the initial FrameworkReregisteredMessage from the |
| // master, so that the scheduler driver retries the initial failover |
| // re-registration. Previously, this caused a "Framework failed over" |
| // to be sent to the new scheduler driver! |
TEST_F(FaultToleranceTest, SchedulerFailoverRetriedReregistration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Launch the first (i.e., failing) scheduler and wait until
  // registered gets called to launch the second (i.e., failover)
  // scheduler.

  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  driver1.start();

  AWAIT_READY(frameworkId);

  // Now launch the second (i.e., failover) scheduler using the
  // framework id recorded from the first scheduler and wait until it
  // gets a registered callback.

  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId.get());

  MesosSchedulerDriver driver2(
      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);

  // Pause the clock so the driver's registration retry only fires
  // when we advance it below.
  Clock::pause();

  // Drop the initial FrameworkRegisteredMessage to the failed over
  // scheduler. This ensures the scheduler driver will retry the
  // registration.
  Future<process::Message> reregistrationMessage = DROP_MESSAGE(
      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);

  // There should be no error received, the master sends the error
  // prior to sending the FrameworkRegisteredMessage so we don't
  // need to wait to ensure this does not occur.
  EXPECT_CALL(sched2, error(&driver2, "Framework failed over"))
    .Times(0);

  Future<Nothing> sched2Registered;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
    .WillOnce(FutureSatisfy(&sched2Registered));

  Future<Nothing> sched1Error;
  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
    .WillOnce(FutureSatisfy(&sched1Error));

  driver2.start();

  AWAIT_READY(reregistrationMessage);

  // Trigger the re-registration retry.
  Clock::advance(Seconds(1));

  AWAIT_READY(sched2Registered);

  AWAIT_READY(sched1Error);

  EXPECT_EQ(DRIVER_STOPPED, driver2.stop());
  EXPECT_EQ(DRIVER_STOPPED, driver2.join());

  EXPECT_EQ(DRIVER_ABORTED, driver1.stop());
  EXPECT_EQ(DRIVER_STOPPED, driver1.join());

  Shutdown();
  Clock::resume();
}
| |
| |
// This test verifies that the scheduler driver retries framework
// registration: the first FrameworkRegisteredMessage from the master
// is dropped, yet the scheduler still gets a registered callback
// after the driver's retry interval elapses.
TEST_F(FaultToleranceTest, FrameworkReliableRegistration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  Future<Nothing> registered;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillRepeatedly(Return());

  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .Times(AtMost(1));

  Future<AuthenticateMessage> authenticateMessage =
    FUTURE_PROTOBUF(AuthenticateMessage(), _, master.get());

  // Drop the first framework registered message, allow subsequent messages.
  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
    DROP_PROTOBUF(FrameworkRegisteredMessage(), master.get(), _);

  driver.start();

  // Ensure authentication occurs.
  AWAIT_READY(authenticateMessage);

  AWAIT_READY(frameworkRegisteredMessage);

  // Advance past the driver's registration retry interval so the
  // registration is re-sent and this time gets through.
  // TODO(benh): Pull out constant from SchedulerProcess.
  Clock::pause();
  Clock::advance(Seconds(1));

  AWAIT_READY(registered); // Ensures registered message is received.

  driver.stop();
  driver.join();

  Shutdown();

  Clock::resume();
}
| |
| |
// This test verifies that when the scheduler driver sees a spurious
// leading master change (to the same master), it disconnects,
// re-registers, and subsequently receives offers again.
TEST_F(FaultToleranceTest, FrameworkReregister)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  StandaloneMasterDetector slaveDetector(master.get());

  Try<PID<Slave> > slave = StartSlave(&slaveDetector);
  ASSERT_SOME(slave);

  // Create a detector for the scheduler driver because we want the
  // spurious leading master change to be known by the scheduler
  // driver only.
  StandaloneMasterDetector schedDetector(master.get());
  MockScheduler sched;
  TestingMesosSchedulerDriver driver(&sched, &schedDetector);

  Future<Nothing> registered;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered));

  Future<Nothing> resourceOffers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers));

  Future<process::Message> message =
    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);

  driver.start();

  AWAIT_READY(message); // Framework registered message, to get the pid.
  AWAIT_READY(registered); // Framework registered call.
  AWAIT_READY(resourceOffers);

  Future<Nothing> disconnected;
  EXPECT_CALL(sched, disconnected(&driver))
    .WillOnce(FutureSatisfy(&disconnected));

  Future<Nothing> reregistered;
  EXPECT_CALL(sched, reregistered(&driver, _))
    .WillOnce(FutureSatisfy(&reregistered));

  Future<Nothing> resourceOffers2;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers2))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(sched, offerRescinded(&driver, _))
    .Times(AtMost(1));

  // Simulate a spurious leading master change at the scheduler.
  schedDetector.appoint(master.get());

  AWAIT_READY(disconnected);

  AWAIT_READY(reregistered);

  // The re-registered framework should get offers.
  AWAIT_READY(resourceOffers2);

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that a task launched while the scheduler driver
// is disconnected from the master results in a TASK_LOST status
// update delivered to the scheduler.
TEST_F(FaultToleranceTest, TaskLost)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  StandaloneMasterDetector detector(master.get());
  TestingMesosSchedulerDriver driver(&sched, &detector);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  Future<process::Message> message =
    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  AWAIT_READY(message);

  Future<Nothing> disconnected;
  EXPECT_CALL(sched, disconnected(&driver))
    .WillOnce(FutureSatisfy(&disconnected));

  // Simulate a spurious master loss event at the scheduler.
  detector.appoint(None());

  AWAIT_READY(disconnected);

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Launching while disconnected should produce TASK_LOST.
  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(status);
  EXPECT_EQ(TASK_LOST, status.get().state());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // This test checks that a failover scheduler gets the |
| // retried status update. |
TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  // Launch the first (i.e., failing) scheduler.
  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  FrameworkID frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return());

  driver1.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  // Drop the first status update message
  // between master and the scheduler.
  // (The "Not master, not slave" destination matcher targets the
  // message addressed to the scheduler driver.)
  Future<StatusUpdateMessage> statusUpdateMessage =
    DROP_PROTOBUF(StatusUpdateMessage(),
                  _,
                  Not(AnyOf(Eq(master.get()), Eq(slave.get()))));

  driver1.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(statusUpdateMessage);

  // Now launch the second (i.e., failover) scheduler using the
  // framework id recorded from the first scheduler and wait until it
  // registers.

  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId);

  MesosSchedulerDriver driver2(
      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);

  Future<Nothing> registered2;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
    .WillOnce(FutureSatisfy(&registered2));

  // Scheduler1 should get an error due to failover.
  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"));

  // Scheduler2 should receive retried status updates.
  Future<Nothing> statusUpdate;
  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
    .WillOnce(FutureSatisfy(&statusUpdate))
    .WillRepeatedly(Return()); // Ignore subsequent updates.

  driver2.start();

  AWAIT_READY(registered2);

  Clock::pause();

  // Now advance time enough for the reliable timeout
  // to kick in and another status update is sent.
  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_MIN);

  AWAIT_READY(statusUpdate);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver1.stop();
  driver2.stop();

  driver1.join();
  driver2.join();

  Shutdown();

  Clock::resume();
}
| |
| |
| // This test was added to ensure MESOS-420 is fixed. |
| // We need to make sure that the master correctly handles non-terminal |
| // tasks with exited executors upon framework re-registration. This is |
| // possible because the ExitedExecutor message can arrive before the |
| // terminal status update(s) of its task(s). |
TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
{
  // First we'll start a master and slave, then register a framework
  // so we can launch a task.
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector slaveDetector(master.get());

  Try<PID<Slave> > slave = StartSlave(&containerizer, &slaveDetector);
  ASSERT_SOME(slave);

  // Separate detector for the scheduler so the slave and the
  // framework can be notified of the new master independently.
  MockScheduler sched;
  StandaloneMasterDetector schedDetector(master.get());
  TestingMesosSchedulerDriver driver(&sched, &schedDetector);

  Future<process::Message> frameworkRegisteredMessage =
    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);

  FrameworkID frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*"))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  Future<Nothing> statusUpdate;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureSatisfy(&statusUpdate)); // TASK_RUNNING.

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver.start();

  AWAIT_READY(frameworkRegisteredMessage);

  // Wait until TASK_RUNNING of the task is received.
  AWAIT_READY(statusUpdate);

  EXPECT_CALL(sched, disconnected(&driver));

  // Now that the task is launched, we need to induce the following:
  //   1. ExitedExecutorMessage received by the master prior to a
  //      terminal status update for the corresponding task. This
  //      means we need to drop the status update coming from the
  //      slave.
  //   2. Framework re-registration.
  //
  // To achieve this, we need to:
  //   1. Restart the master (the slave / framework will not detect
  //      the new master automatically using the
  //      StandaloneMasterDetector).
  //   2. Notify the slave of the new master.
  //   3. Kill the executor.
  //   4. Drop the status update, but allow the ExitedExecutorMessage.
  //   5. Notify the framework of the new master.
  Stop(master.get());
  master = StartMaster();
  ASSERT_SOME(master);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  slaveDetector.appoint(master.get());

  // Wait for the slave to re-register.
  AWAIT_READY(slaveReregisteredMessage);

  // Allow the executor exited message and drop the status update,
  // it's possible for a duplicate update to occur if the status
  // update manager is notified of the new master after the task was
  // killed.
  Future<ExitedExecutorMessage> executorExitedMessage =
    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
  DROP_PROTOBUFS(StatusUpdateMessage(), _, _);

  // Now kill the executor.
  containerizer.destroy(frameworkId, DEFAULT_EXECUTOR_ID);

  AWAIT_READY(executorExitedMessage);

  // Now notify the framework of the new master.
  Future<FrameworkRegisteredMessage> frameworkRegisteredMessage2 =
    FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);

  EXPECT_CALL(sched, registered(&driver, _, _));

  schedDetector.appoint(master.get());

  AWAIT_READY(frameworkRegisteredMessage2);

  driver.stop();
  driver.join();
  Shutdown();
}
| |
| |
// This test verifies that a status update dispatched to the slave for
// an executor the slave does not know about (e.g., an exited one) is
// still forwarded to the scheduler.
TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  FrameworkID frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());
  Offer offer = offers.get()[0];

  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<Nothing> statusUpdate;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureSatisfy(&statusUpdate)); // TASK_RUNNING of task1.

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver.launchTasks(offer.id(), tasks);

  // Wait until TASK_RUNNING of task1 is received.
  AWAIT_READY(statusUpdate);

  // Simulate the slave receiving status update from an unknown
  // (e.g. exited) executor of the given framework.
  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status)); // TASK_RUNNING of task2.

  TaskID taskId;
  taskId.set_value("task2");

  StatusUpdate statusUpdate2 = createStatusUpdate(
      frameworkId, offer.slave_id(), taskId, TASK_RUNNING, "Dummy update");

  // Inject the update directly into the slave, bypassing any executor.
  process::dispatch(slave.get(), &Slave::statusUpdate, statusUpdate2, UPID());

  // Ensure that the scheduler receives task2's update.
  AWAIT_READY(status);
  EXPECT_EQ(taskId, status.get().task_id());
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that after a scheduler failover, a framework
// message sent by the executor is delivered to the failed-over
// (second) scheduler once the slave has learned the new framework pid.
TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  FrameworkID frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver1.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<TaskStatus> status;
  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
    .WillOnce(FutureArg<1>(&status));

  // Save the executor driver so we can send a framework message below.
  ExecutorDriver* execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver1.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId);

  MesosSchedulerDriver driver2(
      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);

  Future<Nothing> registered;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
    .WillOnce(FutureSatisfy(&registered));

  Future<Nothing> frameworkMessage;
  EXPECT_CALL(sched2, frameworkMessage(&driver2, _, _, _))
    .WillOnce(FutureSatisfy(&frameworkMessage));

  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"));

  Future<UpdateFrameworkMessage> updateFrameworkMessage =
    FUTURE_PROTOBUF(UpdateFrameworkMessage(), _, _);

  driver2.start();

  AWAIT_READY(registered);

  // Wait for the slave to get the updated framework pid.
  AWAIT_READY(updateFrameworkMessage);

  execDriver->sendFrameworkMessage("Executor to Framework message");

  // The message must arrive at the failed-over scheduler.
  AWAIT_READY(frameworkMessage);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver1.stop();
  driver2.stop();

  driver1.join();
  driver2.join();

  Shutdown();
}
| |
| |
| // This test verifies that a partitioned framework that still |
| // thinks it is registered with the master cannot kill a task because |
| // the master has re-registered another instance of the framework. |
| // What this test does: |
| // 1. Launch a master, slave and scheduler. |
| // 2. Scheduler launches a task. |
| // 3. Launch a second failed over scheduler. |
| // 4. Make the first scheduler believe it is still registered. |
| // 5. First scheduler attempts to kill the task which is ignored by the master. |
TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  // Start the first scheduler and launch a task.
  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  FrameworkID frameworkId;
  EXPECT_CALL(sched1, registered(&driver1, _, _))
    .WillOnce(SaveArg<1>(&frameworkId));

  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  Future<TaskStatus> status;
  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
    .WillOnce(FutureArg<1>(&status));

  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
    = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);

  // Save the executor driver so we can send a status update below.
  ExecutorDriver* execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  driver1.start();

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  // Wait for the status update acknowledgement to be sent. This
  // ensures the slave doesn't resend the TASK_RUNNING update to the
  // failed over scheduler (below).
  AWAIT_READY(statusUpdateAcknowledgementMessage);

  // Now start the second failed over scheduler.
  MockScheduler sched2;

  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
  framework2 = DEFAULT_FRAMEWORK_INFO;
  framework2.mutable_id()->MergeFrom(frameworkId);

  MesosSchedulerDriver driver2(
      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);

  Future<Nothing> registered;
  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
    .WillOnce(FutureSatisfy(&registered));

  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillRepeatedly(Return()); // Ignore any offers.

  // Drop the framework error message from the master to simulate
  // a partitioned framework.
  Future<FrameworkErrorMessage> frameworkErrorMessage =
    DROP_PROTOBUF(FrameworkErrorMessage(), _ , _);

  driver2.start();

  AWAIT_READY(frameworkErrorMessage);

  AWAIT_READY(registered);

  // Now both the frameworks think they are registered with the
  // master, but the master only knows about the second framework.

  // A 'killTask' by first framework should be dropped by the master.
  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
    .Times(0);

  // 'TASK_FINISHED' by the executor should reach the second framework.
  Future<TaskStatus> status2;
  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
    .WillOnce(FutureArg<1>(&status2));

  Future<KillTaskMessage> killTaskMessage =
    FUTURE_PROTOBUF(KillTaskMessage(), _, _);

  driver1.killTask(status.get().task_id());

  AWAIT_READY(killTaskMessage);

  // By this point the master must have processed and ignored the
  // 'killTask' message from the first framework. To verify this,
  // the executor sends 'TASK_FINISHED' to ensure the only update
  // received by the scheduler is 'TASK_FINISHED' and not
  // 'TASK_KILLED'.
  TaskStatus finishedStatus;
  finishedStatus = status.get();
  finishedStatus.set_state(TASK_FINISHED);
  execDriver->sendStatusUpdate(finishedStatus);

  AWAIT_READY(status2);
  EXPECT_EQ(TASK_FINISHED, status2.get().state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver1.stop();
  driver2.stop();

  driver1.join();
  driver2.join();

  Shutdown();
}
| |
| |
| // This test checks that a scheduler exit shuts down the executor. |
| TEST_F(FaultToleranceTest, SchedulerExit) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave> > slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer> > offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| AWAIT_READY(offers); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| driver.launchTasks(offers.get()[0].id(), tasks); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
// This test verifies that the slave retries registration with the
// master: the first SlaveRegisteredMessage is dropped, yet the
// framework eventually receives offers (which requires the slave to
// have successfully registered on a retry).
TEST_F(FaultToleranceTest, SlaveReliableRegistration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Drop the first slave registered message, allow subsequent messages.
  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    DROP_PROTOBUF(SlaveRegisteredMessage(), _, _);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  Future<Nothing> registered;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&registered));

  Future<Nothing> resourceOffers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(registered);

  AWAIT_READY(slaveRegisteredMessage);

  // Advance past the slave's registration retry interval so the
  // registration is re-sent and this time gets through.
  Clock::pause();
  Clock::advance(Seconds(1)); // TODO(benh): Pull out constant from Slave.

  AWAIT_READY(resourceOffers);

  driver.stop();
  driver.join();

  Shutdown();

  Clock::resume();
}
| |
| |
// This test verifies that a slave re-registers with the master after
// a spurious master change event (e.g., a ZooKeeper session
// expiration) and that outstanding offers are rescinded in the
// process.
TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  StandaloneMasterDetector detector(master.get());

  Try<PID<Slave> > slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<Nothing> resourceOffers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureSatisfy(&resourceOffers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(resourceOffers);

  Future<Nothing> offerRescinded;
  EXPECT_CALL(sched, offerRescinded(_, _))
    .WillOnce(FutureSatisfy(&offerRescinded));

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave.
  detector.appoint(master.get());

  // Since an authenticating slave re-registration results in
  // disconnecting the slave, its resources should be rescinded.
  AWAIT_READY(offerRescinded);

  AWAIT_READY(slaveReregisteredMessage);

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that a re-registering slave sends the terminal
// unacknowledged tasks for a terminal executor. This is required
// for the master to correctly reconcile its view with the slave's
// view of tasks. This test drops a terminal update to the master
// and then forces the slave to re-register.
TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get());

  Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Save the executor driver so we can send a terminal update below.
  ExecutorDriver* execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
    = FUTURE_PROTOBUF(
        StatusUpdateAcknowledgementMessage(), master.get(), slave.get());

  driver.start();

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status.get().state());

  // Make sure the acknowledgement reaches the slave.
  AWAIT_READY(statusUpdateAcknowledgementMessage);

  // Drop the TASK_FINISHED status update sent to the master.
  Future<StatusUpdateMessage> statusUpdateMessage =
    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());

  Future<ExitedExecutorMessage> executorExitedMessage =
    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);

  // Send the terminal update from the executor; the master will not
  // see it because of the drop above.
  TaskStatus finishedStatus;
  finishedStatus = status.get();
  finishedStatus.set_state(TASK_FINISHED);
  execDriver->sendStatusUpdate(finishedStatus);

  // Ensure the update was sent.
  AWAIT_READY(statusUpdateMessage);

  // Now kill the executor.
  containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

  Future<TaskStatus> status2;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status2));

  // We drop the 'UpdateFrameworkMessage' from the master to slave to
  // stop the status update manager from retrying the update that was
  // already sent due to the new master detection.
  DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);

  // Force the slave to re-register with the (same) master.
  detector.appoint(master.get());

  AWAIT_READY(status2);
  EXPECT_EQ(TASK_FINISHED, status2.get().state());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
// This test verifies that the master sends TASK_LOST updates for
// tasks it knows about that are absent from a re-registered slave.
// We do this by dropping the RunTaskMessage from master to the slave.
TEST_F(FaultToleranceTest, ReconcileLostTasks)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  StandaloneMasterDetector detector(master.get());

  Try<PID<Slave> > slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  EXPECT_NE(0u, offers.get().size());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  // We now launch a task and drop the corresponding RunTaskMessage on
  // the slave, to ensure that only the master knows about this task.
  Future<RunTaskMessage> runTaskMessage =
    DROP_PROTOBUF(RunTaskMessage(), _, _);

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(runTaskMessage);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get());

  AWAIT_READY(slaveReregisteredMessage);

  // Reconciliation should mark the task (unknown to the slave) lost.
  AWAIT_READY(status);

  ASSERT_EQ(task.task_id(), status.get().task_id());
  ASSERT_EQ(TASK_LOST, status.get().state());

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // This test verifies that when the slave re-registers, the master |
| // does not send TASK_LOST update for a task that has reached terminal |
| // state but is waiting for an acknowledgement. |
TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  // A standalone detector lets us later simulate a new master
  // detection at the slave to force re-registration.
  StandaloneMasterDetector detector(master.get());

  Try<PID<Slave> > slave = StartSlave(&exec, &detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Build a task using the first offer's slave and resources.
  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  EXPECT_CALL(exec, registered(_, _, _, _));

  // Send a terminal update right away.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  // Drop the status update from slave to the master, so that
  // the slave has a pending terminal update when it re-registers.
  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());

  // Wait on the slave's internal dispatch so we know the update has
  // been processed by the slave before we trigger re-registration.
  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status))
    .WillRepeatedly(Return()); // Ignore retried update due to update framework.

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(_statusUpdate);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get());

  AWAIT_READY(slaveReregisteredMessage);

  // The master should not send a TASK_LOST after the slave
  // re-registers. We check this by calling Clock::settle() so that
  // the only update the scheduler receives is the retried
  // TASK_FINISHED update.
  // NOTE: The status update manager resends the status update when
  // it detects a new master.
  Clock::pause();
  Clock::settle();

  AWAIT_READY(status);
  ASSERT_EQ(TASK_FINISHED, status.get().state());

  // The executor may or may not be shut down before the driver
  // stops, so tolerate at most one shutdown call.
  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();

  Shutdown();
}
| |
| |
| // This test ensures that if a master incorrectly thinks that it is |
| // leading, the scheduler driver will drop messages from this master. |
| // Unfortunately, it is not currently possible to start more than one |
| // master within the same process. So, this test merely simulates this |
| // by spoofing messages. |
| // This test does the following: |
| // 1. Start a master, scheduler, launch a task. |
| // 2. Spoof a lost task message for the slave. |
| // 3. Once the message is sent to the scheduler, kill the task. |
| // 4. Ensure the task was KILLED rather than LOST. |
TEST_F(FaultToleranceTest, SplitBrainMasters)
{
  // 1. Start a master, scheduler, and launch a task.
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);

  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  // Capture the registration message so we can extract the driver's
  // UPID ('registered.get().to') for spoofing below.
  Future<Message> registered =
    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> runningStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&runningStatus));

  driver.start();

  AWAIT_READY(registered);
  AWAIT_READY(frameworkId);
  AWAIT_READY(runningStatus);
  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());

  // 2. Spoof a lost task message for the slave.
  StatusUpdateMessage lostUpdate;
  lostUpdate.mutable_update()->CopyFrom(createStatusUpdate(
      frameworkId.get(),
      runningStatus.get().slave_id(),
      runningStatus.get().task_id(),
      TASK_LOST));

  // Spoof a message from a random master; this should be dropped by
  // the scheduler driver. Since this is delivered locally, it is
  // synchronously placed on the scheduler driver's queue.
  process::post(UPID("master2@127.0.0.1:50"), registered.get().to, lostUpdate);

  // 3. Once the message is sent to the scheduler, kill the task.
  EXPECT_CALL(exec, killTask(_, _))
    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));

  // NOTE: In gmock, later expectations take precedence over earlier
  // ones, so this expectation (not the earlier one-shot) catches the
  // next status update the scheduler receives.
  Future<TaskStatus> killedStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&killedStatus));

  driver.killTask(runningStatus.get().task_id());

  // 4. Ensure the task was KILLED rather than LOST.
  // If the spoofed TASK_LOST had been accepted, it would have been
  // delivered before the TASK_KILLED update and this would fail.
  AWAIT_READY(killedStatus);
  EXPECT_EQ(TASK_KILLED, killedStatus.get().state());

  EXPECT_CALL(exec, shutdown(_))
    .WillRepeatedly(Return());

  driver.stop();
  driver.join();

  Shutdown();
}