blob: 69be28f6e82b99e23424bd2be8294f715d8040d4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <pwd.h>
#include <unistd.h>

#include <gmock/gmock.h>

#include <map>
#include <string>
#include <vector>

#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>

#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/io.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/subprocess.hpp>

#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>

#include "master/flags.hpp"
#include "master/master.hpp"

#include "slave/constants.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"

#include "slave/containerizer/mesos/containerizer.hpp"

#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
using mesos::internal::master::Master;
using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Slave;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::MesosContainerizerProcess;
using process::Clock;
using process::Future;
using process::Message;
using process::Owned;
using process::PID;
using std::map;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
// Those of the overall Mesos master/slave/scheduler/driver tests
// that seem vaguely more slave than master-related are in this file.
// The others are in "master_tests.cpp".
//
// Fixture: MesosTest supplies the per-test helpers used below
// (StartMaster(), StartSlave(), CreateSlaveFlags(), Shutdown());
// see tests/mesos.hpp.
class SlaveTest : public MesosTest {};
// This test verifies that if an executor never registers with the
// slave within 'executor_registration_timeout', the slave destroys
// the executor's container and the task is reported TASK_FAILED.
TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Need flags for 'executor_registration_timeout'.
  slave::Flags flags = CreateSlaveFlags();
  // Set the isolation flag so we know a MesosContainerizer will be created.
  flags.isolation = "posix/cpu,posix/mem";

  Try<MesosContainerizer*> containerizer =
    MesosContainerizer::create(flags, false);

  // Use ASSERT_SOME (not CHECK_SOME): a CHECK failure aborts the
  // whole test binary rather than just failing this test.
  ASSERT_SOME(containerizer);

  Try<PID<Slave> > slave = StartSlave(containerizer.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _))
    .Times(1);

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task with the command executor.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());

  CommandInfo command;
  command.set_value("sleep 10");
  task.mutable_command()->MergeFrom(command);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  // Drop the registration message from the executor to the slave so
  // the executor never appears registered.
  Future<process::Message> registerExecutor =
    DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(registerExecutor);

  Clock::pause();

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Ensure that the slave times out and kills the executor.
  Future<Nothing> destroyExecutor =
    FUTURE_DISPATCH(_, &MesosContainerizerProcess::destroy);

  Clock::advance(flags.executor_registration_timeout);

  AWAIT_READY(destroyExecutor);

  Clock::settle(); // Wait for Containerizer::destroy to complete.

  // Now advance time until the reaper reaps the executor.
  while (status.isPending()) {
    Clock::advance(Seconds(1));
    Clock::settle();
  }

  AWAIT_READY(status);
  ASSERT_EQ(TASK_FAILED, status.get().state());

  Clock::resume();

  driver.stop();
  driver.join();

  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
// This test verifies that when an executor terminates before
// registering with slave, it is properly cleaned up.
TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
// Use a mock executor driven through a TestContainerizer so the test
// can terminate the executor's "container" on demand below.
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave> > slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
// Drop the registration message from the executor to the slave, so
// that the executor is still unregistered when it is terminated below.
Future<process::Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(registerExecutorMessage);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Now kill the executor.
containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID);
// The task is reported lost since its executor died before registering.
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
// We use 'gc.schedule' as a signal for the executor being cleaned
// up by the slave.
AWAIT_READY(schedule);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
// Test that we can run the mesos-executor and specify an "override"
// command to use via the --override argument.
TEST_F(SlaveTest, MesosExecutorWithOverride)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
TestContainerizer containerizer;
Try<PID<Slave> > slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
// Launch a task with the command executor.
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
CommandInfo command;
command.set_value("sleep 10");
task.mutable_command()->MergeFrom(command);
vector<TaskInfo> tasks;
tasks.push_back(task);
// Expect the launch and just assume it was successful since we'll be
// launching the executor ourselves manually below.
Future<Nothing> launch;
EXPECT_CALL(containerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureSatisfy(&launch),
Return(true)));
// Expect wait after launch is called but don't return anything
// until after we've finished everything below.
Future<Nothing> wait;
process::Promise<containerizer::Termination> promise;
EXPECT_CALL(containerizer, wait(_))
.WillOnce(DoAll(FutureSatisfy(&wait),
Return(promise.future())));
driver.launchTasks(offers.get()[0].id(), tasks);
// Once the containerizer reports the launch, we run the
// mesos-executor manually below with --override.
AWAIT_READY(launch);
// Set up fake environment for executor.
map<string, string> environment;
environment["MESOS_SLAVE_PID"] = stringify(slave.get());
environment["MESOS_SLAVE_ID"] = stringify(offers.get()[0].slave_id());
environment["MESOS_FRAMEWORK_ID"] = stringify(offers.get()[0].framework_id());
environment["MESOS_EXECUTOR_ID"] = stringify(task.task_id());
environment["MESOS_DIRECTORY"] = "";
// Create temporary file to store validation string. If the command is
// successfully replaced, this file will end up containing the string
// 'hello world\n'. Otherwise, the original task command i.e.
// 'sleep' will be called and the test will fail.
Try<std::string> file = os::mktemp();
ASSERT_SOME(file);
string executorCommand =
path::join(tests::flags.build_dir, "src", "mesos-executor") +
" --override -- /bin/sh -c 'echo hello world >" + file.get() + "'";
// Expect two status updates, one for once the mesos-executor says
// the task is running and one for after our overridden command
// above finishes.
Future<TaskStatus> status1, status2;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
Try<process::Subprocess> executor =
process::subprocess(
executorCommand,
process::Subprocess::PIPE(),
process::Subprocess::PIPE(),
process::Subprocess::PIPE(),
environment);
ASSERT_SOME(executor);
// Scheduler should receive the TASK_RUNNING update.
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1.get().state());
AWAIT_READY(status2);
ASSERT_EQ(TASK_FINISHED, status2.get().state());
AWAIT_READY(wait);
// Complete the containerizer's wait() future so the slave treats
// the executor as terminated.
containerizer::Termination termination;
termination.set_killed(false);
termination.set_message("Killed executor");
termination.set_status(0);
promise.set(termination);
driver.stop();
driver.join();
AWAIT_READY(executor.get().status());
// Verify file contents.
Try<std::string> validate = os::read(file.get());
ASSERT_SOME(validate);
EXPECT_EQ(validate.get(), "hello world\n");
os::rm(file.get());
Shutdown();
}
// This test runs a command without the command user field set. The
// command will verify the assumption that the command is run as the
// slave user (in this case, root).
TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Need flags for 'executor_registration_timeout'.
  slave::Flags flags = CreateSlaveFlags();
  flags.isolation = "posix/cpu,posix/mem";

  Try<MesosContainerizer*> containerizer =
    MesosContainerizer::create(flags, false);

  // Use ASSERT_SOME (not CHECK_SOME): a CHECK failure aborts the
  // whole test binary rather than just failing this test.
  ASSERT_SOME(containerizer);

  Try<PID<Slave> > slave = StartSlave(containerizer.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _))
    .Times(1);

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task with the command executor.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());

  Result<string> user = os::user();
  CHECK_SOME(user) << "Failed to get current user name"
                   << (user.isError() ? ": " + user.error() : "");

  // Command executor will run as user running test.
  CommandInfo command;
  command.set_value("test `whoami` = " + user.get());

  task.mutable_command()->MergeFrom(command);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<TaskStatus> statusRunning;
  Future<TaskStatus> statusFinished;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning))
    .WillOnce(FutureArg<1>(&statusFinished));

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());

  AWAIT_READY(statusFinished);
  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());

  driver.stop();
  driver.join();

  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
// This test runs a command _with_ the command user field set. The
// command will verify the assumption that the command is run as the
// specified user. We use (and assume the presence of) the
// unprivileged 'nobody' user which should be available on both Linux
// and Mac OS X.
TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser)
{
  // TODO(nnielsen): Introduce STOUT abstraction for user verification
  // instead of flat getpwnam call.
  const string testUser = "nobody";
  if (::getpwnam(testUser.c_str()) == NULL) {
    LOG(WARNING) << "Cannot run ROOT_RunTaskWithCommandInfoWithUser test:"
                 << " user '" << testUser << "' is not present";
    return;
  }

  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Need flags for 'executor_registration_timeout'.
  slave::Flags flags = CreateSlaveFlags();
  flags.isolation = "posix/cpu,posix/mem";

  Try<MesosContainerizer*> containerizer =
    MesosContainerizer::create(flags, false);

  // Use ASSERT_SOME (not CHECK_SOME): a CHECK failure aborts the
  // whole test binary rather than just failing this test.
  ASSERT_SOME(containerizer);

  Try<PID<Slave> > slave = StartSlave(containerizer.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _))
    .Times(1);

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  // Launch a task with the command executor.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());

  CommandInfo command;
  command.set_value("test `whoami` = " + testUser);
  command.set_user(testUser);

  task.mutable_command()->MergeFrom(command);

  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<TaskStatus> statusRunning;
  Future<TaskStatus> statusFinished;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning))
    .WillOnce(FutureArg<1>(&statusFinished));

  driver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());

  AWAIT_READY(statusFinished);
  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());

  driver.stop();
  driver.join();

  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
// This test ensures that a status update acknowledgement from a
// non-leading master is ignored.
TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  Try<PID<Slave> > slave = StartSlave(&exec);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver schedDriver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&schedDriver, _, _))
    .Times(1);

  Future<vector<Offer> > offers;
  EXPECT_CALL(sched, resourceOffers(&schedDriver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Wait for the framework to finish registering before proceeding.
  // NOTE: The scheduler pid previously extracted from this message
  // was unused, so it has been removed.
  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);

  schedDriver.start();

  AWAIT_READY(frameworkRegisteredMessage);

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers.get().size());

  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
  vector<TaskInfo> tasks;
  tasks.push_back(task);

  Future<ExecutorDriver*> execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(FutureArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> update;
  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
    .WillOnce(FutureArg<1>(&update));

  // Pause the clock to prevent status update retries on the slave.
  Clock::pause();

  // Intercept the acknowledgement sent to the slave so that we can
  // spoof the master's pid.
  Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage =
    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(),
                  master.get(),
                  slave.get());

  Future<Nothing> _statusUpdateAcknowledgement =
    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);

  schedDriver.launchTasks(offers.get()[0].id(), tasks);

  AWAIT_READY(update);
  EXPECT_EQ(TASK_RUNNING, update.get().state());

  AWAIT_READY(acknowledgementMessage);

  // Send the acknowledgement to the slave with a non-leading master.
  process::post(
      process::UPID("master@localhost:1"),
      slave.get(),
      acknowledgementMessage.get());

  // Make sure the acknowledgement was ignored.
  Clock::settle();
  ASSERT_TRUE(_statusUpdateAcknowledgement.isPending());

  // Make sure the status update gets retried because the slave
  // ignored the acknowledgement.
  Future<TaskStatus> retriedUpdate;
  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
    .WillOnce(FutureArg<1>(&retriedUpdate));

  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);

  AWAIT_READY(retriedUpdate);

  // Ensure the slave receives and properly handles the ACK.
  // Clock::settle() ensures that the slave successfully
  // executes Slave::_statusUpdateAcknowledgement().
  AWAIT_READY(_statusUpdateAcknowledgement);
  Clock::settle();

  Clock::resume();

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  schedDriver.stop();
  schedDriver.join();

  Shutdown();
}
// Verify that every expected slave metric is exposed exactly once on
// the slave's 'stats.json' endpoint.
TEST_F(SlaveTest, MetricsInStatsEndpoint)
{
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  // Hit the slave's stats endpoint.
  Future<process::http::Response> response =
    process::http::get(slave.get(), "stats.json");

  AWAIT_READY(response);

  EXPECT_SOME_EQ(
      "application/json",
      response.get().headers.get("Content-Type"));

  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
  ASSERT_SOME(parse);

  JSON::Object stats = parse.get();

  // Each of these metric keys must appear exactly once in the
  // returned JSON object.
  const char* keys[] = {
    "slave/uptime_secs",
    "slave/registered",
    "slave/recovery_errors",
    "slave/frameworks_active",
    "slave/tasks_staging",
    "slave/tasks_starting",
    "slave/tasks_running",
    "slave/tasks_finished",
    "slave/tasks_failed",
    "slave/tasks_killed",
    "slave/tasks_lost",
    "slave/executors_registering",
    "slave/executors_running",
    "slave/executors_terminating",
    "slave/executors_terminated",
    "slave/valid_status_updates",
    "slave/invalid_status_updates",
    "slave/valid_framework_messages",
    "slave/invalid_framework_messages"
  };

  for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); i++) {
    EXPECT_EQ(1u, stats.values.count(keys[i])) << keys[i];
  }

  Shutdown();
}
// This test ensures that when a slave is shutting down, it will not
// try to re-register with the master.
TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
{
// Start a master.
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
// Create a MockExecutor to enable us to catch
// ShutdownExecutorMessage later.
MockExecutor exec(DEFAULT_EXECUTOR_ID);
// Create a StandaloneMasterDetector to enable the slave to trigger
// re-registration later.
StandaloneMasterDetector detector(master.get());
slave::Flags flags = CreateSlaveFlags();
// Make the executor_shutdown_grace_period much longer than
// REGISTER_RETRY_INTERVAL, so that the slave will at least call
// doReliableRegistration() once before the slave is actually
// terminated.
flags.executor_shutdown_grace_period = slave::REGISTER_RETRY_INTERVAL_MAX * 2;
// Start a slave.
Try<PID<Slave> > slave = StartSlave(&exec, &detector, flags);
ASSERT_SOME(slave);
// Create a task on the slave.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
// Launch a task that uses less resource than the
// default(cpus:2, mem:1024).
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.start();
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
// Pause the clock here so that after detecting a new master,
// the slave will not send multiple reregister messages
// before we change its state to TERMINATING.
Clock::pause();
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
DROP_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
// Simulate a new master detected event on the slave,
// so that the slave will do a re-registration.
detector.appoint(master.get());
// Make sure the slave has entered doReliableRegistration()
// before we change the slave's state.
AWAIT_READY(slaveReregisteredMessage);
// Setup an expectation that the master should not receive any
// ReregisterSlaveMessage in the future.
EXPECT_NO_FUTURE_PROTOBUFS(
ReregisterSlaveMessage(), slave.get(), master.get());
// Drop the ShutdownExecutorMessage, so that the slave will
// stay in TERMINATING for a while.
DROP_PROTOBUFS(ShutdownExecutorMessage(), slave.get(), _);
// Send a ShutdownMessage instead of calling Stop() directly
// to avoid blocking.
process::post(master.get(), slave.get(), ShutdownMessage());
// Advance the clock to trigger doReliableRegistration(); the
// EXPECT_NO_FUTURE_PROTOBUFS expectation above fails the test if
// the terminating slave re-registers.
Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX * 2);
Clock::settle();
Clock::resume();
// Clean up.
driver.stop();
driver.join();
Shutdown();
}
// This test verifies the slave will destroy a container if, when
// receiving a terminal status task update, updating the container's
// resources fails.
TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
{
// Start a master.
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
EXPECT_CALL(exec, registered(_, _, _, _));
TestContainerizer containerizer(&exec);
// Start a slave.
Try<PID<Slave> > slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(_, _, _));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
// Start two tasks sharing the same executor, so that destroying the
// container affects a task other than the one being killed.
vector<TaskInfo> tasks;
tasks.push_back(createTask(
offer.slave_id(),
Resources::parse("cpus:0.1;mem:32").get(),
"sleep 1000",
exec.id));
tasks.push_back(createTask(
offer.slave_id(),
Resources::parse("cpus:0.1;mem:32").get(),
"sleep 1000",
exec.id));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status1, status2, status3, status4;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillOnce(FutureArg<1>(&status3))
.WillOnce(FutureArg<1>(&status4));
driver.launchTasks(offer.id(), tasks);
AWAIT_READY(status1);
EXPECT_EQ(TASK_RUNNING, status1.get().state());
AWAIT_READY(status2);
EXPECT_EQ(TASK_RUNNING, status2.get().state());
// Set up the containerizer so the next update() will fail.
EXPECT_CALL(containerizer, update(_, _))
.WillOnce(Return(process::Failure("update() failed")))
.WillRepeatedly(Return(Nothing()));
EXPECT_CALL(exec, killTask(_, _))
.WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
// Kill one of the tasks. The failed update should result in the
// second task going lost when the container is destroyed.
driver.killTask(tasks[0].task_id());
AWAIT_READY(status3);
EXPECT_EQ(TASK_KILLED, status3.get().state());
AWAIT_READY(status4);
EXPECT_EQ(TASK_LOST, status4.get().state());
driver.stop();
driver.join();
Shutdown();
}
// This test ensures that the slave will re-register with the master
// if it does not receive any pings after registering.
TEST_F(SlaveTest, PingTimeoutNoPings)
{
  // Start a master.
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  // Block all pings to the slave.
  DROP_MESSAGES(Eq("PING"), _, _);

  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  // Start a slave.
  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);

  // Ensure the slave processes the registration message and schedules
  // the ping timeout, before we advance the clock.
  Clock::pause();
  Clock::settle();

  // Advance to the ping timeout to trigger a re-detection and
  // re-registration.
  Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Clock::advance(slave::MASTER_PING_TIMEOUT());

  AWAIT_READY(detected);
  AWAIT_READY(slaveReregisteredMessage);

  // Resume the clock so that the test does not end with a paused
  // clock, and shut down explicitly for consistency with the other
  // tests in this file.
  Clock::resume();

  Shutdown();
}
// This test ensures that the slave will re-register with the master
// if it stops receiving pings.
TEST_F(SlaveTest, PingTimeoutSomePings)
{
  // Start a master.
  Try<PID<Master> > master = StartMaster();
  ASSERT_SOME(master);

  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  // Start a slave.
  Try<PID<Slave> > slave = StartSlave();
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);

  Clock::pause();

  // Ensure a ping reaches the slave.
  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);

  Clock::advance(master::SLAVE_PING_TIMEOUT);

  AWAIT_READY(ping);

  // Now block further pings from the master and advance
  // the clock to trigger a re-detection and re-registration on
  // the slave.
  DROP_MESSAGES(Eq("PING"), _, _);

  Future<Nothing> detected = FUTURE_DISPATCH(_, &Slave::detected);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Clock::advance(slave::MASTER_PING_TIMEOUT());

  AWAIT_READY(detected);
  AWAIT_READY(slaveReregisteredMessage);

  // Resume the clock so that the test does not end with a paused
  // clock, and shut down explicitly for consistency with the other
  // tests in this file.
  Clock::resume();

  Shutdown();
}