blob: c50384232d653c1d1c45e6c60dd52487c7a821de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unistd.h>
#include <gmock/gmock.h>
#include <map>
#include <string>
#include <vector>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/pid.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
#include "master/flags.hpp"
#include "master/master.hpp"
#include "slave/constants.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/process_isolator.hpp"
#include "slave/slave.hpp"
#include "tests/isolator.hpp"
#include "tests/mesos.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
using mesos::internal::master::Master;
using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Isolator;
using mesos::internal::slave::ProcessIsolator;
using mesos::internal::slave::Slave;
using process::Clock;
using process::Future;
using process::PID;
using std::map;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
class MasterTest : public MesosTest {};
TEST_F(MasterTest, TaskRunning)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestingIsolator isolator(&exec);
Try<PID<Slave> > slave = StartSlave(&isolator);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<Nothing> resourcesChanged;
EXPECT_CALL(isolator,
resourcesChanged(_, _, Resources(offers.get()[0].resources())))
.WillOnce(FutureSatisfy(&resourcesChanged));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
AWAIT_READY(resourcesChanged);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestingIsolator isolator(&exec);
slave::Flags flags = CreateSlaveFlags();
flags.executor_shutdown_grace_period = Seconds(0);
Try<PID<Slave> > slave = StartSlave(&isolator, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<Nothing> resourcesChanged;
EXPECT_CALL(isolator,
resourcesChanged(_, _, Resources(offers.get()[0].resources())))
.WillOnce(FutureSatisfy(&resourcesChanged));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
AWAIT_READY(resourcesChanged);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
TEST_F(MasterTest, KillTask)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave = StartSlave(&exec);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskID taskId;
taskId.set_value("1");
TaskInfo task;
task.set_name("");
task.mutable_task_id()->MergeFrom(taskId);
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
EXPECT_CALL(exec, killTask(_, _))
.WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.killTask(taskId);
AWAIT_READY(status);
EXPECT_EQ(TASK_KILLED, status.get().state());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
TEST_F(MasterTest, StatusUpdateAck)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave = StartSlave(&exec);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<StatusUpdateAcknowledgementMessage> acknowledgement =
FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, Eq(slave.get()));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
// Ensure the slave gets a status update ACK.
AWAIT_READY(acknowledgement);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}
TEST_F(MasterTest, RecoverResources)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestingIsolator isolator(&exec);
slave::Flags flags = CreateSlaveFlags();
flags.resources = Option<string>(
"cpus:2;mem:1024;disk:1024;ports:[1-10, 20-30]");
Try<PID<Slave> > slave = StartSlave(&isolator, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorInfo executorInfo;
executorInfo.MergeFrom(DEFAULT_EXECUTOR_INFO);
Resources executorResources =
Resources::parse("cpus:0.3;mem:200;ports:[5-8, 23-25]");
executorInfo.mutable_resources()->MergeFrom(executorResources);
TaskID taskId;
taskId.set_value("1");
Resources taskResources = offers.get()[0].resources() - executorResources;
TaskInfo task;
task.set_name("");
task.mutable_task_id()->MergeFrom(taskId);
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(taskResources);
task.mutable_executor()->MergeFrom(executorInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
EXPECT_CALL(exec, killTask(_, _))
.WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
// Scheduler should get an offer for killed task's resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.killTask(taskId);
AWAIT_READY(status);
EXPECT_EQ(TASK_KILLED, status.get().state());
driver.reviveOffers(); // Don't wait till the next allocation.
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_EQ(taskResources, offer.resources());
driver.declineOffer(offer.id());
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
// Now kill the executor, scheduler should get an offer it's resources.
// TODO(benh): WTF? Why aren't we dispatching?
isolator.killExecutor(offer.framework_id(), executorInfo.executor_id());
// TODO(benh): We can't do driver.reviveOffers() because we need to
// wait for the killed executors resources to get aggregated! We
// should wait for the allocator to recover the resources first. See
// the allocator tests for inspiration.
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
Resources slaveResources = Resources::parse(flags.resources.get());
EXPECT_EQ(slaveResources, offers.get()[0].resources());
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
TEST_F(MasterTest, FrameworkMessage)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave = StartSlave(&exec);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver schedDriver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&schedDriver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&schedDriver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
schedDriver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ExecutorDriver*> execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
.WillOnce(FutureArg<1>(&status));
schedDriver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
Future<string> execData;
EXPECT_CALL(exec, frameworkMessage(_, _))
.WillOnce(FutureArg<1>(&execData));
schedDriver.sendFrameworkMessage(
DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello");
AWAIT_READY(execData);
EXPECT_EQ("hello", execData.get());
Future<string> schedData;
EXPECT_CALL(sched, frameworkMessage(&schedDriver, _, _, _))
.WillOnce(FutureArg<3>(&schedData));
execDriver.get()->sendFrameworkMessage("world");
AWAIT_READY(schedData);
EXPECT_EQ("world", schedData.get());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
schedDriver.stop();
schedDriver.join();
Shutdown();
}
TEST_F(MasterTest, MultipleExecutors)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
ExecutorID executorId1;
executorId1.set_value("executor-1");
ExecutorID executorId2;
executorId2.set_value("executor-2");
MockExecutor exec1(executorId1);
MockExecutor exec2(executorId2);
map<ExecutorID, Executor*> execs;
execs[executorId1] = &exec1;
execs[executorId2] = &exec2;
TestingIsolator isolator(execs);
Try<PID<Slave> > slave = StartSlave(&isolator);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
ExecutorInfo executor1; // Bug in gcc 4.1.*, must assign on next line.
executor1 = CREATE_EXECUTOR_INFO(executorId1, "exit 1");
TaskInfo task1;
task1.set_name("");
task1.mutable_task_id()->set_value("1");
task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
task1.mutable_executor()->MergeFrom(executor1);
ExecutorInfo executor2; // Bug in gcc 4.1.*, must assign on next line.
executor2 = CREATE_EXECUTOR_INFO(executorId2, "exit 1");
TaskInfo task2;
task2.set_name("");
task2.mutable_task_id()->set_value("2");
task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:512"));
task2.mutable_executor()->MergeFrom(executor2);
vector<TaskInfo> tasks;
tasks.push_back(task1);
tasks.push_back(task2);
EXPECT_CALL(exec1, registered(_, _, _, _))
.Times(1);
Future<TaskInfo> exec1Task;
EXPECT_CALL(exec1, launchTask(_, _))
.WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
FutureArg<1>(&exec1Task)));
EXPECT_CALL(exec2, registered(_, _, _, _))
.Times(1);
Future<TaskInfo> exec2Task;
EXPECT_CALL(exec2, launchTask(_, _))
.WillOnce(DoAll(SendStatusUpdateFromTask(TASK_RUNNING),
FutureArg<1>(&exec2Task)));
Future<TaskStatus> status1, status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(exec1Task);
EXPECT_EQ(task1.task_id(), exec1Task.get().task_id());
AWAIT_READY(exec2Task);
EXPECT_EQ(task2.task_id(), exec2Task.get().task_id());
AWAIT_READY(status1);
EXPECT_EQ(TASK_RUNNING, status1.get().state());
AWAIT_READY(status2);
EXPECT_EQ(TASK_RUNNING, status2.get().state());
EXPECT_CALL(exec1, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(exec2, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
TEST_F(MasterTest, ShutdownUnregisteredExecutor)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
ProcessIsolator isolator;
// Need flags for 'executor_registration_timeout'.
slave::Flags flags = CreateSlaveFlags();
Try<PID<Slave> > slave = StartSlave(&isolator);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
// Launch a task with the command executor.
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
CommandInfo command;
command.set_value("sleep 10");
task.mutable_command()->MergeFrom(command);
vector<TaskInfo> tasks;
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
Future<process::Message> registerExecutor =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(registerExecutor);
Clock::pause();
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
// Ensure that the slave times out and kills the executor.
Future<Nothing> killExecutor =
FUTURE_DISPATCH(_, &Isolator::killExecutor);
Clock::advance(flags.executor_registration_timeout);
AWAIT_READY(killExecutor);
Clock::settle(); // Wait for ProcessIsolator::killExecutor to complete.
// Now advance time until the reaper reaps the executor.
while (status.isPending()) {
Clock::advance(Seconds(1));
Clock::settle();
}
AWAIT_READY(status);
ASSERT_EQ(TASK_FAILED, status.get().state());
Clock::resume();
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
// This test verifies that when an executor terminates before
// registering with slave, it is properly cleaned up.
TEST_F(MasterTest, RemoveUnregisteredTerminatedExecutor)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestingIsolator isolator(&exec);
Try<PID<Slave> > slave = StartSlave(&isolator);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
// Drop the registration message from the executor to the slave.
Future<process::Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(registerExecutorMessage);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Now kill the executor.
dispatch(isolator,
&Isolator::killExecutor,
offers.get()[0].framework_id(),
DEFAULT_EXECUTOR_ID);
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
// We use 'gc.schedule' as a signal for the executor being cleaned
// up by the slave.
AWAIT_READY(schedule);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
TEST_F(MasterTest, MasterInfo)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
Future<MasterInfo> masterInfo;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<2>(&masterInfo));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(Return()); // Ignore offers.
driver.start();
AWAIT_READY(masterInfo);
EXPECT_EQ(master.get().port, masterInfo.get().port());
EXPECT_EQ(master.get().ip, masterInfo.get().ip());
driver.stop();
driver.join();
Shutdown();
}
TEST_F(MasterTest, MasterInfoOnReElection)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(Return()); // Ignore offers.
Future<process::Message> message =
FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
driver.start();
AWAIT_READY(message);
// Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
// expiration) at the scheduler.
NewMasterDetectedMessage newMasterDetectedMsg;
newMasterDetectedMsg.set_pid(master.get());
Future<Nothing> disconnected;
EXPECT_CALL(sched, disconnected(&driver))
.WillOnce(FutureSatisfy(&disconnected));
Future<MasterInfo> masterInfo;
EXPECT_CALL(sched, reregistered(&driver, _))
.WillOnce(FutureArg<1>(&masterInfo));
process::post(message.get().to, newMasterDetectedMsg);
AWAIT_READY(disconnected);
AWAIT_READY(masterInfo);
EXPECT_EQ(master.get().port, masterInfo.get().port());
EXPECT_EQ(master.get().ip, masterInfo.get().ip());
driver.stop();
driver.join();
Shutdown();
}
class WhitelistTest : public MasterTest
{
protected:
WhitelistTest()
: path("whitelist.txt")
{}
virtual ~WhitelistTest()
{
os::rm(path);
}
const string path;
};
TEST_F(WhitelistTest, WhitelistSlave)
{
// Add some hosts to the white list.
Try<string> hostname = os::hostname();
ASSERT_SOME(hostname);
string hosts = hostname.get() + "\n" + "dummy-slave";
ASSERT_SOME(os::write(path, hosts)) << "Error writing whitelist";
master::Flags flags = CreateMasterFlags();
flags.whitelist = "file://" + path;
Try<PID<Master> > master = StartMaster(flags);
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(offers); // Implies the slave has registered.
driver.stop();
driver.join();
Shutdown();
}
TEST_F(MasterTest, MasterLost)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(Return()); // Ignore offers.
Future<process::Message> message =
FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
driver.start();
AWAIT_READY(message);
Future<Nothing> disconnected;
EXPECT_CALL(sched, disconnected(&driver))
.WillOnce(FutureSatisfy(&disconnected));
// Simulate a spurious noMasterDetected event at the scheduler.
process::post(message.get().to, NoMasterDetectedMessage());
AWAIT_READY(disconnected);
driver.stop();
driver.join();
Shutdown();
}