/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gmock/gmock.h>

#include <vector>

#include <mesos/executor.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>

#include <stout/bytes.hpp>
#include <stout/option.hpp>
#include <stout/strings.hpp>

#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"

#include "slave/slave.hpp"

#include "tests/mesos.hpp"

using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;

using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;

using mesos::internal::master::Master;

using mesos::internal::slave::Slave;

using process::Future;
using process::PID;

using std::vector;

using testing::_;
using testing::AtMost;
using testing::Return;

class ResourceOffersTest : public MesosTest {};
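

// This test verifies that a framework receives offers from multiple
// slaves, and that each offer contains the resources of a single slave.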
TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
// Start 10 slaves.
for (int i = 0; i < 10; i++) {
slave::Flags flags = CreateSlaveFlags();
flags.resources = Option<std::string>("cpus:2;mem:1024");
Try<PID<Slave> > slave = StartSlave(flags);
ASSERT_SOME(slave);
}
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Offers from all 10 slaves might not arrive in the first callback.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
EXPECT_GE(10u, offers.get().size());
Resources resources(offers.get()[0].resources());
EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value());
EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value());
driver.stop();
driver.join();
Shutdown();
}
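

// This test verifies that a task that specifies no resources is
// rejected with a TASK_LOST status update.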
TEST_F(ResourceOffersTest, TaskUsesNoResources)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
EXPECT_EQ("Task uses no resources", status.get().message());
driver.stop();
driver.join();
Shutdown();
}
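

// This test verifies that a task that specifies an invalid (zero)
// amount of a resource is rejected with a TASK_LOST status update.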
TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
cpus->set_name("cpus");
cpus->set_type(Value::SCALAR);
cpus->mutable_scalar()->set_value(0);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
driver.stop();
driver.join();
Shutdown();
}
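

// This test verifies that a task that requests more resources than
// were offered is rejected with a TASK_LOST status update.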
TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
cpus->set_name("cpus");
cpus->set_type(Value::SCALAR);
cpus->mutable_scalar()->set_value(2.01);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
EXPECT_TRUE(strings::contains(
status.get().message(), "greater than offered"));
driver.stop();
driver.join();
Shutdown();
}
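

// This test verifies that resources offered to a framework are
// re-offered to a second framework after the first framework stops.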
TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched1;
MesosSchedulerDriver driver1(
&sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched1, registered(&driver1, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillOnce(FutureArg<1>(&offers));
driver1.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
driver1.stop();
driver1.join();
MockScheduler sched2;
MesosSchedulerDriver driver2(
&sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched2, registered(&driver2, _, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffers(&driver2, _))
.WillOnce(FutureArg<1>(&offers));
driver2.start();
AWAIT_READY(offers);
driver2.stop();
driver2.join();
Shutdown();
}
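

// This test verifies that resources a framework leaves unused (by
// launching no tasks) are re-offered to a second framework.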
TEST_F(ResourceOffersTest, ResourcesGetReofferedWhenUnused)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched1;
MesosSchedulerDriver driver1(
&sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched1, registered(&driver1, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillOnce(FutureArg<1>(&offers));
driver1.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
vector<TaskInfo> tasks; // Use nothing!
driver1.launchTasks(offers.get()[0].id(), tasks);
MockScheduler sched2;
MesosSchedulerDriver driver2(
&sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched2, registered(&driver2, _, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffers(&driver2, _))
.WillOnce(FutureArg<1>(&offers));
driver2.start();
AWAIT_READY(offers);
// Stop the first framework before the second: the first holds no
// resources, so stopping it triggers no further offers.
driver1.stop();
driver1.join();
driver2.stop();
driver2.join();
Shutdown();
}
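

// This test verifies that resources are re-offered to a second
// framework after the first framework's task is rejected for using
// invalid resources.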
TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
Try<PID<Slave> > slave = StartSlave();
ASSERT_SOME(slave);
MockScheduler sched1;
MesosSchedulerDriver driver1(
&sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched1, registered(&driver1, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver1.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Resource* cpus = task.add_resources();
cpus->set_name("cpus");
cpus->set_type(Value::SCALAR);
cpus->mutable_scalar()->set_value(0);
Resource* mem = task.add_resources();
mem->set_name("mem");
mem->set_type(Value::SCALAR);
mem->mutable_scalar()->set_value(Gigabytes(1).bytes());
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<TaskStatus> status;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
.WillOnce(FutureArg<1>(&status));
driver1.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
MockScheduler sched2;
MesosSchedulerDriver driver2(
&sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched2, registered(&driver2, _, _))
.Times(1);
EXPECT_CALL(sched2, resourceOffers(&driver2, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver2.start();
AWAIT_READY(offers);
driver1.stop();
driver1.join();
driver2.stop();
driver2.join();
Shutdown();
}


// TODO(benh): Add tests for checking correct slave IDs.
// TODO(benh): Add tests for checking executor resource usage.
// TODO(benh): Add tests which launch multiple tasks and check for
// unique task IDs and aggregate resource usage.
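

// This test verifies that a framework's resource requests are
// forwarded to the allocator.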
TEST_F(ResourceOffersTest, Request)
{
MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator;
EXPECT_CALL(allocator, initialize(_, _, _))
.Times(1);
Try<PID<Master> > master = StartMaster(&allocator);
ASSERT_SOME(master);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(allocator, frameworkAdded(_, _, _))
.Times(1);
Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(&registered));
driver.start();
AWAIT_READY(registered);
vector<Request> sent;
Request request;
request.mutable_slave_id()->set_value("test");
sent.push_back(request);
Future<vector<Request> > received;
EXPECT_CALL(allocator, resourcesRequested(_, _))
.WillOnce(FutureArg<1>(&received));
driver.requestResources(sent);
AWAIT_READY(received);
EXPECT_EQ(sent.size(), received.get().size());
EXPECT_NE(0u, received.get().size());
EXPECT_EQ(request.slave_id(), received.get()[0].slave_id());
EXPECT_CALL(allocator, frameworkDeactivated(_))
.Times(AtMost(1)); // Races with shutting down the cluster.
EXPECT_CALL(allocator, frameworkRemoved(_))
.Times(AtMost(1)); // Races with shutting down the cluster.
driver.stop();
driver.join();
Shutdown();
}


class MultipleExecutorsTest : public MesosTest {};


// This test verifies that the second of two tasks launched on the
// same slave with the same executor id but a different executor info
// is rejected.
TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave = StartSlave(&exec);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
executor.mutable_command()->set_value("exit 1");
TaskInfo task1;
task1.set_name("");
task1.mutable_task_id()->set_value("1");
task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task1.mutable_resources()->MergeFrom(
Resources::parse("cpus:1;mem:512").get());
task1.mutable_executor()->MergeFrom(executor);
executor.mutable_command()->set_value("exit 2");
TaskInfo task2;
task2.set_name("");
task2.mutable_task_id()->set_value("2");
task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task2.mutable_resources()->MergeFrom(
Resources::parse("cpus:1;mem:512").get());
task2.mutable_executor()->MergeFrom(executor);
vector<TaskInfo> tasks;
tasks.push_back(task1);
tasks.push_back(task2);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
// Grab the "good" task but don't send a status update.
Future<TaskInfo> task;
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(FutureArg<1>(&task));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(task);
EXPECT_EQ(task1.task_id(), task.get().task_id());
AWAIT_READY(status);
EXPECT_EQ(task2.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
EXPECT_TRUE(strings::contains(
status.get().message(), "Task has invalid ExecutorInfo"));
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}


// This test verifies that two tasks, each launched on a different
// slave with the same executor id but a different executor info, are
// both allowed to run.
TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnDifferentSlaves)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(&registered));
driver.start();
AWAIT_READY(registered);
Future<vector<Offer> > offers1;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers1));
// Start the first slave.
MockExecutor exec1(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave1 = StartSlave(&exec1);
ASSERT_SOME(slave1);
AWAIT_READY(offers1);
EXPECT_NE(0u, offers1.get().size());
// Launch the first task with the default executor id.
ExecutorInfo executor1;
executor1 = DEFAULT_EXECUTOR_INFO;
executor1.mutable_command()->set_value("exit 1");
TaskInfo task1 = createTask(
offers1.get()[0], executor1.command().value(), executor1.executor_id());
vector<TaskInfo> tasks1;
tasks1.push_back(task1);
EXPECT_CALL(exec1, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec1, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1));
driver.launchTasks(offers1.get()[0].id(), tasks1);
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1.get().state());
Future<vector<Offer> > offers2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers2))
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Now start the second slave.
MockExecutor exec2(DEFAULT_EXECUTOR_ID);
Try<PID<Slave> > slave2 = StartSlave(&exec2);
ASSERT_SOME(slave2);
AWAIT_READY(offers2);
EXPECT_NE(0u, offers2.get().size());
// Now launch the second task with the same executor id but
// a different executor command.
ExecutorInfo executor2;
executor2 = executor1;
executor2.mutable_command()->set_value("exit 2");
TaskInfo task2 = createTask(
offers2.get()[0], executor2.command().value(), executor2.executor_id());
vector<TaskInfo> tasks2;
tasks2.push_back(task2);
EXPECT_CALL(exec2, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec2, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status2));
driver.launchTasks(offers2.get()[0].id(), tasks2);
AWAIT_READY(status2);
ASSERT_EQ(TASK_RUNNING, status2.get().state());
EXPECT_CALL(exec1, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(exec2, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Shutdown();
}