blob: 910d76fd8485945482b3c18fd71c02a92dd61e28 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <queue>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/queue.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/try.hpp>
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "master/allocator/mesos/allocator.hpp"
#include "master/detector/standalone.hpp"
#include "master/master.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::allocator::MesosAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;
using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
using process::Queue;
using process::http::OK;
using std::cout;
using std::endl;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
using testing::WithParamInterface;
namespace process {
// We need to reinitialize libprocess in order to test against different
// configurations, such as when libprocess is initialized with SSL enabled.
void reinitialize(
const Option<string>& delegate,
const Option<string>& readonlyAuthenticationRealm,
const Option<string>& readwriteAuthenticationRealm);
} // namespace process {
namespace mesos {
namespace internal {
namespace tests {
class SchedulerTest
: public MesosTest,
public WithParamInterface<ContentType> {};
// The scheduler library tests are parameterized by the content type
// of the HTTP request.
INSTANTIATE_TEST_CASE_P(
ContentType,
SchedulerTest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// This test verifies that a scheduler can subscribe with the master.
TEST_P(SchedulerTest, Subscribe)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
ASSERT_EQ(master::DEFAULT_HEARTBEAT_INTERVAL.secs(),
subscribed->heartbeat_interval_seconds());
ASSERT_EQ(evolve(master.get()->getMasterInfo()), subscribed->master_info());
}
// This test verifies that a scheduler can subscribe with the master after
// failing over to another instance.
TEST_P(SchedulerTest, SchedulerFailover)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId = subscribed->framework_id();
auto scheduler2 = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected2;
EXPECT_CALL(*scheduler2, connected(_))
.WillOnce(FutureSatisfy(&connected2));
// Failover to another scheduler instance.
v1::scheduler::TestMesos mesos2(
master.get()->pid,
contentType,
scheduler2);
AWAIT_READY(connected2);
// The previously connected scheduler instance should receive an
// error/disconnected event.
Future<Nothing> error;
EXPECT_CALL(*scheduler, error(_, _))
.WillOnce(FutureSatisfy(&error));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
EXPECT_CALL(*scheduler2, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler2, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
mesos2.send(call);
}
AWAIT_READY(error);
AWAIT_READY(disconnected);
AWAIT_READY(subscribed);
EXPECT_EQ(frameworkId, subscribed->framework_id());
}
// This test verifies that the scheduler can subscribe after a master failover.
TEST_P_TEMP_DISABLED_ON_WINDOWS(SchedulerTest, MasterFailover)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler,
detector);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId = subscribed->framework_id();
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected))
.WillRepeatedly(Return()); // Ignore future invocations.
// Failover the master.
master->reset();
master = StartMaster();
ASSERT_SOME(master);
AWAIT_READY(disconnected);
Future<Nothing> connected2;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected2));
detector->appoint(master.get()->pid);
AWAIT_READY(connected2);
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
mesos.send(call);
}
AWAIT_READY(subscribed);
EXPECT_EQ(frameworkId, subscribed->framework_id());
}
// This test verifies that scheduler library also exposes metrics like
// scheduler driver.
TEST_P(SchedulerTest, MetricsEndpoint)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
Future<process::http::Response> response =
process::http::get(process::metrics::internal::metrics, "snapshot");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
JSON::Object metrics = parse.get();
// "scheduler/event_queue_messages" metric reports the number of message
// events in event queue. Message events are invoked when any custom
// message is generated by the executor.
EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages"));
// "scheduler/event_queue_dispatches" metric reports the number of dispatch
// events in event queue. Dispatch events are invoked when any function is
// dispatched as process as a result of any call by scheduler.
EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_dispatches"));
}
TEST_P(SchedulerTest, TaskRunning)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> statusUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&statusUpdate));
Future<Nothing> update;
EXPECT_CALL(containerizer, update(_, _))
.WillOnce(DoAll(FutureSatisfy(&update),
Return(Nothing())))
.WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
v1::TaskInfo taskInfo;
taskInfo.set_name("");
taskInfo.mutable_task_id()->set_value("1");
taskInfo.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
taskInfo.mutable_resources()->CopyFrom(
offers->offers(0).resources());
taskInfo.mutable_executor()->CopyFrom(v1::DEFAULT_EXECUTOR_INFO);
// TODO(benh): Enable just running a task with a command in the tests:
// taskInfo.mutable_command()->set_value("sleep 10");
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(statusUpdate);
EXPECT_EQ(v1::TASK_RUNNING, statusUpdate->status().state());
EXPECT_TRUE(statusUpdate->status().has_executor_id());
EXPECT_EQ(executorId, devolve(statusUpdate->status().executor_id()));
AWAIT_READY(update);
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// Ensures that a task group can be successfully launched
// on the `DEFAULT` executor.
TEST_P(SchedulerTest, TaskGroupRunning)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
slave::Flags flags = CreateSlaveFlags();
#ifndef USE_SSL_SOCKET
// Executor authentication currently has SSL as a dependency, so we cannot
// require executors to authenticate with the agent operator API if Mesos
// was not built with SSL support.
flags.authenticate_http_readwrite = false;
#endif // USE_SSL_SOCKET
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
Future<RunTaskGroupMessage> runTaskGroupMessage =
FUTURE_PROTOBUF(RunTaskGroupMessage(), master.get()->pid, slave.get()->pid);
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executor;
executor.set_type(v1::ExecutorInfo::DEFAULT);
executor.mutable_executor_id()->set_value("E");
executor.mutable_framework_id()->CopyFrom(subscribed->framework_id());
executor.mutable_resources()->CopyFrom(resources);
v1::TaskInfo task1;
task1.set_name("1");
task1.mutable_task_id()->set_value("1");
task1.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
task1.mutable_resources()->CopyFrom(resources);
task1.mutable_command()->set_value("exit 0");
v1::TaskInfo task2;
task2.set_name("2");
task2.mutable_task_id()->set_value("2");
task2.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
task2.mutable_resources()->CopyFrom(resources);
task2.mutable_command()->set_value("exit 0");
v1::TaskGroupInfo taskGroup;
taskGroup.add_tasks()->CopyFrom(task1);
taskGroup.add_tasks()->CopyFrom(task2);
Future<Event::Update> runningUpdate1;
Future<Event::Update> runningUpdate2;
Future<Event::Update> finishedUpdate1;
Future<Event::Update> finishedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2))
.WillOnce(FutureArg<1>(&finishedUpdate1))
.WillOnce(FutureArg<1>(&finishedUpdate2));
EXPECT_CALL(*scheduler, failure(_, _))
.Times(AtMost(1));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
v1::Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
launchGroup->mutable_executor()->CopyFrom(executor);
launchGroup->mutable_task_group()->CopyFrom(taskGroup);
mesos.send(call);
}
AWAIT_READY(runTaskGroupMessage);
EXPECT_EQ(devolve(frameworkId), runTaskGroupMessage->framework().id());
EXPECT_EQ(devolve(executor.executor_id()),
runTaskGroupMessage->executor().executor_id());
ASSERT_EQ(2, runTaskGroupMessage->task_group().tasks().size());
EXPECT_EQ(devolve(task1.task_id()),
runTaskGroupMessage->task_group().tasks(0).task_id());
EXPECT_EQ(devolve(task2.task_id()),
runTaskGroupMessage->task_group().tasks(1).task_id());
AWAIT_READY(runningUpdate1);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
AWAIT_READY(runningUpdate2);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
// TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
runningUpdate1->status().task_id(),
runningUpdate2->status().task_id()};
ASSERT_EQ(tasks, tasksRunning);
// Acknowledge the TASK_RUNNING updates so
// that subsequent updates can be received.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
runningUpdate1->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(runningUpdate1->status().uuid());
mesos.send(call);
}
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
runningUpdate2->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(runningUpdate2->status().uuid());
mesos.send(call);
}
AWAIT_READY(finishedUpdate1);
EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate1->status().state());
AWAIT_READY(finishedUpdate2);
EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate2->status().state());
const hashset<v1::TaskID> tasksFinished{
finishedUpdate1->status().task_id(),
finishedUpdate2->status().task_id()};
EXPECT_EQ(tasks, tasksFinished);
}
TEST_P(SchedulerTest, ReconcileTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update1;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update1));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update1);
EXPECT_EQ(v1::TASK_RUNNING, update1->status().state());
Future<Event::Update> update2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::RECONCILE);
Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
task->mutable_task_id()->CopyFrom(taskInfo.task_id());
mesos.send(call);
}
AWAIT_READY(update2);
EXPECT_FALSE(update2->status().has_uuid());
EXPECT_EQ(v1::TASK_RUNNING, update2->status().state());
EXPECT_EQ(v1::TaskStatus::REASON_RECONCILIATION,
update2->status().reason());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
TEST_P(SchedulerTest, KillTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged))
.WillRepeatedly(Return());
Future<Event::Update> update1;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update1));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update1);
EXPECT_EQ(v1::TASK_RUNNING, update1->status().state());
{
// Acknowledge TASK_RUNNING update.
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
acknowledge->set_uuid(update1->status().uuid());
mesos.send(call);
}
Future<Event::Update> update2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update2));
EXPECT_CALL(*executor, kill(_, _))
.WillOnce(v1::executor::SendUpdateFromTaskID(
frameworkId, evolve(executorId), v1::TASK_KILLED));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::KILL);
Call::Kill* kill = call.mutable_kill();
kill->mutable_task_id()->CopyFrom(taskInfo.task_id());
kill->mutable_agent_id()->CopyFrom(offer.agent_id());
mesos.send(call);
}
AWAIT_READY(update2);
EXPECT_EQ(v1::TASK_KILLED, update2->status().state());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
TEST_P(SchedulerTest, ShutdownExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_FINISHED));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_FINISHED, update->status().state());
Future<Nothing> shutdown;
EXPECT_CALL(*executor, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
Future<Event::Failure> failure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureArg<1>(&failure));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SHUTDOWN);
Call::Shutdown* shutdown = call.mutable_shutdown();
shutdown->mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
shutdown->mutable_agent_id()->CopyFrom(offer.agent_id());
mesos.send(call);
}
AWAIT_READY(shutdown);
containerizer.destroy(devolve(frameworkId), executorId);
// Executor termination results in a 'FAILURE' event.
AWAIT_READY(failure);
EXPECT_EQ(executorId, devolve(failure->executor_id()));
}
TEST_P(SchedulerTest, Decline)
{
master::Flags flags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers1);
ASSERT_EQ(1, offers1->offers().size());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
Future<Nothing> recoverResources =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 0s filter to immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(0);
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// Make sure the dispatch event for `recoverResources` has been enqueued.
AWAIT_READY(recoverResources);
Clock::pause();
Clock::advance(flags.allocation_interval);
Clock::resume();
// If the resources were properly declined, the scheduler should
// get another offer with same amount of resources.
AWAIT_READY(offers2);
ASSERT_EQ(1, offers2->offers().size());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
TEST_P(SchedulerTest, Revive)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers1);
EXPECT_NE(0, offers1->offers().size());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// No offers should be sent within 30 mins because we set a filter
// for 1 hr.
Clock::pause();
Clock::advance(Minutes(30));
Clock::settle();
ASSERT_TRUE(offers2.isPending());
// On revival the filters should be cleared and the scheduler should
// get another offer with same amount of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(offers2);
EXPECT_NE(0, offers2->offers().size());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
TEST_P(SchedulerTest, Suppress)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers1);
EXPECT_NE(0, offers1->offers().size());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
Future<Nothing> suppressOffers =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::suppressOffers);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUPPRESS);
mesos.send(call);
}
AWAIT_READY(suppressOffers);
// Wait for allocator to finish executing 'suppressOffers()'.
Clock::pause();
Clock::settle();
// No offers should be sent within 100 mins because the framework
// suppressed offers.
Clock::advance(Minutes(100));
Clock::settle();
ASSERT_TRUE(offers2.isPending());
// On reviving offers the scheduler should get another offer with same amount
// of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(offers2);
EXPECT_NE(0, offers2->offers().size());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
TEST_P(SchedulerTest, NoOffersWithAllRolesSuppressed)
{
master::Flags flags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.Times(0); // No offers extended since all roles are suppressed.
{
Call call;
call.set_type(Call::SUBSCRIBE);
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
subscribe->add_suppressed_roles(frameworkInfo.roles(0));
mesos.send(call);
}
// Since the framework is subscribed with its role being suppressed, no
// offers should be received by the framework.
Clock::pause();
Clock::advance(flags.allocation_interval);
Clock::resume();
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
// On revival the scheduler should get an offer.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
Call::Revive* revive = call.mutable_revive();
revive->add_roles(frameworkInfo.roles(0));
mesos.send(call);
}
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
}
TEST_P(SchedulerTest, Message)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
Future<v1::executor::Event::Message> message;
EXPECT_CALL(*executor, message(_, _))
.WillOnce(FutureArg<1>(&message));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::MESSAGE);
Call::Message* message = call.mutable_message();
message->mutable_agent_id()->CopyFrom(offer.agent_id());
message->mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
message->set_data("hello world");
mesos.send(call);
}
AWAIT_READY(message);
ASSERT_EQ("hello world", message->data());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
TEST_P(SchedulerTest, Request)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
Future<Nothing> requestResources =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REQUEST);
// Create a dummy request.
Call::Request* request = call.mutable_request();
request->add_requests();
mesos.send(call);
}
AWAIT_READY(requestResources);
}
// This test verifies that the scheduler is able to force a reconnection with
// the master.
TEST_P(SchedulerTest, SchedulerReconnect)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler,
detector);
AWAIT_READY(connected);
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
// Force a reconnection with the master. This should result in a
// `disconnected` callback followed by a `connected` callback.
mesos.reconnect();
AWAIT_READY(disconnected);
// The scheduler should be able to immediately reconnect with the master.
AWAIT_READY(connected);
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
// Simulate a spurious master failure event at the scheduler.
detector->appoint(None());
AWAIT_READY(disconnected);
EXPECT_CALL(*scheduler, disconnected(_))
.Times(0);
EXPECT_CALL(*scheduler, connected(_))
.Times(0);
mesos.reconnect();
// Flush any possible remaining events. The mocked scheduler will fail if the
// reconnection attempt resulted in any additional callbacks after the
// scheduler has disconnected.
Clock::pause();
Clock::settle();
}
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
class SchedulerReconcileTasks_BENCHMARK_Test
: public MesosTest,
public WithParamInterface<size_t> {};
// The scheduler reconcile benchmark tests are parameterized by the number of
// tasks that need to be reconciled.
INSTANTIATE_TEST_CASE_P(
Tasks,
SchedulerReconcileTasks_BENCHMARK_Test,
::testing::Values(1000U, 10000U, 50000U, 100000U));
// This benchmark simulates a large reconcile request containing tasks unknown
// to the master using the scheduler library/driver. It then measures the time
// required for processing the received `TASK_LOST` status updates.
TEST_P(SchedulerReconcileTasks_BENCHMARK_Test, SchedulerLibrary)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
const size_t tasks = GetParam();
EXPECT_CALL(*scheduler, update(_, _))
.Times(tasks);
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::RECONCILE);
for (size_t i = 0; i < tasks; ++i) {
Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
task->mutable_task_id()->set_value("task " + stringify(i));
}
Stopwatch watch;
watch.start();
mesos.send(call);
Clock::pause();
Clock::settle();
cout << "Reconciling " << tasks << " tasks took " << watch.elapsed()
<< " using the scheduler library" << endl;
}
TEST_P(SchedulerReconcileTasks_BENCHMARK_Test, SchedulerDriver)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
false,
DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
driver.start();
AWAIT_READY(frameworkId);
const size_t tasks = GetParam();
EXPECT_CALL(sched, statusUpdate(&driver, _))
.Times(tasks);
vector<TaskStatus> statuses;
for (size_t i = 0; i < tasks; ++i) {
TaskStatus status;
status.mutable_task_id()->set_value("task " + stringify(i));
statuses.push_back(status);
}
Stopwatch watch;
watch.start();
driver.reconcileTasks(statuses);
Clock::pause();
Clock::settle();
cout << "Reconciling " << tasks << " tasks took " << watch.elapsed()
<< " using the scheduler driver" << endl;
driver.stop();
driver.join();
}
// A fixture class for scheduler tests that can be run with SSL either enabled
// or disabled.
class SchedulerSSLTest
: public MesosTest,
public WithParamInterface<std::tuple<ContentType, string>>
{
// These test setup/teardown methods are only needed when compiled with SSL.
#ifdef USE_SSL_SOCKET
protected:
virtual void SetUp()
{
MesosTest::SetUp();
if (std::get<1>(GetParam()) == "https") {
generate_keys_and_certs();
set_environment_variables({
{"LIBPROCESS_SSL_ENABLED", "true"},
{"LIBPROCESS_SSL_KEY_FILE", key_path()},
{"LIBPROCESS_SSL_CERT_FILE", certificate_path()},
{"LIBPROCESS_SSL_CA_FILE", certificate_path().string()},
{"LIBPROCESS_SSL_REQUIRE_CERT", "true"}});
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
} else {
set_environment_variables({});
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
}
}
public:
static void TearDownTestCase()
{
// The teardown code of `MesosTest` calls `set_environment_variables({})`,
// so we invoke it first.
MesosTest::TearDownTestCase();
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
}
#endif // USE_SSL_SOCKET
};
// NOTE: `#ifdef`'ing out the argument `string("https")` argument causes a
// build break on Windows, because the preprocessor is not required to to
// process the text it expands.
#ifdef USE_SSL_SOCKET
INSTANTIATE_TEST_CASE_P(
ContentTypeAndSSLConfig,
SchedulerSSLTest,
::testing::Combine(
::testing::Values(ContentType::PROTOBUF, ContentType::JSON),
::testing::Values(
string("https"),
string("http"))));
#else
INSTANTIATE_TEST_CASE_P(
ContentTypeAndSSLConfig,
SchedulerSSLTest,
::testing::Combine(
::testing::Values(ContentType::PROTOBUF, ContentType::JSON),
::testing::Values(
string("http"))));
#endif // USE_SSL_SOCKET
// Tests that a scheduler can subscribe, run a task, and then tear itself down.
TEST_P(SchedulerSSLTest, RunTaskAndTeardown)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = std::get<0>(GetParam());
v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
Future<Nothing> shutdown;
EXPECT_CALL(*executor, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::TEARDOWN);
mesos.send(call);
}
AWAIT_READY(shutdown);
AWAIT_READY(disconnected);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {