| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <unistd.h> |
| |
| #include <algorithm> |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <gmock/gmock.h> |
| |
| #include <mesos/executor.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/pid.hpp> |
| #include <process/reap.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/path.hpp> |
| #include <stout/try.hpp> |
| |
| #include "common/build.hpp" |
| #include "common/http.hpp" |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/flags.hpp" |
| #include "master/master.hpp" |
| |
| #include "slave/constants.hpp" |
| #include "slave/gc.hpp" |
| #include "slave/flags.hpp" |
| #include "slave/slave.hpp" |
| |
| #include "slave/containerizer/fetcher.hpp" |
| |
| #include "slave/containerizer/mesos/containerizer.hpp" |
| |
| #include "tests/containerizer.hpp" |
| #include "tests/flags.hpp" |
| #include "tests/limiter.hpp" |
| #include "tests/mesos.hpp" |
| #include "tests/utils.hpp" |
| |
| using namespace mesos::internal::slave; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::protobuf::createLabel; |
| |
| using process::Clock; |
| using process::Future; |
| using process::PID; |
| using process::Promise; |
| using process::UPID; |
| |
| using process::http::OK; |
| using process::http::Response; |
| using process::http::ServiceUnavailable; |
| |
| using std::map; |
| using std::shared_ptr; |
| using std::string; |
| using std::vector; |
| |
| using testing::_; |
| using testing::AtMost; |
| using testing::DoAll; |
| using testing::Eq; |
| using testing::Invoke; |
| using testing::Return; |
| using testing::SaveArg; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| // Those of the overall Mesos master/slave/scheduler/driver tests |
| // that seem vaguely more slave than master-related are in this file. |
| // The others are in "master_tests.cpp". |
| |
| class SlaveTest : public MesosTest {}; |
| |
| |
| // This test ensures that when a slave shuts itself down, it |
| // unregisters itself and the master notifies the framework |
| // immediately and rescinds any offers. |
| TEST_F(SlaveTest, Shutdown) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers.get().size()); |
| |
| Future<Nothing> offerRescinded; |
| EXPECT_CALL(sched, offerRescinded(&driver, offers.get()[0].id())) |
| .WillOnce(FutureSatisfy(&offerRescinded)); |
| |
| Future<Nothing> slaveLost; |
| EXPECT_CALL(sched, slaveLost(&driver, offers.get()[0].slave_id())) |
| .WillOnce(FutureSatisfy(&slaveLost)); |
| |
| // Stop the slave with explicit shutdown message so that the slave |
| // unregisters. |
| Stop(slave.get(), true); |
| |
| AWAIT_READY(offerRescinded); |
| AWAIT_READY(slaveLost); |
| |
| JSON::Object stats = Metrics(); |
| EXPECT_EQ(1, stats.values["master/slave_removals"]); |
| EXPECT_EQ(1, stats.values["master/slave_removals/reason_unregistered"]); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| TEST_F(SlaveTest, ShutdownUnregisteredExecutor) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_registration_timeout'. |
| slave::Flags flags = CreateSlaveFlags(); |
| // Set the isolation flag so we know a MesosContainerizer will |
| // be created. |
| flags.isolation = "posix/cpu,posix/mem"; |
| |
| Fetcher fetcher; |
| |
| Try<MesosContainerizer*> containerizer = |
| MesosContainerizer::create(flags, false, &fetcher); |
| CHECK_SOME(containerizer); |
| |
| Try<PID<Slave>> slave = StartSlave(containerizer.get()); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| |
| CommandInfo command; |
| command.set_value("sleep 10"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| // Drop the registration message from the executor to the slave. |
| Future<Message> registerExecutor = |
| DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(registerExecutor); |
| |
| Clock::pause(); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| // Ensure that the slave times out and kills the executor. |
| Future<Nothing> destroyExecutor = |
| FUTURE_DISPATCH(_, &MesosContainerizerProcess::destroy); |
| |
| Clock::advance(flags.executor_registration_timeout); |
| |
| AWAIT_READY(destroyExecutor); |
| |
| Clock::settle(); // Wait for Containerizer::destroy to complete. |
| |
| // Now advance time until the reaper reaps the executor. |
| while (status.isPending()) { |
| Clock::advance(process::MAX_REAP_INTERVAL()); |
| Clock::settle(); |
| } |
| |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_FAILED, status->state()); |
| EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); |
| EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT, |
| status->reason()); |
| |
| Clock::resume(); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test verifies that when an executor terminates before |
| // registering with slave, it is properly cleaned up. |
| TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer containerizer(&exec); |
| |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| // Drop the registration message from the executor to the slave. |
| Future<Message> registerExecutorMessage = |
| DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(registerExecutorMessage); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| Future<Nothing> schedule = |
| FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule); |
| |
| EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)); |
| // Now kill the executor. |
| containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_FAILED, status->state()); |
| EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); |
| EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status->reason()); |
| |
| // We use 'gc.schedule' as a signal for the executor being cleaned |
| // up by the slave. |
| AWAIT_READY(schedule); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // Test that we can run the command executor and specify an "override" |
| // command to use via the --override argument. |
| // NOTE: CommmandExecutor::reaped() sleeps 1 second to avoid races, |
| // hence this test takes more than 1 second to finish. The root cause |
| // is tracked via MESOS-4111. |
| TEST_F(SlaveTest, CommandExecutorWithOverride) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| TestContainerizer containerizer; |
| |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| |
| CommandInfo command; |
| command.set_value("sleep 10"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| // Expect the launch and just assume it was sucessful since we'll be |
| // launching the executor ourselves manually below. |
| Future<Nothing> launch; |
| EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _)) |
| .WillOnce(DoAll(FutureSatisfy(&launch), |
| Return(true))); |
| |
| // Expect wait after launch is called but don't return anything |
| // until after we've finished everything below. |
| Future<Nothing> wait; |
| Promise<containerizer::Termination> promise; |
| EXPECT_CALL(containerizer, wait(_)) |
| .WillOnce(DoAll(FutureSatisfy(&wait), |
| Return(promise.future()))); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Once we get the launch the mesos-executor with --override. |
| AWAIT_READY(launch); |
| |
| // Set up fake environment for executor. |
| map<string, string> environment = os::environment(); |
| environment["MESOS_SLAVE_PID"] = stringify(slave.get()); |
| environment["MESOS_SLAVE_ID"] = stringify(offers.get()[0].slave_id()); |
| environment["MESOS_FRAMEWORK_ID"] = stringify(offers.get()[0].framework_id()); |
| environment["MESOS_EXECUTOR_ID"] = stringify(task.task_id()); |
| environment["MESOS_DIRECTORY"] = ""; |
| |
| // Create temporary file to store validation string. If command is |
| // succesfully replaced, this file will end up containing the string |
| // 'Hello World\n'. Otherwise, the original task command i.e. |
| // 'sleep' will be called and the test will fail. |
| Try<string> file = os::mktemp(); |
| ASSERT_SOME(file); |
| |
| string executorCommand = |
| path::join(getLauncherDir(), "mesos-executor") + |
| " --override -- /bin/sh -c 'echo hello world >" + file.get() + "'"; |
| |
| // Expect two status updates, one for once the mesos-executor says |
| // the task is running and one for after our overridden command |
| // above finishes. |
| Future<TaskStatus> statusRunning, statusFinished; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| Try<Subprocess> executor = |
| subprocess( |
| executorCommand, |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| Subprocess::PIPE(), |
| environment); |
| |
| ASSERT_SOME(executor); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| ASSERT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); |
| |
| AWAIT_READY(statusFinished); |
| ASSERT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); |
| |
| AWAIT_READY(wait); |
| |
| containerizer::Termination termination; |
| termination.set_message("Killed executor"); |
| termination.set_status(0); |
| promise.set(termination); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(executor.get().status()); |
| |
| // Verify file contents. |
| Try<string> validate = os::read(file.get()); |
| ASSERT_SOME(validate); |
| |
| EXPECT_EQ(validate.get(), "hello world\n"); |
| |
| os::rm(file.get()); |
| |
| Shutdown(); |
| } |
| |
| |
| // Test that we don't let task arguments bleed over as |
| // mesos-executor args. For more details of this see MESOS-1873. |
| // |
| // This assumes the ability to execute '/bin/echo --author'. |
| TEST_F(SlaveTest, ComamndTaskWithArguments) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_registration_timeout'. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "posix/cpu,posix/mem"; |
| |
| Fetcher fetcher; |
| |
| Try<MesosContainerizer*> containerizer = |
| MesosContainerizer::create(flags, false, &fetcher); |
| CHECK_SOME(containerizer); |
| |
| Try<PID<Slave>> slave = StartSlave(containerizer.get()); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| |
| // Command executor will run as user running test. |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/echo"); |
| command.add_arguments("/bin/echo"); |
| command.add_arguments("--author"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // Don't let args from the CommandInfo struct bleed over into |
| // mesos-executor forking. For more details of this see MESOS-1873. |
| TEST_F(SlaveTest, GetExecutorInfo) |
| { |
| TestContainerizer containerizer; |
| StandaloneMasterDetector detector; |
| |
| MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); |
| |
| FrameworkID frameworkId; |
| frameworkId.set_value("20141010-221431-251662764-60288-32120-0000"); |
| |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.mutable_id()->CopyFrom(frameworkId); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name("task"); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->set_value( |
| "20141010-221431-251662764-60288-32120-0001"); |
| task.mutable_resources()->MergeFrom( |
| Resources::parse("cpus:0.1;mem:32").get()); |
| |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/echo"); |
| command.add_arguments("/bin/echo"); |
| command.add_arguments("--author"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task); |
| |
| // Now assert that it actually is running mesos-executor without any |
| // bleedover from the command we intend on running. |
| EXPECT_TRUE(executor.command().shell()); |
| EXPECT_EQ(0, executor.command().arguments_size()); |
| EXPECT_NE(string::npos, executor.command().value().find("mesos-executor")); |
| } |
| |
| |
| // Ensure getExecutorInfo for mesos-executor gets the ContainerInfo, |
| // if present. This ensures the MesosContainerizer can get the |
| // NetworkInfo even when using the command executor. |
| TEST_F(SlaveTest, GetExecutorInfoForTaskWithContainer) |
| { |
| TestContainerizer containerizer; |
| StandaloneMasterDetector detector; |
| |
| MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); |
| |
| // Launch a task with the command executor and ContainerInfo with |
| // NetworkInfo. |
| TaskInfo task; |
| task.set_name("task"); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->set_value( |
| "20141010-221431-251662764-60288-12345-0001"); |
| task.mutable_resources()->MergeFrom( |
| Resources::parse("cpus:0.1;mem:32").get()); |
| |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/echo"); |
| command.add_arguments("/bin/echo"); |
| command.add_arguments("--author"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| ContainerInfo *container = task.mutable_container(); |
| container->set_type(ContainerInfo::MESOS); |
| |
| NetworkInfo *network = container->add_network_infos(); |
| network->set_ip_address("4.3.2.1"); |
| network->add_groups("public"); |
| |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.mutable_id()->set_value( |
| "20141010-221431-251662764-60288-12345-0000"); |
| const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task); |
| |
| // Now assert that the executor has both the command and ContainerInfo |
| EXPECT_TRUE(executor.command().shell()); |
| // CommandInfo.container is not included. In this test the ContainerInfo |
| // must be included in Executor.container (copied from TaskInfo.container). |
| EXPECT_TRUE(executor.has_container()); |
| |
| EXPECT_EQ("4.3.2.1", executor.container().network_infos(0).ip_address()); |
| EXPECT_EQ(1, executor.container().network_infos(0).groups_size()); |
| EXPECT_EQ("public", executor.container().network_infos(0).groups(0)); |
| } |
| |
| |
| // This tests ensures that MesosContainerizer will launch a command |
| // executor even if it contains a ContainerInfo in the TaskInfo. |
| // Prior to 0.26.0, this was only used to launch Docker containers, so |
| // MesosContainerizer would fail the launch. |
| TEST_F(SlaveTest, LaunchTaskInfoWithContainerInfo) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_registration_timeout'. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "posix/cpu,posix/mem"; |
| |
| Fetcher fetcher; |
| |
| Try<MesosContainerizer*> containerizer = |
| MesosContainerizer::create(flags, false, &fetcher); |
| CHECK_SOME(containerizer); |
| |
| StandaloneMasterDetector detector; |
| MockSlave slave(flags, &detector, containerizer.get()); |
| |
| // Launch a task with the command executor and ContainerInfo with |
| // NetworkInfo. |
| TaskInfo task; |
| task.set_name("task"); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->set_value( |
| "20141010-221431-251662764-60288-12345-0001"); |
| task.mutable_resources()->MergeFrom( |
| Resources::parse("cpus:0.1;mem:32").get()); |
| |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/echo"); |
| command.add_arguments("/bin/echo"); |
| command.add_arguments("--author"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| ContainerID containerId; |
| containerId.set_value(UUID::random().toString()); |
| ContainerInfo *container = task.mutable_container(); |
| container->set_type(ContainerInfo::MESOS); |
| |
| NetworkInfo *network = container->add_network_infos(); |
| network->set_ip_address("4.3.2.1"); |
| network->add_groups("public"); |
| |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.mutable_id()->set_value( |
| "20141010-221431-251662764-60288-12345-0000"); |
| const ExecutorInfo& executor = slave.getExecutorInfo(frameworkInfo, task); |
| |
| SlaveID slaveID; |
| slaveID.set_value(UUID::random().toString()); |
| Future<bool> launch = containerizer.get()->launch( |
| containerId, |
| task, |
| executor, |
| "/tmp", |
| "test", |
| slaveID, |
| slave.self(), |
| false); |
| AWAIT_READY(launch); |
| |
| // TODO(spikecurtis): With agent capabilities (MESOS-3362), the |
| // Containerizer should fail this request since none of the listed |
| // isolators can handle NetworkInfo, which implies |
| // IP-per-container. |
| EXPECT_TRUE(launch.get()); |
| |
| // Wait for the container to terminate before shutting down. |
| AWAIT_READY(containerizer.get()->wait(containerId)); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test runs a command without the command user field set. The |
| // command will verify the assumption that the command is run as the |
| // slave user (in this case, root). |
| TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_registration_timeout'. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "posix/cpu,posix/mem"; |
| |
| Fetcher fetcher; |
| |
| Try<MesosContainerizer*> containerizer = |
| MesosContainerizer::create(flags, false, &fetcher); |
| CHECK_SOME(containerizer); |
| |
| Try<PID<Slave>> slave = StartSlave(containerizer.get()); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| |
| Result<string> user = os::user(); |
| CHECK_SOME(user) << "Failed to get current user name" |
| << (user.isError() ? ": " + user.error() : ""); |
| |
| const string helper = getTestHelperPath("active-user-test-helper"); |
| |
| // Command executor will run as user running test. |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value(helper); |
| command.add_arguments(helper); |
| command.add_arguments(user.get()); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test runs a command _with_ the command user field set. The |
| // command will verify the assumption that the command is run as the |
| // specified user. We use (and assume the presence) of the |
| // unprivileged 'nobody' user which should be available on both Linux |
| // and Mac OS X. |
| TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser) |
| { |
| // TODO(nnielsen): Introduce STOUT abstraction for user verification |
| // instead of flat getpwnam call. |
| const string testUser = "nobody"; |
| if (::getpwnam(testUser.c_str()) == NULL) { |
| LOG(WARNING) << "Cannot run ROOT_RunTaskWithCommandInfoWithUser test:" |
| << " user '" << testUser << "' is not present"; |
| return; |
| } |
| |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_registration_timeout'. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "posix/cpu,posix/mem"; |
| |
| Fetcher fetcher; |
| |
| Try<MesosContainerizer*> containerizer = |
| MesosContainerizer::create(flags, false, &fetcher); |
| CHECK_SOME(containerizer); |
| |
| Try<PID<Slave>> slave = StartSlave(containerizer.get()); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| const string helper = getTestHelperPath("active-user-test-helper"); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // HACK: Launch a prepare task as root to prepare the binaries. |
| // This task creates the lt-mesos-executor binary in the build dir. |
| // Because the real task is run as a test user (nobody), it does not |
| // have permission to create files in the build directory. |
| TaskInfo prepareTask; |
| prepareTask.set_name("prepare task"); |
| prepareTask.mutable_task_id()->set_value("1"); |
| prepareTask.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id()); |
| prepareTask.mutable_resources()->CopyFrom( |
| offers.get()[0].resources()); |
| |
| Result<string> user = os::user(); |
| CHECK_SOME(user) << "Failed to get current user name" |
| << (user.isError() ? ": " + user.error() : ""); |
| // Current user should be root. |
| EXPECT_EQ("root", user.get()); |
| |
| // This prepare command executor will run as the current user |
| // running the tests (root). After this command executor finishes, |
| // we know that the lt-mesos-executor binary file exists. |
| CommandInfo prepareCommand; |
| prepareCommand.set_shell(false); |
| prepareCommand.set_value(helper); |
| prepareCommand.add_arguments(helper); |
| prepareCommand.add_arguments(user.get()); |
| prepareTask.mutable_command()->CopyFrom(prepareCommand); |
| |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {prepareTask}); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); |
| |
| // Start to launch a task with different user. |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("2"); |
| task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->CopyFrom(offers.get()[0].resources()); |
| |
| CommandInfo command; |
| command.set_user(testUser); |
| command.set_shell(false); |
| command.set_value(helper); |
| command.add_arguments(helper); |
| command.add_arguments(testUser); |
| |
| task.mutable_command()->CopyFrom(command); |
| |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test ensures that a status update acknowledgement from a |
| // non-leading master is ignored. |
| TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver schedDriver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&schedDriver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&schedDriver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // We need to grab this message to get the scheduler's pid. |
| Future<Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| schedDriver.start(); |
| |
| AWAIT_READY(frameworkRegisteredMessage); |
| const UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); |
| |
| Future<ExecutorDriver*> execDriver; |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .WillOnce(FutureArg<0>(&execDriver)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> update; |
| EXPECT_CALL(sched, statusUpdate(&schedDriver, _)) |
| .WillOnce(FutureArg<1>(&update)); |
| |
| // Pause the clock to prevent status update retries on the slave. |
| Clock::pause(); |
| |
| // Intercept the acknowledgement sent to the slave so that we can |
| // spoof the master's pid. |
| Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage = |
| DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), |
| master.get(), |
| slave.get()); |
| |
| Future<Nothing> _statusUpdateAcknowledgement = |
| FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement); |
| |
| schedDriver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(update); |
| EXPECT_EQ(TASK_RUNNING, update.get().state()); |
| |
| AWAIT_READY(acknowledgementMessage); |
| |
| // Send the acknowledgement to the slave with a non-leading master. |
| post(process::UPID("master@localhost:1"), |
| slave.get(), |
| acknowledgementMessage.get()); |
| |
| // Make sure the acknowledgement was ignored. |
| Clock::settle(); |
| ASSERT_TRUE(_statusUpdateAcknowledgement.isPending()); |
| |
| // Make sure the status update gets retried because the slave |
| // ignored the acknowledgement. |
| Future<TaskStatus> retriedUpdate; |
| EXPECT_CALL(sched, statusUpdate(&schedDriver, _)) |
| .WillOnce(FutureArg<1>(&retriedUpdate)); |
| |
| Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); |
| |
| AWAIT_READY(retriedUpdate); |
| |
| // Ensure the slave receives and properly handles the ACK. |
| // Clock::settle() ensures that the slave successfully |
| // executes Slave::_statusUpdateAcknowledgement(). |
| AWAIT_READY(_statusUpdateAcknowledgement); |
| Clock::settle(); |
| |
| Clock::resume(); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| schedDriver.stop(); |
| schedDriver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| TEST_F(SlaveTest, MetricsInMetricsEndpoint) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| JSON::Object snapshot = Metrics(); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/uptime_secs")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/registered")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/recovery_errors")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/frameworks_active")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_staging")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_starting")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_running")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_finished")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_failed")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_killed")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/tasks_lost")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/executors_registering")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/executors_running")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/executors_terminating")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/executors_terminated")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/executors_preempted")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/valid_status_updates")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/invalid_status_updates")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/valid_framework_messages")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/invalid_framework_messages")); |
| |
| EXPECT_EQ( |
| 1u, |
| snapshot.values.count("slave/executor_directory_max_allowed_age_secs")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/container_launch_errors")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/cpus_total")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/cpus_used")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/cpus_percent")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/mem_total")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/mem_used")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/mem_percent")); |
| |
| EXPECT_EQ(1u, snapshot.values.count("slave/disk_total")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/disk_used")); |
| EXPECT_EQ(1u, snapshot.values.count("slave/disk_percent")); |
| |
| Shutdown(); |
| } |
| |
| |
| // Test to verify that we increment the container launch errors metric |
| // when we fail to launch a container. |
| TEST_F(SlaveTest, MetricsSlaveLaunchErrors) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| TestContainerizer containerizer; |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| const Offer offer = offers.get()[0]; |
| |
| // Verify that we start with no launch failures. |
| JSON::Object snapshot = Metrics(); |
| EXPECT_EQ(0, snapshot.values["slave/container_launch_errors"]); |
| |
| EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _)) |
| .WillOnce(Return(Failure("Injected failure"))); |
| |
| Future<TaskStatus> failureUpdate; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&failureUpdate)); |
| |
| // The above injected containerizer failure also triggers executorLost. |
| EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)); |
| |
| // Try to start a task |
| TaskInfo task = createTask( |
| offer.slave_id(), |
| Resources::parse("cpus:1;mem:32").get(), |
| "sleep 1000", |
| DEFAULT_EXECUTOR_ID); |
| |
| driver.launchTasks(offer.id(), {task}); |
| |
| AWAIT_READY(failureUpdate); |
| ASSERT_EQ(TASK_FAILED, failureUpdate.get().state()); |
| |
| // After failure injection, metrics should report a single failure. |
| snapshot = Metrics(); |
| EXPECT_EQ(1, snapshot.values["slave/container_launch_errors"]); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| TEST_F(SlaveTest, StateEndpoint) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| flags.hostname = "localhost"; |
| flags.resources = "cpus:4;mem:2048;disk:512;ports:[33000-34000]"; |
| flags.attributes = "rack:abc;host:myhost"; |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer containerizer(&exec); |
| |
| // Capture the start time deterministically. |
| Clock::pause(); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Try<PID<Slave>> slave = StartSlave(&containerizer, flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure slave has finished recovery. |
| AWAIT_READY(__recover); |
| Clock::settle(); |
| |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); |
| |
| ASSERT_SOME(parse); |
| |
| JSON::Object state = parse.get(); |
| |
| EXPECT_EQ(MESOS_VERSION, state.values["version"]); |
| |
| if (build::GIT_SHA.isSome()) { |
| EXPECT_EQ(build::GIT_SHA.get(), state.values["git_sha"]); |
| } |
| |
| if (build::GIT_BRANCH.isSome()) { |
| EXPECT_EQ(build::GIT_BRANCH.get(), state.values["git_branch"]); |
| } |
| |
| if (build::GIT_TAG.isSome()) { |
| EXPECT_EQ(build::GIT_TAG.get(), state.values["git_tag"]); |
| } |
| |
| EXPECT_EQ(build::DATE, state.values["build_date"]); |
| EXPECT_EQ(build::TIME, state.values["build_time"]); |
| EXPECT_EQ(build::USER, state.values["build_user"]); |
| |
| ASSERT_TRUE(state.values["start_time"].is<JSON::Number>()); |
| EXPECT_EQ( |
| static_cast<int>(Clock::now().secs()), |
| state.values["start_time"].as<JSON::Number>().as<int>()); |
| |
| // TODO(bmahler): The slave must register for the 'id' |
| // to be non-empty. |
| ASSERT_TRUE(state.values["id"].is<JSON::String>()); |
| |
| EXPECT_EQ(stringify(slave.get()), state.values["pid"]); |
| EXPECT_EQ(flags.hostname.get(), state.values["hostname"]); |
| |
| Try<Resources> resources = Resources::parse( |
| flags.resources.get(), flags.default_role); |
| |
| ASSERT_SOME(resources); |
| |
| EXPECT_EQ(model(resources.get()), state.values["resources"]); |
| |
| Attributes attributes = Attributes::parse(flags.attributes.get()); |
| |
| EXPECT_EQ(model(attributes), state.values["attributes"]); |
| |
| // TODO(bmahler): Test "master_hostname", "log_dir", |
| // "external_log_file". |
| |
| ASSERT_TRUE(state.values["frameworks"].is<JSON::Array>()); |
| EXPECT_TRUE(state.values["frameworks"].as<JSON::Array>().values.empty()); |
| |
| ASSERT_TRUE( |
| state.values["completed_frameworks"].is<JSON::Array>()); |
| EXPECT_TRUE( |
| state.values["completed_frameworks"].as<JSON::Array>().values.empty()); |
| |
| // TODO(bmahler): Ensure this contains all the flags. |
| ASSERT_TRUE(state.values["flags"].is<JSON::Object>()); |
| EXPECT_FALSE(state.values["flags"].as<JSON::Object>().values.empty()); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskID taskId; |
| taskId.set_value("1"); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->MergeFrom(taskId); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .Times(1); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| response = http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| parse = JSON::parse<JSON::Object>(response.get().body); |
| ASSERT_SOME(parse); |
| |
| // Check that executor_id is in the right format. |
| ASSERT_SOME_EQ( |
| "default", |
| parse.get().find<JSON::String>( |
| "frameworks[0].executors[0].tasks[0].executor_id")); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test checks that when a slave is in RECOVERING state it responds |
| // to HTTP requests for "/state" endpoint with ServiceUnavailable. |
| TEST_F(SlaveTest, StateEndpointUnavailableDuringRecovery) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer* containerizer1 = new TestContainerizer(&exec); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<PID<Slave>> slave = StartSlave(containerizer1, flags); |
| ASSERT_SOME(slave); |
| |
| // Launch a task so that slave has something to recover after restart. |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.start(); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| // Need this expectation here because `TestContainerizer` doesn't do recovery |
| // and hence sets `MESOS_RECOVERY_TIMEOUT` as '0s' causing the executor driver |
| // to exit immediately after slave exit. |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| // Restart the slave. |
| Stop(slave.get()); |
| delete containerizer1; |
| |
| // Pause the clock to keep slave in RECOVERING state. |
| Clock::pause(); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| TestContainerizer containerizer2; |
| |
| slave = StartSlave(&containerizer2, flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure slave has setup the route for "/state". |
| AWAIT_READY(_recover); |
| |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(ServiceUnavailable().status, response); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test ensures that when a slave is shutting down, it will not |
| // try to re-register with the master. |
| TEST_F(SlaveTest, DISABLED_TerminatingSlaveDoesNotReregister) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Create a MockExecutor to enable us to catch |
| // ShutdownExecutorMessage later. |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| // Create a StandaloneMasterDetector to enable the slave to trigger |
| // re-registration later. |
| StandaloneMasterDetector detector(master.get()); |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // Make the executor_shutdown_grace_period to be much longer than |
| // REGISTER_RETRY_INTERVAL, so that the slave will at least call |
| // call doReliableRegistration() once before the slave is actually |
| // terminated. |
| flags.executor_shutdown_grace_period = slave::REGISTER_RETRY_INTERVAL_MAX * 2; |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&exec, &detector, flags); |
| ASSERT_SOME(slave); |
| |
| // Create a task on the slave. |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| // Launch a task that uses less resource than the |
| // default(cpus:2, mem:1024). |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .Times(1); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.start(); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| // Pause the clock here so that after detecting a new master, |
| // the slave will not send multiple reregister messages |
| // before we change its state to TERMINATING. |
| Clock::pause(); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| DROP_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get()); |
| |
| // Simulate a new master detected event on the slave, |
| // so that the slave will do a re-registration. |
| detector.appoint(master.get()); |
| |
| // Make sure the slave has entered doReliableRegistration() |
| // before we change the slave's state. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| // Setup an expectation that the master should not receive any |
| // ReregisterSlaveMessage in the future. |
| EXPECT_NO_FUTURE_PROTOBUFS( |
| ReregisterSlaveMessage(), slave.get(), master.get()); |
| |
| // Drop the ShutdownExecutorMessage, so that the slave will |
| // stay in TERMINATING for a while. |
| DROP_PROTOBUFS(ShutdownExecutorMessage(), slave.get(), _); |
| |
| Future<Nothing> executorLost; |
| EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)) |
| .WillOnce(FutureSatisfy(&executorLost)); |
| |
| // Send a ShutdownMessage instead of calling Stop() directly |
| // to avoid blocking. |
| post(master.get(), slave.get(), ShutdownMessage()); |
| |
| // Advance the clock to trigger doReliableRegistration(). |
| Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2); |
| Clock::settle(); |
| Clock::resume(); |
| |
| AWAIT_READY(executorLost); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies the slave will destroy a container if, when |
| // receiving a terminal status task update, updating the container's |
| // resources fails. |
| TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| TestContainerizer containerizer(&exec); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| Future<vector<Offer>> offers; |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| Offer offer = offers.get()[0]; |
| |
| // Start two tasks. |
| vector<TaskInfo> tasks; |
| |
| tasks.push_back(createTask( |
| offer.slave_id(), |
| Resources::parse("cpus:0.1;mem:32").get(), |
| "sleep 1000", |
| exec.id)); |
| |
| tasks.push_back(createTask( |
| offer.slave_id(), |
| Resources::parse("cpus:0.1;mem:32").get(), |
| "sleep 1000", |
| exec.id)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status1, status2, status3, status4; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status1)) |
| .WillOnce(FutureArg<1>(&status2)) |
| .WillOnce(FutureArg<1>(&status3)) |
| .WillOnce(FutureArg<1>(&status4)); |
| |
| driver.launchTasks(offer.id(), tasks); |
| |
| AWAIT_READY(status1); |
| EXPECT_EQ(TASK_RUNNING, status1.get().state()); |
| |
| AWAIT_READY(status2); |
| EXPECT_EQ(TASK_RUNNING, status2.get().state()); |
| |
| // Set up the containerizer so the next update() will fail. |
| EXPECT_CALL(containerizer, update(_, _)) |
| .WillOnce(Return(Failure("update() failed"))) |
| .WillRepeatedly(Return(Nothing())); |
| |
| EXPECT_CALL(exec, killTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED)); |
| |
| Future<Nothing> executorLost; |
| EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)) |
| .WillOnce(FutureSatisfy(&executorLost)); |
| |
| // Kill one of the tasks. The failed update should result in the |
| // second task going lost when the container is destroyed. |
| driver.killTask(tasks[0].task_id()); |
| |
| AWAIT_READY(status3); |
| EXPECT_EQ(TASK_KILLED, status3.get().state()); |
| EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source()); |
| |
| AWAIT_READY(status4); |
| EXPECT_EQ(TASK_LOST, status4->state()); |
| EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source()); |
| EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason()); |
| |
| AWAIT_READY(executorLost); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies that the resources of a container will be |
| // updated before tasks are sent to the executor. |
| TEST_F(SlaveTest, ContainerUpdatedBeforeTaskReachesExecutor) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| TestContainerizer containerizer(&exec); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, "1", "128", "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // This is used to determine which of the following finishes first: |
| // `containerizer->update` or `exec->launchTask`. We want to make |
| // sure that containerizer update always finishes before the task is |
| // sent to the executor. |
| testing::Sequence sequence; |
| |
| EXPECT_CALL(containerizer, update(_, _)) |
| .InSequence(sequence) |
| .WillOnce(Return(Nothing())); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .InSequence(sequence) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.start(); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies the slave will destroy a container if updating |
| // the container's resources fails during task launch. |
| TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer containerizer(&exec); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, "1", "128", "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // The executor may not receive the ExecutorRegisteredMessage if the |
| // container is destroyed before that. |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .Times(AtMost(1)); |
| |
| // Set up the containerizer so update() will fail. |
| EXPECT_CALL(containerizer, update(_, _)) |
| .WillOnce(Return(Failure("update() failed"))) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)); |
| |
| driver.start(); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_LOST, status->state()); |
| EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); |
| EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status->reason()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test ensures that the slave will re-register with the master |
| // if it does not receive any pings after registering. |
| TEST_F(SlaveTest, PingTimeoutNoPings) |
| { |
| // Set shorter ping timeout values. |
| master::Flags masterFlags = CreateMasterFlags(); |
| masterFlags.slave_ping_timeout = Seconds(5); |
| masterFlags.max_slave_ping_timeouts = 2u; |
| Duration totalTimeout = |
| masterFlags.slave_ping_timeout * masterFlags.max_slave_ping_timeouts; |
| |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(masterFlags); |
| ASSERT_SOME(master); |
| |
| // Block all pings to the slave. |
| DROP_PROTOBUFS(PingSlaveMessage(), _, _); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| ASSERT_TRUE(slaveRegisteredMessage.get().has_connection()); |
| MasterSlaveConnection connection = slaveRegisteredMessage.get().connection(); |
| EXPECT_EQ(totalTimeout, Seconds(connection.total_ping_timeout_seconds())); |
| |
| // Ensure the slave processes the registration message and schedules |
| // the ping timeout, before we advance the clock. |
| Clock::pause(); |
| Clock::settle(); |
| |
| // Advance to the ping timeout to trigger a re-detection and |
| // re-registration. |
| Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| Clock::advance(totalTimeout); |
| |
| AWAIT_READY(detected); |
| AWAIT_READY(slaveReregisteredMessage); |
| } |
| |
| |
| // This test ensures that the slave will re-register with the master |
| // if it stops receiving pings. |
| TEST_F(SlaveTest, PingTimeoutSomePings) |
| { |
| // Start a master. |
| master::Flags masterFlags = CreateMasterFlags(); |
| Try<PID<Master>> master = StartMaster(masterFlags); |
| ASSERT_SOME(master); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| |
| Clock::pause(); |
| |
| // Ensure a ping reaches the slave. |
| Future<Message> ping = FUTURE_MESSAGE( |
| Eq(PingSlaveMessage().GetTypeName()), _, _); |
| |
| Clock::advance(masterFlags.slave_ping_timeout); |
| |
| AWAIT_READY(ping); |
| |
| // Now block further pings from the master and advance |
| // the clock to trigger a re-detection and re-registration on |
| // the slave. |
| DROP_PROTOBUFS(PingSlaveMessage(), _, _); |
| |
| Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| Clock::advance(slave::DEFAULT_MASTER_PING_TIMEOUT()); |
| |
| AWAIT_READY(detected); |
| AWAIT_READY(slaveReregisteredMessage); |
| } |
| |
| |
| // This test ensures that when slave removal rate limit is specified |
| // a slave that fails health checks is removed after a permit is |
| // provided by the rate limiter. |
| TEST_F(SlaveTest, RateLimitSlaveShutdown) |
| { |
| // Start a master. |
| shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter()); |
| master::Flags masterFlags = CreateMasterFlags(); |
| Try<PID<Master>> master = StartMaster(slaveRemovalLimiter, masterFlags); |
| ASSERT_SOME(master); |
| |
| // Set these expectations up before we spawn the slave so that we |
| // don't miss the first PING. |
| Future<Message> ping = FUTURE_MESSAGE( |
| Eq(PingSlaveMessage().GetTypeName()), _, _); |
| |
| // Drop all the PONGs to simulate health check timeout. |
| DROP_PROTOBUFS(PongSlaveMessage(), _, _); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| |
| // Return a pending future from the rate limiter. |
| Future<Nothing> acquire; |
| Promise<Nothing> promise; |
| EXPECT_CALL(*slaveRemovalLimiter, acquire()) |
| .WillOnce(DoAll(FutureSatisfy(&acquire), |
| Return(promise.future()))); |
| |
| Future<ShutdownMessage> shutdown = FUTURE_PROTOBUF(ShutdownMessage(), _, _); |
| |
| // Induce a health check failure of the slave. |
| Clock::pause(); |
| size_t pings = 0; |
| while (true) { |
| AWAIT_READY(ping); |
| pings++; |
| if (pings == masterFlags.max_slave_ping_timeouts) { |
| Clock::advance(masterFlags.slave_ping_timeout); |
| break; |
| } |
| ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _); |
| Clock::advance(masterFlags.slave_ping_timeout); |
| } |
| |
| // The master should attempt to acquire a permit. |
| AWAIT_READY(acquire); |
| |
| // The shutdown should not occur before the permit is satisfied. |
| Clock::settle(); |
| ASSERT_TRUE(shutdown.isPending()); |
| |
| // Once the permit is satisfied, the shutdown message |
| // should be sent. |
| promise.set(Nothing()); |
| AWAIT_READY(shutdown); |
| } |
| |
| |
| // This test verifies that when a slave responds to pings after the |
| // slave observer has scheduled it for shutdown (due to health check |
| // failure), the shutdown is cancelled. |
| TEST_F(SlaveTest, CancelSlaveShutdown) |
| { |
| // Start a master. |
| shared_ptr<MockRateLimiter> slaveRemovalLimiter(new MockRateLimiter()); |
| master::Flags masterFlags = CreateMasterFlags(); |
| Try<PID<Master>> master = StartMaster(slaveRemovalLimiter, masterFlags); |
| ASSERT_SOME(master); |
| |
| // Set these expectations up before we spawn the slave so that we |
| // don't miss the first PING. |
| Future<Message> ping = FUTURE_MESSAGE( |
| Eq(PingSlaveMessage().GetTypeName()), _, _); |
| |
| // Drop all the PONGs to simulate health check timeout. |
| DROP_PROTOBUFS(PongSlaveMessage(), _, _); |
| |
| // No shutdown should occur during the test! |
| EXPECT_NO_FUTURE_PROTOBUFS(ShutdownMessage(), _, _); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| |
| // Return a pending future from the rate limiter. |
| Future<Nothing> acquire; |
| Promise<Nothing> promise; |
| EXPECT_CALL(*slaveRemovalLimiter, acquire()) |
| .WillOnce(DoAll(FutureSatisfy(&acquire), |
| Return(promise.future()))); |
| |
| // Induce a health check failure of the slave. |
| Clock::pause(); |
| size_t pings = 0; |
| while (true) { |
| AWAIT_READY(ping); |
| pings++; |
| if (pings == masterFlags.max_slave_ping_timeouts) { |
| Clock::advance(masterFlags.slave_ping_timeout); |
| break; |
| } |
| ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _); |
| Clock::advance(masterFlags.slave_ping_timeout); |
| } |
| |
| // The master should attempt to acquire a permit. |
| AWAIT_READY(acquire); |
| |
| // Settle to make sure the shutdown does not occur. |
| Clock::settle(); |
| |
| // Reset the filters to allow pongs from the slave. |
| filter(NULL); |
| |
| // Advance clock enough to do a ping pong. |
| Clock::advance(masterFlags.slave_ping_timeout); |
| Clock::settle(); |
| |
| // The master should have tried to cancel the removal. |
| ASSERT_TRUE(promise.future().hasDiscard()); |
| |
| // Allow the cancelation and settle the clock to ensure a shutdown |
| // does not occur. |
| promise.discard(); |
| Clock::settle(); |
| } |
| |
| |
| // This test ensures that a killTask() can happen between runTask() |
| // and _runTask() and then gets "handled properly". This means that |
| // the task never gets started, but also does not get lost. The end |
| // result is status TASK_KILLED. Essentially, killing the task is |
| // realized while preparing to start it. See MESOS-947. This test |
| // removes the framework and proves that removeFramework() is |
| // called. See MESOS-1945. |
| TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer containerizer(&exec); |
| |
| StandaloneMasterDetector detector(master.get()); |
| |
| MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); |
| spawn(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .Times(0); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .Times(0); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(0); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillRepeatedly(FutureArg<1>(&status)); |
| |
| EXPECT_CALL(slave, runTask(_, _, _, _, _)) |
| .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask)); |
| |
| // Saved arguments from Slave::_runTask(). |
| Future<bool> future; |
| FrameworkInfo frameworkInfo; |
| |
| // Skip what Slave::_runTask() normally does, save its arguments for |
| // later, tie reaching the critical moment when to kill the task to |
| // a future. |
| Future<Nothing> _runTask; |
| EXPECT_CALL(slave, _runTask(_, _, _)) |
| .WillOnce(DoAll(FutureSatisfy(&_runTask), |
| SaveArg<0>(&future), |
| SaveArg<1>(&frameworkInfo))); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(_runTask); |
| |
| // Since this is the only task ever for this framework, the |
| // framework should get removed in Slave::killTask(). |
| // Thus we can observe that this happens before Shutdown(). |
| Future<Nothing> removeFramework; |
| EXPECT_CALL(slave, removeFramework(_)) |
| .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework), |
| FutureSatisfy(&removeFramework))); |
| |
| Future<Nothing> killTask; |
| EXPECT_CALL(slave, killTask(_, _, _)) |
| .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask), |
| FutureSatisfy(&killTask))); |
| |
| driver.killTask(task.task_id()); |
| |
| AWAIT_READY(killTask); |
| slave.unmocked__runTask(future, frameworkInfo, task); |
| |
| AWAIT_READY(removeFramework); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_KILLED, status.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| terminate(slave); |
| wait(slave); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test verifies that when a slave re-registers with the master |
| // it correctly includes the latest and status update task states. |
| TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState) |
| { |
| // Start a master. |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| // Create a StandaloneMasterDetector to enable the slave to trigger |
| // re-registration later. |
| StandaloneMasterDetector detector(master.get()); |
| |
| // Start a slave. |
| Try<PID<Slave>> slave = StartSlave(&exec, &detector); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| ExecutorDriver* execDriver; |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .WillOnce(SaveArg<0>(&execDriver)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| // Signal when the first update is dropped. |
| Future<StatusUpdateMessage> statusUpdateMessage = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); |
| |
| Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate); |
| |
| driver.start(); |
| |
| // Pause the clock to avoid status update retries. |
| Clock::pause(); |
| |
| // Wait until TASK_RUNNING is sent to the master. |
| AWAIT_READY(statusUpdateMessage); |
| |
| // Ensure status update manager handles TASK_RUNNING update. |
| AWAIT_READY(___statusUpdate); |
| |
| Future<Nothing> ___statusUpdate2 = |
| FUTURE_DISPATCH(_, &Slave::___statusUpdate); |
| |
| // Now send TASK_FINISHED update. |
| TaskStatus finishedStatus; |
| finishedStatus = statusUpdateMessage.get().update().status(); |
| finishedStatus.set_state(TASK_FINISHED); |
| execDriver->sendStatusUpdate(finishedStatus); |
| |
| // Ensure status update manager handles TASK_FINISHED update. |
| AWAIT_READY(___statusUpdate2); |
| |
| Future<ReregisterSlaveMessage> reregisterSlaveMessage = |
| FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _); |
| |
| // Drop any updates to the failed over master. |
| DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()); |
| |
| // Simulate a new master detected event on the slave, |
| // so that the slave will do a re-registration. |
| detector.appoint(master.get()); |
| |
| // Capture and inspect the slave reregistration message. |
| AWAIT_READY(reregisterSlaveMessage); |
| |
| ASSERT_EQ(1, reregisterSlaveMessage.get().tasks_size()); |
| |
| // The latest state of the task should be TASK_FINISHED. |
| ASSERT_EQ(TASK_FINISHED, reregisterSlaveMessage.get().tasks(0).state()); |
| |
| // The status update state of the task should be TASK_RUNNING. |
| ASSERT_EQ(TASK_RUNNING, |
| reregisterSlaveMessage.get().tasks(0).status_update_state()); |
| |
| // The status update uuid should match the TASK_RUNNING's uuid. |
| ASSERT_EQ(statusUpdateMessage.get().update().uuid(), |
| reregisterSlaveMessage.get().tasks(0).status_update_uuid()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies that the slave should properly handle the case |
| // where the containerizer usage call fails when getting the usage |
| // information. |
| TEST_F(SlaveTest, ContainerizerUsageFailure) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| TestContainerizer containerizer(&exec); |
| StandaloneMasterDetector detector(master.get()); |
| |
| MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); |
| spawn(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(_, _, _)); |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| Future<vector<Offer>> offers; |
| |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| const Offer& offer = offers.get()[0]; |
| |
| TaskInfo task = createTask( |
| offer.slave_id(), |
| Resources::parse("cpus:0.1;mem:32").get(), |
| "sleep 1000", |
| exec.id); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offer.id(), {task}); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| // Set up the containerizer so the next usage() will fail. |
| EXPECT_CALL(containerizer, usage(_)) |
| .WillOnce(Return(Failure("Injected failure"))); |
| |
| // We expect that the slave will still returns ResourceUsage but no |
| // statistics will be found. |
| Future<ResourceUsage> usage = slave.usage(); |
| |
| AWAIT_READY(usage); |
| ASSERT_EQ(1, usage.get().executors_size()); |
| EXPECT_FALSE(usage.get().executors(0).has_statistics()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| terminate(slave); |
| wait(slave); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies that DiscoveryInfo and Port messages, set in TaskInfo, |
| // are exposed over the slave state endpoint. The test launches a task with |
| // the DiscoveryInfo and Port message fields populated. It then makes an HTTP |
| // request to the state endpoint of the slave and retrieves the JSON data from |
| // the endpoint. The test passes if the DiscoveryInfo and Port message data in |
| // JSON matches the corresponding data set in the TaskInfo used to launch the |
| // task. |
| TEST_F(SlaveTest, DiscoveryInfoAndPorts) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); |
| |
| Labels labels1; |
| labels1.add_labels()->CopyFrom(createLabel("ACTION", "port:7987 DENY")); |
| |
| Labels labels2; |
| labels2.add_labels()->CopyFrom(createLabel("ACTION", "port:7789 PERMIT")); |
| |
| Ports ports; |
| Port* port1 = ports.add_ports(); |
| port1->set_number(80); |
| port1->mutable_labels()->CopyFrom(labels1); |
| |
| Port* port2 = ports.add_ports(); |
| port2->set_number(8081); |
| port2->mutable_labels()->CopyFrom(labels2); |
| |
| DiscoveryInfo discovery; |
| discovery.set_name("test_discovery"); |
| discovery.set_visibility(DiscoveryInfo::CLUSTER); |
| discovery.mutable_ports()->CopyFrom(ports); |
| |
| task.mutable_discovery()->CopyFrom(discovery); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| Future<Nothing> launchTask; |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(FutureSatisfy(&launchTask)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(launchTask); |
| |
| // Verify label key and value in slave state endpoint. |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Object> discoveryResult = parse.get().find<JSON::Object>( |
| "frameworks[0].executors[0].tasks[0].discovery"); |
| EXPECT_SOME(discoveryResult); |
| |
| JSON::Object discoveryObject = discoveryResult.get(); |
| EXPECT_EQ(JSON::Object(JSON::protobuf(discovery)), discoveryObject); |
| |
| // Check the ports are set in the `DiscoveryInfo` object. |
| Result<JSON::Object> portResult1 = parse.get().find<JSON::Object>( |
| "frameworks[0].executors[0].tasks[0].discovery.ports.ports[0]"); |
| Result<JSON::Object> portResult2 = parse.get().find<JSON::Object>( |
| "frameworks[0].executors[0].tasks[0].discovery.ports.ports[1]"); |
| |
| EXPECT_SOME(portResult1); |
| EXPECT_SOME(portResult2); |
| |
| // Verify that the ports retrieved from state endpoint are the ones |
| // that were set. |
| EXPECT_EQ(JSON::Object(JSON::protobuf(*port1)), portResult1.get()); |
| EXPECT_EQ(JSON::Object(JSON::protobuf(*port2)), portResult2.get()); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test verifies that label values can be set for tasks and that |
| // they are exposed over the slave state endpoint. |
| TEST_F(SlaveTest, TaskLabels) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| TestContainerizer containerizer(&exec); |
| |
| Try<PID<Slave>> slave = StartSlave(&containerizer); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); |
| |
| // Add three labels to the task (two of which share the same key). |
| Labels* labels = task.mutable_labels(); |
| |
| labels->add_labels()->CopyFrom(createLabel("foo", "bar")); |
| labels->add_labels()->CopyFrom(createLabel("bar", "baz")); |
| labels->add_labels()->CopyFrom(createLabel("bar", "qux")); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .Times(1); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<Nothing> update; |
| EXPECT_CALL(containerizer, |
| update(_, Resources(offers.get()[0].resources()))) |
| .WillOnce(DoAll(FutureSatisfy(&update), |
| Return(Nothing()))); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| AWAIT_READY(update); |
| |
| // Verify label key and value in slave state endpoint. |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Array> find = parse.get().find<JSON::Array>( |
| "frameworks[0].executors[0].tasks[0].labels"); |
| EXPECT_SOME(find); |
| |
| JSON::Array labelsObject = find.get(); |
| |
| // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs. |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("foo", "bar"))), |
| labelsObject.values[0]); |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("bar", "baz"))), |
| labelsObject.values[1]); |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("bar", "qux"))), |
| labelsObject.values[2]); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test verifies that TaskStatus label values are exposed over |
| // the slave state endpoint. |
| TEST_F(SlaveTest, TaskStatusLabels) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); |
| |
| ExecutorDriver* execDriver; |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .WillOnce(SaveArg<0>(&execDriver)); |
| |
| Future<TaskInfo> execTask; |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(FutureArg<1>(&execTask)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(execTask); |
| |
| // Now send TASK_RUNNING update. |
| TaskStatus runningStatus; |
| runningStatus.mutable_task_id()->MergeFrom(execTask.get().task_id()); |
| runningStatus.set_state(TASK_RUNNING); |
| |
| // Add three labels to the task (two of which share the same key). |
| Labels* labels = runningStatus.mutable_labels(); |
| |
| labels->add_labels()->CopyFrom(createLabel("foo", "bar")); |
| labels->add_labels()->CopyFrom(createLabel("bar", "baz")); |
| labels->add_labels()->CopyFrom(createLabel("bar", "qux")); |
| |
| execDriver->sendStatusUpdate(runningStatus); |
| |
| AWAIT_READY(status); |
| |
| // Verify label key and value in master state endpoint. |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Array> find = parse.get().find<JSON::Array>( |
| "frameworks[0].executors[0].tasks[0].statuses[0].labels"); |
| EXPECT_SOME(find); |
| |
| JSON::Array labelsObject = find.get(); |
| |
| // Verify the contents of 'foo:bar', 'bar:baz', and 'bar:qux' pairs. |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("foo", "bar"))), |
| labelsObject.values[0]); |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("bar", "baz"))), |
| labelsObject.values[1]); |
| EXPECT_EQ( |
| JSON::Value(JSON::protobuf(createLabel("bar", "qux"))), |
| labelsObject.values[2]); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // This test verifies that TaskStatus::container_status an is exposed over |
| // the slave state endpoint. |
| TEST_F(SlaveTest, TaskStatusContainerStatus) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 100", DEFAULT_EXECUTOR_ID); |
| |
| ExecutorDriver* execDriver; |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .WillOnce(SaveArg<0>(&execDriver)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&status)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(status); |
| |
| const string slaveIPAddress = stringify(slave.get().address.ip); |
| |
| // Validate that the Slave has passed in its IP address in |
| // TaskStatus.container_status.network_infos[0].ip_address. |
| EXPECT_TRUE(status.get().has_container_status()); |
| EXPECT_EQ(1, status.get().container_status().network_infos().size()); |
| EXPECT_TRUE( |
| status.get().container_status().network_infos(0).has_ip_address()); |
| EXPECT_EQ( |
| slaveIPAddress, |
| status.get().container_status().network_infos(0).ip_address()); |
| |
| // Now do the same validation with state endpoint. |
| Future<Response> response = process::http::get(slave.get(), "state"); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); |
| ASSERT_SOME(parse); |
| |
| // Validate that the IP address passed in by the Slave is available at the |
| // state endpoint. |
| ASSERT_SOME_EQ( |
| slaveIPAddress, |
| parse.get().find<JSON::String>( |
| "frameworks[0].executors[0].tasks[0].statuses[0]" |
| ".container_status.network_infos[0].ip_address")); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); // Must shutdown before 'containerizer' gets deallocated. |
| } |
| |
| |
| // Test that we can set the executors environment variables and it |
| // won't inhert the slaves. |
| TEST_F(SlaveTest, ExecutorEnvironmentVariables) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Need flags for 'executor_environment_variables'. |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>("{\"PATH\": \"/bin\"}"); |
| |
| ASSERT_SOME(parse); |
| |
| flags.executor_environment_variables = parse.get(); |
| |
| Try<PID<Slave>> slave = StartSlave(flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| // Launch a task with the command executor. |
| TaskInfo task; |
| task.set_name(""); |
| task.mutable_task_id()->set_value("1"); |
| task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); |
| task.mutable_resources()->MergeFrom(offers.get()[0].resources()); |
| |
| // Command executor will run as user running test. |
| CommandInfo command; |
| command.set_shell(true); |
| command.set_value("test $PATH = /bin"); |
| |
| task.mutable_command()->MergeFrom(command); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Scheduler should first receive TASK_RUNNING followed by the |
| // TASK_FINISHED from the executor. |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies that the slave should properly show total slave |
| // resources. |
| TEST_F(SlaveTest, TotalSlaveResourcesIncludedInUsage) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| TestContainerizer containerizer; |
| StandaloneMasterDetector detector(master.get()); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.resources = "cpus:2;mem:1024;disk:1024;ports:[31000-32000]"; |
| |
| MockSlave slave(flags, &detector, &containerizer); |
| spawn(slave); |
| |
| Clock::pause(); |
| |
| // Wait for slave to be initialized. |
| Clock::settle(); |
| |
| // We expect that the slave will return ResourceUsage with |
| // total resources reported. |
| Future<ResourceUsage> usage = slave.usage(); |
| |
| AWAIT_READY(usage); |
| |
| // Total resources should match the resources from flag.resources. |
| EXPECT_EQ(Resources(usage.get().total()), |
| Resources::parse(flags.resources.get()).get()); |
| |
| terminate(slave); |
| wait(slave); |
| |
| Shutdown(); |
| } |
| |
| |
| // This test verifies that the slave should properly show total slave |
| // resources with checkpointed resources applied. |
| TEST_F(SlaveTest, CheckpointedResourcesIncludedInUsage) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| TestContainerizer containerizer; |
| StandaloneMasterDetector detector(master.get()); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.resources = "cpus:2;cpus(role1):3;mem:1024;disk:1024;disk(role1):64;" |
| "ports:[31000-32000]"; |
| |
| MockSlave slave(flags, &detector, &containerizer); |
| spawn(slave); |
| |
| Clock::pause(); |
| |
| // Wait for slave to be initialized. |
| Clock::settle(); |
| |
| Resource dynamicReservation = Resources::parse("cpus", "1", "role1").get(); |
| dynamicReservation.mutable_reservation()->CopyFrom( |
| createReservationInfo("principal")); |
| |
| Resource persistentVolume = createPersistentVolume( |
| Megabytes(64), |
| "role1", |
| "id1", |
| "path1"); |
| |
| vector<Resource> checkpointedResources = |
| {dynamicReservation, persistentVolume}; |
| |
| // Add checkpointed resources. |
| slave.checkpointResources(checkpointedResources); |
| |
| // We expect that the slave will return ResourceUsage with |
| // total and checkpointed slave resources reported. |
| Future<ResourceUsage> usage = slave.usage(); |
| |
| AWAIT_READY(usage); |
| |
| Resources usageTotalResources(usage.get().total()); |
| |
| // Reported total field should contain persistent volumes and dynamic |
| // reservations. |
| EXPECT_EQ(usageTotalResources.persistentVolumes(), persistentVolume); |
| EXPECT_TRUE(usageTotalResources.contains(dynamicReservation)); |
| |
| terminate(slave); |
| wait(slave); |
| |
| Shutdown(); |
| } |
| |
| |
| // Ensures that the slave correctly handles a framework without |
| // a pid, which will be the case for HTTP schedulers. In |
| // particular, executor messages should be routed through the |
| // master. |
| TEST_F(SlaveTest, HTTPScheduler) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Capture the run task message to unset the framework pid. |
| Future<RunTaskMessage> runTaskMessage = |
| DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get()); |
| |
| driver.start(); |
| |
| AWAIT_READY(runTaskMessage); |
| |
| EXPECT_CALL(exec, registered(_, _, _, _)); |
| |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(SendFrameworkMessage("message")); |
| |
| // The slave should forward the message through the master. |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); |
| |
| // The master should then forward the message to the framework. |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); |
| |
| Future<Nothing> frameworkMessage; |
| EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) |
| .WillOnce(FutureSatisfy(&frameworkMessage)); |
| |
| // Clear the pid in the run task message so that the slave |
| // thinks this is an HTTP scheduler. |
| RunTaskMessage spoofed = runTaskMessage.get(); |
| spoofed.set_pid(""); |
| |
| process::post(master.get(), slave.get(), spoofed); |
| |
| AWAIT_READY(executorToFrameworkMessage1); |
| AWAIT_READY(executorToFrameworkMessage2); |
| |
| AWAIT_READY(frameworkMessage); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| // Must call shutdown before the mock executor gets deallocated. |
| Shutdown(); |
| } |
| |
| |
| // Ensures that the slave correctly handles a framework upgrading |
| // to HTTP (going from having a pid, to not having a pid). In |
| // particular, executor messages should be routed through the |
| // master. |
| TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade) |
| { |
| Try<PID<Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockExecutor exec(DEFAULT_EXECUTOR_ID); |
| Try<PID<Slave>> slave = StartSlave(&exec); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| ExecutorDriver* execDriver; |
| EXPECT_CALL(exec, registered(_, _, _, _)) |
| .WillOnce(SaveArg<0>(&execDriver)); |
| |
| Future<Nothing> launchTask; |
| EXPECT_CALL(exec, launchTask(_, _)) |
| .WillOnce(FutureSatisfy(&launchTask)); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(launchTask); |
| |
| // Now spoof a live upgrade of the framework by updating |
| // the framework information to have an empty pid. |
| UpdateFrameworkMessage updateFrameworkMessage; |
| updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get()); |
| updateFrameworkMessage.set_pid(""); |
| |
| process::post(master.get(), slave.get(), updateFrameworkMessage); |
| |
| // Send a message from the executor; the slave should forward |
| // the message through the master. |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); |
| |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); |
| |
| Future<Nothing> frameworkMessage; |
| EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) |
| .WillOnce(FutureSatisfy(&frameworkMessage)); |
| |
| execDriver->sendFrameworkMessage("message"); |
| |
| AWAIT_READY(executorToFrameworkMessage1); |
| AWAIT_READY(executorToFrameworkMessage2); |
| |
| AWAIT_READY(frameworkMessage); |
| |
| EXPECT_CALL(exec, shutdown(_)) |
| .Times(AtMost(1)); |
| |
| driver.stop(); |
| driver.join(); |
| |
| // Must call shutdown before the mock executor gets deallocated. |
| Shutdown(); |
| } |
| |
| |
| // Ensures that the slave can restart when there is an empty |
| // framework pid. Executor messages should go through the |
| // master (instead of directly to the scheduler!). |
| TEST_F(SlaveTest, HTTPSchedulerSlaveRestart) |
| { |
| Try<PID<Master>> master = this->StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = this->CreateSlaveFlags(); |
| |
| Fetcher fetcher; |
| |
| Try<slave::MesosContainerizer*> containerizer = |
| slave::MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(containerizer); |
| |
| Try<PID<Slave>> slave = this->StartSlave(containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| FrameworkID frameworkId; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(SaveArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| // Capture the executor information. |
| Future<Message> registerExecutorMessage = |
| FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| AWAIT_READY(offers); |
| EXPECT_NE(0u, offers.get().size()); |
| |
| SlaveID slaveId = offers.get()[0].slave_id(); |
| |
| // Capture the run task so that we can unset the framework pid. |
| Future<RunTaskMessage> runTaskMessage = |
| DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get()); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(runTaskMessage); |
| |
| // Clear the pid in the run task message so that the slave |
| // thinks this is an HTTP scheduler. |
| RunTaskMessage spoofedRunTaskMessage = runTaskMessage.get(); |
| spoofedRunTaskMessage.set_pid(""); |
| |
| process::post(master.get(), slave.get(), spoofedRunTaskMessage); |
| |
| AWAIT_READY(registerExecutorMessage); |
| |
| RegisterExecutorMessage registerExecutor; |
| registerExecutor.ParseFromString(registerExecutorMessage.get().body); |
| ExecutorID executorId = registerExecutor.executor_id(); |
| UPID executorPid = registerExecutorMessage.get().from; |
| |
| AWAIT_READY(status); |
| EXPECT_EQ(TASK_RUNNING, status.get().state()); |
| |
| // Restart the slave. |
| Stop(slave.get()); |
| |
| Try<slave::MesosContainerizer*> containerizer2 = |
| slave::MesosContainerizer::create(flags, true, &fetcher); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Capture this so that we can unset the framework pid. |
| Future<UpdateFrameworkMessage> updateFrameworkMessage = |
| DROP_PROTOBUF(UpdateFrameworkMessage(), _, _); |
| |
| // Ensure that there will be no reregistration retries from the |
| // slave resulting in another UpdateFrameworkMessage from master. |
| Clock::pause(); |
| |
| slave = StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::settle(); |
| |
| // Ensure the slave considers itself recovered. |
| Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT); |
| |
| Clock::resume(); |
| |
| AWAIT_READY(slaveReregisteredMessage); |
| AWAIT_READY(updateFrameworkMessage); |
| |
| // Make sure the slave sees an empty framework pid after recovery. |
| UpdateFrameworkMessage spoofedUpdateFrameworkMessage = |
| updateFrameworkMessage.get(); |
| spoofedUpdateFrameworkMessage.set_pid(""); |
| |
| process::post(master.get(), slave.get(), spoofedUpdateFrameworkMessage); |
| |
| // Spoof a message from the executor, to ensure the slave |
| // sends it through the master (instead of directly to the |
| // scheduler driver!). |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get()); |
| |
| Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 = |
| FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _); |
| |
| Future<Nothing> frameworkMessage; |
| EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message")) |
| .WillOnce(FutureSatisfy(&frameworkMessage)); |
| |
| ExecutorToFrameworkMessage executorToFrameworkMessage; |
| executorToFrameworkMessage.mutable_slave_id()->CopyFrom(slaveId); |
| executorToFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId); |
| executorToFrameworkMessage.mutable_executor_id()->CopyFrom(executorId); |
| executorToFrameworkMessage.set_data("message"); |
| |
| process::post(executorPid, slave.get(), executorToFrameworkMessage); |
| |
| AWAIT_READY(executorToFrameworkMessage1); |
| AWAIT_READY(executorToFrameworkMessage2); |
| AWAIT_READY(frameworkMessage); |
| |
| driver.stop(); |
| driver.join(); |
| |
| this->Shutdown(); |
| |
| delete containerizer.get(); |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |