blob: 4e2db2ac40c59b9b9a97cd214b3cd1e727a4f0ad [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <queue>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/pid.hpp>
#include <process/queue.hpp>
#include <stout/lambda.hpp>
#include <stout/try.hpp>
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "master/allocator/mesos/allocator.hpp"
#include "master/master.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::allocator::MesosAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;
using process::Clock;
using process::Future;
using process::PID;
using process::Queue;
using std::string;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
class SchedulerTest : public MesosTest, public WithParamInterface<ContentType>
{
protected:
// Helper class for using EXPECT_CALL since the Mesos scheduler API
// is callback based.
class Callbacks
{
public:
MOCK_METHOD0(connected, void());
MOCK_METHOD0(disconnected, void());
MOCK_METHOD1(received, void(const std::queue<Event>&));
};
};
// The scheduler library tests are parameterized by the content type
// of the HTTP request.
INSTANTIATE_TEST_CASE_P(
ContentType,
SchedulerTest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// Enqueues all received events into a libprocess queue.
ACTION_P(Enqueue, queue)
{
std::queue<Event> events = arg0;
while (!events.empty()) {
// Note that we currently drop HEARTBEATs because most of these tests
// are not designed to deal with heartbeats.
// TODO(vinod): Implement DROP_HTTP_CALLS that can filter heartbeats.
if (events.front().type() == Event::HEARTBEAT) {
VLOG(1) << "Ignoring HEARTBEAT event";
} else {
queue->put(events.front());
}
events.pop();
}
}
// This test verifies that when a scheduler resubscribes it receives
// SUBSCRIBED event with the previously assigned framework id.
TEST_P(SchedulerTest, Subscribe)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
subscribe->set_force(true);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
// Resubscribe with the same framework id.
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(id);
subscribe->set_force(true);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
EXPECT_EQ(id, event.get().subscribed().framework_id());
Shutdown();
}
TEST_P(SchedulerTest, TaskRunning)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<Nothing> update;
EXPECT_CALL(containerizer, update(_, _))
.WillOnce(DoAll(FutureSatisfy(&update),
Return(Nothing())))
.WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
v1::TaskInfo taskInfo;
taskInfo.set_name("");
taskInfo.mutable_task_id()->set_value("1");
taskInfo.mutable_agent_id()->CopyFrom(
event.get().offers().offers(0).agent_id());
taskInfo.mutable_resources()->CopyFrom(
event.get().offers().offers(0).resources());
taskInfo.mutable_executor()->CopyFrom(DEFAULT_V1_EXECUTOR_INFO);
// TODO(benh): Enable just running a task with a command in the tests:
// taskInfo.mutable_command()->set_value("sleep 10");
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(event.get().offers().offers(0).id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
EXPECT_TRUE(event.get().update().status().has_executor_id());
EXPECT_EQ(exec.id, devolve(event.get().update().status().executor_id()));
AWAIT_READY(update);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, ReconcileTask)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
v1::Offer offer = event.get().offers().offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::RECONCILE);
Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
task->mutable_task_id()->CopyFrom(taskInfo.task_id());
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_FALSE(event.get().update().status().has_uuid());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
EXPECT_EQ(v1::TaskStatus::REASON_RECONCILIATION,
event.get().update().status().reason());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, KillTask)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
v1::Offer offer = event.get().offers().offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
{
// Acknowledge TASK_RUNNING update.
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
acknowledge->set_uuid(event.get().update().status().uuid());
mesos.send(call);
}
EXPECT_CALL(exec, killTask(_, _))
.WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::KILL);
Call::Kill* kill = call.mutable_kill();
kill->mutable_task_id()->CopyFrom(taskInfo.task_id());
kill->mutable_agent_id()->CopyFrom(offer.agent_id());
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_KILLED, event.get().update().status().state());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, ShutdownExecutor)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
v1::Offer offer = event.get().offers().offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_FINISHED, event.get().update().status().state());
Future<Nothing> shutdown;
EXPECT_CALL(exec, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::SHUTDOWN);
Call::Shutdown* shutdown = call.mutable_shutdown();
shutdown->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
shutdown->mutable_agent_id()->CopyFrom(offer.agent_id());
mesos.send(call);
}
AWAIT_READY(shutdown);
containerizer.destroy(devolve(id), DEFAULT_EXECUTOR_ID);
// Executor termination results in a 'FAILURE' event.
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::FAILURE, event.get().type());
v1::ExecutorID executorId(DEFAULT_V1_EXECUTOR_ID);
EXPECT_EQ(executorId, event.get().failure().executor_id());
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Teardown)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
v1::Offer offer = event.get().offers().offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
Future<Nothing> shutdown;
EXPECT_CALL(exec, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::TEARDOWN);
mesos.send(call);
}
AWAIT_READY(shutdown);
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Decline)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Try<PID<Slave>> slave = StartSlave();
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
ASSERT_EQ(1, event.get().offers().offers().size());
v1::Offer offer = event.get().offers().offers(0);
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 0s filter to immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(0);
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// If the resources were properly declined, the scheduler should
// get another offer with same amount of resources.
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
ASSERT_EQ(1, event.get().offers().offers().size());
ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Revive)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Try<PID<Slave>> slave = StartSlave();
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
v1::Offer offer = event.get().offers().offers(0);
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// No offers should be sent within 30 mins because we set a filter
// for 1 hr.
Clock::pause();
Clock::advance(Minutes(30));
Clock::settle();
event = events.get();
ASSERT_TRUE(event.isPending());
// On revival the filters should be cleared and the scheduler should
// get another offer with same amount of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Suppress)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Try<PID<Slave>> slave = StartSlave();
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
v1::Offer offer = event.get().offers().offers(0);
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
Future<Nothing> suppressOffers =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::suppressOffers);
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::SUPPRESS);
mesos.send(call);
}
AWAIT_READY(suppressOffers);
// Wait for allocator to finish executing 'suppressOffers()'.
Clock::pause();
Clock::settle();
// No offers should be sent within 100 mins because the framework
// suppressed offers.
Clock::advance(Minutes(100));
Clock::settle();
event = events.get();
ASSERT_TRUE(event.isPending());
// On reviving offers the scheduler should get another offer with same amount
// of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Message)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Try<PID<Slave>> slave = StartSlave(&containerizer);
ASSERT_SOME(slave);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::OFFERS, event.get().type());
EXPECT_NE(0, event.get().offers().offers().size());
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
v1::Offer offer = event.get().offers().offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::UPDATE, event.get().type());
EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
Future<string> data;
EXPECT_CALL(exec, frameworkMessage(_, _))
.WillOnce(FutureArg<1>(&data));
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::MESSAGE);
Call::Message* message = call.mutable_message();
message->mutable_agent_id()->CopyFrom(offer.agent_id());
message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
message->set_data("hello world");
mesos.send(call);
}
AWAIT_ASSERT_EQ("hello world", data);
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
TEST_P(SchedulerTest, Request)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<PID<Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Callbacks callbacks;
Future<Nothing> connected;
EXPECT_CALL(callbacks, connected())
.WillOnce(FutureSatisfy(&connected));
Mesos mesos(
master.get(),
GetParam(),
lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
AWAIT_READY(connected);
Queue<Event> events;
EXPECT_CALL(callbacks, received(_))
.WillRepeatedly(Enqueue(&events));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
Future<Event> event = events.get();
AWAIT_READY(event);
EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
v1::FrameworkID id(event.get().subscribed().framework_id());
Future<Nothing> requestResources =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources);
{
Call call;
call.mutable_framework_id()->CopyFrom(id);
call.set_type(Call::REQUEST);
// Create a dummy request.
Call::Request* request = call.mutable_request();
request->add_requests();
mesos.send(call);
}
AWAIT_READY(requestResources);
Shutdown();
}
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
} // namespace tests {
} // namespace internal {
} // namespace mesos {