blob: 7804ad043fef4fb9d138937709656ef6cecd7869 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <queue>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/queue.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/try.hpp>
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "master/constants.hpp"
#include "master/master.hpp"
#include "master/allocator/mesos/allocator.hpp"
#include "master/detector/standalone.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::allocator::MesosAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;
using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
using process::Queue;
using process::http::OK;
using std::cout;
using std::endl;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
using testing::WithParamInterface;
namespace process {
// We need to reinitialize libprocess in order to test against different
// configurations, such as when libprocess is initialized with SSL enabled.
void reinitialize(
const Option<string>& delegate,
const Option<string>& readonlyAuthenticationRealm,
const Option<string>& readwriteAuthenticationRealm);
} // namespace process {
namespace mesos {
namespace internal {
namespace tests {
class SchedulerTest
: public MesosTest,
public WithParamInterface<ContentType> {};
// The scheduler library tests are parameterized by the content type
// of the HTTP request.
INSTANTIATE_TEST_CASE_P(
ContentType,
SchedulerTest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// This test verifies that a scheduler can subscribe with the master.
TEST_P(SchedulerTest, Subscribe)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
ASSERT_EQ(master::DEFAULT_HEARTBEAT_INTERVAL.secs(),
subscribed->heartbeat_interval_seconds());
ASSERT_EQ(evolve(master.get()->getMasterInfo()), subscribed->master_info());
}
// Test validates that the scheduler library will not allow multiple
// SUBSCRIBE requests over the same connection.
TEST_P(SchedulerTest, SubscribeDrop)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Nothing> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureSatisfy(&subscribed));
Future<Nothing> heartbeat;
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat));
Clock::pause();
mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
// Send another SUBSCRIBE request. This one should get dropped as we
// already have a SUBSCRIBE in flight on that same connection.
mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
AWAIT_READY(subscribed);
AWAIT_READY(heartbeat);
Clock::resume();
{
JSON::Object metrics = Metrics();
EXPECT_EQ(1u, metrics.values["master/messages_register_framework"]);
}
}
// This test verifies that a scheduler can subscribe with the master after
// failing over to another instance.
TEST_P(SchedulerTest, SchedulerFailover)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId = subscribed->framework_id();
auto scheduler2 = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected2;
EXPECT_CALL(*scheduler2, connected(_))
.WillOnce(FutureSatisfy(&connected2));
// Failover to another scheduler instance.
v1::scheduler::TestMesos mesos2(
master.get()->pid,
contentType,
scheduler2);
AWAIT_READY(connected2);
// The previously connected scheduler instance should receive an
// error/disconnected event.
Future<Nothing> error;
EXPECT_CALL(*scheduler, error(_, _))
.WillOnce(FutureSatisfy(&error));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
EXPECT_CALL(*scheduler2, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler2, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
mesos2.send(call);
}
AWAIT_READY(error);
AWAIT_READY(disconnected);
AWAIT_READY(subscribed);
EXPECT_EQ(frameworkId, subscribed->framework_id());
}
// This test verifies that the scheduler can subscribe after a master failover.
TEST_P_TEMP_DISABLED_ON_WINDOWS(SchedulerTest, MasterFailover)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO))
.WillRepeatedly(Return());
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid, contentType, scheduler, detector);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId = subscribed->framework_id();
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected))
.WillRepeatedly(Return()); // Ignore future invocations.
// Failover the master.
// Also wipe the leading master from the detector, so the scheduler
// does not try to reconnect while the master actor is not routable.
master->reset();
detector->appoint(None());
master = StartMaster();
ASSERT_SOME(master);
AWAIT_READY(disconnected);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(
v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO, frameworkId));
Future<Event::Subscribed> subscribed2;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed2))
.WillRepeatedly(Return());
// Give the master leadership now that our scheduler expectations are set up.
detector->appoint(master.get()->pid);
AWAIT_READY(subscribed2);
EXPECT_EQ(frameworkId, subscribed2->framework_id());
}
// This test verifies that scheduler library also exposes metrics like
// scheduler driver.
TEST_P(SchedulerTest, MetricsEndpoint)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
Future<process::http::Response> response =
process::http::get(process::metrics::internal::metrics, "snapshot");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
JSON::Object metrics = parse.get();
// "scheduler/event_queue_messages" metric reports the number of message
// events in event queue. Message events are invoked when any custom
// message is generated by the executor.
EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_messages"));
// "scheduler/event_queue_dispatches" metric reports the number of dispatch
// events in event queue. Dispatch events are invoked when any function is
// dispatched as process as a result of any call by scheduler.
EXPECT_EQ(1u, metrics.values.count("scheduler/event_queue_dispatches"));
}
TEST_P(SchedulerTest, TaskRunning)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> statusUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&statusUpdate));
Future<Nothing> update;
EXPECT_CALL(containerizer, update(_, _, _))
.WillOnce(DoAll(FutureSatisfy(&update),
Return(Nothing())))
.WillRepeatedly(Return(Future<Nothing>())); // Ignore subsequent calls.
v1::TaskInfo taskInfo;
taskInfo.set_name("");
taskInfo.mutable_task_id()->set_value("1");
taskInfo.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
taskInfo.mutable_resources()->CopyFrom(
offers->offers(0).resources());
taskInfo.mutable_executor()->CopyFrom(v1::DEFAULT_EXECUTOR_INFO);
// TODO(benh): Enable just running a task with a command in the tests:
// taskInfo.mutable_command()->set_value("sleep 10");
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(statusUpdate);
EXPECT_EQ(v1::TASK_RUNNING, statusUpdate->status().state());
EXPECT_TRUE(statusUpdate->status().has_executor_id());
EXPECT_EQ(executorId, devolve(statusUpdate->status().executor_id()));
AWAIT_READY(update);
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// Ensures that a task group can be successfully launched
// on the `DEFAULT` executor.
TEST_P(SchedulerTest, TaskGroupRunning)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
slave::Flags flags = CreateSlaveFlags();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
Future<RunTaskGroupMessage> runTaskGroupMessage =
FUTURE_PROTOBUF(RunTaskGroupMessage(), master.get()->pid, slave.get()->pid);
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executor;
executor.set_type(v1::ExecutorInfo::DEFAULT);
executor.mutable_executor_id()->set_value("E");
executor.mutable_framework_id()->CopyFrom(subscribed->framework_id());
executor.mutable_resources()->CopyFrom(resources);
v1::TaskInfo task1;
task1.set_name("1");
task1.mutable_task_id()->set_value("1");
task1.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
task1.mutable_resources()->CopyFrom(resources);
task1.mutable_command()->set_value("exit 0");
v1::TaskInfo task2;
task2.set_name("2");
task2.mutable_task_id()->set_value("2");
task2.mutable_agent_id()->CopyFrom(
offers->offers(0).agent_id());
task2.mutable_resources()->CopyFrom(resources);
task2.mutable_command()->set_value("exit 0");
v1::TaskGroupInfo taskGroup;
taskGroup.add_tasks()->CopyFrom(task1);
taskGroup.add_tasks()->CopyFrom(task2);
Future<Event::Update> startingUpdate1;
Future<Event::Update> startingUpdate2;
Future<Event::Update> runningUpdate1;
Future<Event::Update> runningUpdate2;
Future<Event::Update> finishedUpdate1;
Future<Event::Update> finishedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&startingUpdate1))
.WillOnce(FutureArg<1>(&startingUpdate2))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2))
.WillOnce(FutureArg<1>(&finishedUpdate1))
.WillOnce(FutureArg<1>(&finishedUpdate2));
EXPECT_CALL(*scheduler, failure(_, _))
.Times(AtMost(1));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
v1::Offer::Operation::LaunchGroup* launchGroup =
operation->mutable_launch_group();
launchGroup->mutable_executor()->CopyFrom(executor);
launchGroup->mutable_task_group()->CopyFrom(taskGroup);
mesos.send(call);
}
AWAIT_READY(runTaskGroupMessage);
EXPECT_EQ(devolve(frameworkId), runTaskGroupMessage->framework().id());
EXPECT_EQ(devolve(executor.executor_id()),
runTaskGroupMessage->executor().executor_id());
ASSERT_EQ(2, runTaskGroupMessage->task_group().tasks().size());
EXPECT_EQ(devolve(task1.task_id()),
runTaskGroupMessage->task_group().tasks(0).task_id());
EXPECT_EQ(devolve(task2.task_id()),
runTaskGroupMessage->task_group().tasks(1).task_id());
AWAIT_READY(startingUpdate1);
ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
AWAIT_READY(startingUpdate2);
ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
// TASK_STARTING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksStarting{
startingUpdate1->status().task_id(),
startingUpdate2->status().task_id()};
ASSERT_EQ(tasks, tasksStarting);
// Acknowledge the TASK_STARTING updates so
// that subsequent updates can be received.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
startingUpdate1->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(startingUpdate1->status().uuid());
mesos.send(call);
}
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
startingUpdate2->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(startingUpdate2->status().uuid());
mesos.send(call);
}
AWAIT_READY(runningUpdate1);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
AWAIT_READY(runningUpdate2);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
// TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
runningUpdate1->status().task_id(),
runningUpdate2->status().task_id()};
ASSERT_EQ(tasks, tasksRunning);
// Acknowledge the TASK_RUNNING updates so
// that subsequent updates can be received.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
runningUpdate1->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(runningUpdate1->status().uuid());
mesos.send(call);
}
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(
runningUpdate2->status().task_id());
acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
acknowledge->set_uuid(runningUpdate2->status().uuid());
mesos.send(call);
}
AWAIT_READY(finishedUpdate1);
EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate1->status().state());
AWAIT_READY(finishedUpdate2);
EXPECT_EQ(v1::TASK_FINISHED, finishedUpdate2->status().state());
const hashset<v1::TaskID> tasksFinished{
finishedUpdate1->status().task_id(),
finishedUpdate2->status().task_id()};
EXPECT_EQ(tasks, tasksFinished);
}
TEST_P(SchedulerTest, ReconcileTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update1;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update1));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update1);
EXPECT_EQ(v1::TASK_RUNNING, update1->status().state());
Future<Event::Update> update2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::RECONCILE);
Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
task->mutable_task_id()->CopyFrom(taskInfo.task_id());
mesos.send(call);
}
AWAIT_READY(update2);
EXPECT_FALSE(update2->status().has_uuid());
EXPECT_EQ(v1::TASK_RUNNING, update2->status().state());
EXPECT_EQ(v1::TaskStatus::REASON_RECONCILIATION,
update2->status().reason());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
TEST_P(SchedulerTest, KillTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged))
.WillRepeatedly(Return());
Future<Event::Update> update1;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update1));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update1);
EXPECT_EQ(v1::TASK_RUNNING, update1->status().state());
{
// Acknowledge TASK_RUNNING update.
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACKNOWLEDGE);
Call::Acknowledge* acknowledge = call.mutable_acknowledge();
acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
acknowledge->set_uuid(update1->status().uuid());
mesos.send(call);
}
Future<Event::Update> update2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update2));
EXPECT_CALL(*executor, kill(_, _))
.WillOnce(v1::executor::SendUpdateFromTaskID(
frameworkId, evolve(executorId), v1::TASK_KILLED));
mesos.send(
v1::createCallKill(frameworkId, taskInfo.task_id(), offer.agent_id()));
AWAIT_READY(update2);
EXPECT_EQ(v1::TASK_KILLED, update2->status().state());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// Verifies invalidation of LAUNCH and LAUNCH_GROUP operations with `id` set.
TEST_P(SchedulerTest, OperationFeedbackValidationWithResourceProviderCapability)
{
Clock::pause();
master::Flags masterFlags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
Clock::advance(masterFlags.allocation_interval);
Clock::settle();
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
Future<v1::scheduler::Event::Update> taskStatusUpdate1;
Future<v1::scheduler::Event::Update> taskStatusUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&taskStatusUpdate1))
.WillOnce(FutureArg<1>(&taskStatusUpdate2));
// LAUNCH and LAUNCH_GROUP operations should not have the `id` field set.
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
v1::TaskInfo taskInfo1 =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
v1::Offer::Operation launch = v1::LAUNCH({taskInfo1});
launch.mutable_id()->set_value("LAUNCH_OPERATION");
v1::TaskInfo taskInfo2 = taskInfo1;
taskInfo2.mutable_task_id()->set_value("TASK_ID_2");
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
resources,
v1::ExecutorInfo::DEFAULT,
frameworkId);
v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo2}));
launchGroup.mutable_id()->set_value("LAUNCH_GROUP_OPERATION");
mesos.send(
v1::createCallAccept(
frameworkId,
offer,
{launch, launchGroup}));
AWAIT_READY(taskStatusUpdate1);
AWAIT_READY(taskStatusUpdate2);
EXPECT_EQ(v1::TASK_ERROR, taskStatusUpdate1->status().state());
EXPECT_EQ(v1::TASK_ERROR, taskStatusUpdate2->status().state());
}
// Verifies invalidation of RESERVE operations with `id` set, acting upon an
// offer from an agent without the RESOURCE_PROVIDER capability.
TEST_P(SchedulerTest, OperationFeedbackValidationNoResourceProviderCapability)
{
Clock::pause();
master::Flags masterFlags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.agent_features = SlaveCapabilities();
foreach (
const SlaveInfo::Capability& slaveCapability,
slave::AGENT_CAPABILITIES()) {
if (slaveCapability.type() != SlaveInfo::Capability::RESOURCE_PROVIDER) {
slaveFlags.agent_features->add_capabilities()->CopyFrom(slaveCapability);
}
}
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.add_roles("framework-role");
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
Clock::advance(masterFlags.allocation_interval);
Clock::settle();
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
Future<v1::scheduler::Event::UpdateOperationStatus> updateOperationStatus;
EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
.WillOnce(FutureArg<1>(&updateOperationStatus));
// RESERVE operations should not have the `id` field set when acting upon
// resources from an agent without the RESOURCE_PROVIDER capability.
const v1::Offer& offer = offers->offers(0);
v1::Resources resources = v1::Resources::parse("cpus:0.1").get();
resources = resources.pushReservation(v1::createDynamicReservationInfo(
frameworkInfo.roles(1), frameworkInfo.principal()));
v1::Offer::Operation operation = v1::RESERVE(resources);
operation.mutable_id()->set_value("RESERVE_OPERATION");
mesos.send(v1::createCallAccept(frameworkId, offer, {operation}));
AWAIT_READY(updateOperationStatus);
EXPECT_EQ(
mesos::v1::OPERATION_ERROR,
updateOperationStatus->status().state());
EXPECT_TRUE(metricEquals("master/operations/error", 1));
}
TEST_P(SchedulerTest, ShutdownExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_FINISHED));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_FINISHED, update->status().state());
Future<Nothing> shutdown;
EXPECT_CALL(*executor, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
Future<Event::Failure> failure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureArg<1>(&failure));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SHUTDOWN);
Call::Shutdown* shutdown = call.mutable_shutdown();
shutdown->mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
shutdown->mutable_agent_id()->CopyFrom(offer.agent_id());
mesos.send(call);
}
AWAIT_READY(shutdown);
containerizer.destroy(devolve(frameworkId), executorId);
// Executor termination results in a 'FAILURE' event.
AWAIT_READY(failure);
EXPECT_EQ(executorId, devolve(failure->executor_id()));
}
TEST_P(SchedulerTest, Decline)
{
master::Flags flags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers1);
ASSERT_EQ(1, offers1->offers().size());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
Future<Nothing> recoverResources =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 0s filter to immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(0);
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// Make sure the dispatch event for `recoverResources` has been enqueued.
AWAIT_READY(recoverResources);
Clock::pause();
Clock::advance(flags.allocation_interval);
Clock::resume();
// If the resources were properly declined, the scheduler should
// get another offer with same amount of resources.
AWAIT_READY(offers2);
ASSERT_EQ(1, offers2->offers().size());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
TEST_P(SchedulerTest, Revive)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->offers().empty());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// No offers should be sent within 30 mins because we set a filter
// for 1 hr.
Clock::pause();
Clock::advance(Minutes(30));
Clock::settle();
ASSERT_TRUE(offers2.isPending());
// On revival the filters should be cleared and the scheduler should
// get another offer with same amount of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(offers2);
ASSERT_FALSE(offers2->offers().empty());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
TEST_P(SchedulerTest, Suppress)
{
const string ROLE = "foo";
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers1;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers1));
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.clear_roles();
frameworkInfo.add_roles(ROLE);
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
frameworkInfo.mutable_id()->CopyFrom(frameworkId);
AWAIT_READY(offers1);
ASSERT_FALSE(offers1->offers().empty());
const v1::Offer& offer = offers1->offers(0);
Future<Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers2));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
// Set 1hr filter to not immediately get another offer.
v1::Filters filters;
filters.set_refuse_seconds(Hours(1).secs());
decline->mutable_filters()->CopyFrom(filters);
mesos.send(call);
}
// Wait for the master to process the DECLINE call.
Clock::pause();
Clock::settle();
Clock::resume();
JSON::Object metrics1 = Metrics();
const string prefix =
master::getFrameworkMetricPrefix(devolve(frameworkInfo));
EXPECT_EQ(1, metrics1.values[prefix + "subscribed"]);
EXPECT_EQ(1, metrics1.values[prefix + "offers/declined"]);
EXPECT_EQ(0, metrics1.values[prefix + "roles/" + ROLE + "/suppressed"]);
Future<Nothing> suppressOffers =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::suppressOffers);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUPPRESS);
mesos.send(call);
}
AWAIT_READY(suppressOffers);
// Wait for allocator to finish executing 'suppressOffers()'.
Clock::pause();
Clock::settle();
JSON::Object metrics2 = Metrics();
EXPECT_EQ(1, metrics2.values[prefix + "roles/" + ROLE + "/suppressed"]);
// No offers should be sent within 100 mins because the framework
// suppressed offers.
Clock::advance(Minutes(100));
Clock::settle();
ASSERT_TRUE(offers2.isPending());
// On reviving offers the scheduler should get another offer with same amount
// of resources.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(offers2);
ASSERT_FALSE(offers2->offers().empty());
ASSERT_EQ(offer.resources(), offers2->offers(0).resources());
}
// This test verifies that when a framework registers with all roles
// suppressing offers, it does not receive offers.
TEST_P(SchedulerTest, NoOffersWithAllRolesSuppressed)
{
master::Flags flags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
Clock::pause();
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<Nothing> heartbeat;
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat));
// The framework will subscribe with its role being suppressed so no
// offers should be received by the framework.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.Times(0);
{
Call call;
call.set_type(Call::SUBSCRIBE);
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
subscribe->add_suppressed_roles(frameworkInfo.roles(0));
mesos.send(call);
}
AWAIT_READY(subscribed);
AWAIT_READY(heartbeat);
// We use an additional heartbeat as a synchronization mechanism to make
// sure an offer would be received by the scheduler if one was ever extended.
// Note that Clock::settle() wouldn't be sufficient here.
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat))
.WillRepeatedly(Return()); // Ignore additional heartbeats.
Clock::advance(master::DEFAULT_HEARTBEAT_INTERVAL);
AWAIT_READY(heartbeat);
v1::FrameworkID frameworkId(subscribed->framework_id());
// On revival the scheduler should get an offer.
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REVIVE);
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
Call::Revive* revive = call.mutable_revive();
revive->add_roles(frameworkInfo.roles(0));
mesos.send(call);
}
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
}
// This test verifies that if a framework (initially with no roles
// suppressed) decides to suppress offers for its roles on reregisteration,
// no offers will be made.
TEST_P(SchedulerTest, NoOffersOnReregistrationWithAllRolesSuppressed)
{
master::Flags flags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
Clock::pause();
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<Nothing> heartbeat;
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat));
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
// Enable failover.
frameworkInfo.set_failover_timeout(Weeks(1).secs());
Call::Subscribe* subscribe = call.mutable_subscribe();
*(subscribe->mutable_framework_info()) = frameworkInfo;
mesos.send(call);
}
AWAIT_READY(subscribed);
AWAIT_READY(heartbeat);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
// Now fail over and reregister with all roles suppressed.
EXPECT_CALL(*scheduler, disconnected(_));
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat));
// The framework will subscribe with its role being suppressed so no
// offers should be received by the framework.
EXPECT_CALL(*scheduler, offers(_, _))
.Times(0);
// Now fail over the scheduler.
mesos.reconnect();
AWAIT_READY(connected);
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
{
Call call;
call.set_type(Call::SUBSCRIBE);
*(call.mutable_framework_id()) = frameworkId;
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
*(frameworkInfo.mutable_id()) = frameworkId;
Call::Subscribe* subscribe = call.mutable_subscribe();
*(subscribe->mutable_framework_info()) = frameworkInfo;
subscribe->add_suppressed_roles(frameworkInfo.role());
mesos.send(call);
}
AWAIT_READY(subscribed);
AWAIT_READY(heartbeat);
// We use an additional heartbeat as a synchronization mechanism to make
// sure an offer would be received by the scheduler if one was ever extended.
// Note that Clock::settle() wouldn't be sufficient here.
EXPECT_CALL(*scheduler, heartbeat(_))
.WillOnce(FutureSatisfy(&heartbeat))
.WillRepeatedly(Return()); // Ignore additional heartbeats.
Clock::advance(master::DEFAULT_HEARTBEAT_INTERVAL);
AWAIT_READY(heartbeat);
// On revival the scheduler should get an offer.
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
*(call.mutable_framework_id()) = frameworkId;
call.set_type(Call::REVIVE);
mesos.send(call);
}
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
}
TEST_P(SchedulerTest, Message)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
Future<v1::executor::Event::Message> message;
EXPECT_CALL(*executor, message(_, _))
.WillOnce(FutureArg<1>(&message));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::MESSAGE);
Call::Message* message = call.mutable_message();
message->mutable_agent_id()->CopyFrom(offer.agent_id());
message->mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
message->set_data("hello world");
mesos.send(call);
}
AWAIT_READY(message);
ASSERT_EQ("hello world", message->data());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
TEST_P(SchedulerTest, Request)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
Future<Nothing> requestResources =
FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources);
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::REQUEST);
// Create a dummy request.
Call::Request* request = call.mutable_request();
request->add_requests();
mesos.send(call);
}
AWAIT_READY(requestResources);
}
// This test verifies that the scheduler is able to force a reconnection with
// the master.
TEST_P(SchedulerTest, SchedulerReconnect)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
ContentType contentType = GetParam();
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler,
detector);
AWAIT_READY(connected);
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
// Force a reconnection with the master. This should result in a
// `disconnected` callback followed by a `connected` callback.
mesos.reconnect();
AWAIT_READY(disconnected);
// The scheduler should be able to immediately reconnect with the master.
AWAIT_READY(connected);
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
// Simulate a spurious master failure event at the scheduler.
detector->appoint(None());
AWAIT_READY(disconnected);
EXPECT_CALL(*scheduler, disconnected(_))
.Times(0);
EXPECT_CALL(*scheduler, connected(_))
.Times(0);
mesos.reconnect();
// Flush any possible remaining events. The mocked scheduler will fail if the
// reconnection attempt resulted in any additional callbacks after the
// scheduler has disconnected.
Clock::pause();
Clock::settle();
}
// TODO(benh): Write test for sending Call::Acknowledgement through
// master to slave when Event::Update was generated locally.
class SchedulerReconcileTasks_BENCHMARK_Test
: public MesosTest,
public WithParamInterface<size_t> {};
// The scheduler reconcile benchmark tests are parameterized by the number of
// tasks that need to be reconciled.
INSTANTIATE_TEST_CASE_P(
Tasks,
SchedulerReconcileTasks_BENCHMARK_Test,
::testing::Values(1000U, 10000U, 50000U, 100000U));
// This benchmark simulates a large reconcile request containing tasks unknown
// to the master using the scheduler library/driver. It then measures the time
// required for processing the received `TASK_LOST` status updates.
TEST_P(SchedulerReconcileTasks_BENCHMARK_Test, SchedulerLibrary)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
const size_t tasks = GetParam();
EXPECT_CALL(*scheduler, update(_, _))
.Times(static_cast<int>(tasks));
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::RECONCILE);
for (size_t i = 0; i < tasks; ++i) {
Call::Reconcile::Task* task = call.mutable_reconcile()->add_tasks();
task->mutable_task_id()->set_value("task " + stringify(i));
}
Stopwatch watch;
watch.start();
mesos.send(call);
Clock::pause();
Clock::settle();
cout << "Reconciling " << tasks << " tasks took " << watch.elapsed()
<< " using the scheduler library" << endl;
}
TEST_P(SchedulerReconcileTasks_BENCHMARK_Test, SchedulerDriver)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
false,
DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
driver.start();
AWAIT_READY(frameworkId);
const size_t tasks = GetParam();
EXPECT_CALL(sched, statusUpdate(&driver, _))
.Times(static_cast<int>(tasks));
vector<TaskStatus> statuses;
for (size_t i = 0; i < tasks; ++i) {
TaskStatus status;
status.mutable_task_id()->set_value("task " + stringify(i));
statuses.push_back(status);
}
Stopwatch watch;
watch.start();
driver.reconcileTasks(statuses);
Clock::pause();
Clock::settle();
cout << "Reconciling " << tasks << " tasks took " << watch.elapsed()
<< " using the scheduler driver" << endl;
driver.stop();
driver.join();
}
// A fixture class for scheduler tests that can be run with SSL either enabled
// or disabled.
class SchedulerSSLTest
: public MesosTest,
public WithParamInterface<std::tuple<ContentType, string>>
{
// These test setup/teardown methods are only needed when compiled with SSL.
#ifdef USE_SSL_SOCKET
protected:
void SetUp() override
{
MesosTest::SetUp();
if (std::get<1>(GetParam()) == "https") {
generate_keys_and_certs();
set_environment_variables({
{"LIBPROCESS_SSL_ENABLED", "true"},
{"LIBPROCESS_SSL_KEY_FILE", key_path()},
{"LIBPROCESS_SSL_CERT_FILE", certificate_path()},
{"LIBPROCESS_SSL_CA_FILE", certificate_path().string()},
{"LIBPROCESS_SSL_REQUIRE_CERT", "true"}});
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
} else {
set_environment_variables({});
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
}
}
public:
static void TearDownTestCase()
{
// The teardown code of `MesosTest` calls `set_environment_variables({})`,
// so we invoke it first.
MesosTest::TearDownTestCase();
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
}
#endif // USE_SSL_SOCKET
};
// NOTE: `#ifdef`'ing out the argument `string("https")` argument causes a
// build break on Windows, because the preprocessor is not required to to
// process the text it expands.
#ifdef USE_SSL_SOCKET
INSTANTIATE_TEST_CASE_P(
ContentTypeAndSSLConfig,
SchedulerSSLTest,
::testing::Combine(
::testing::Values(ContentType::PROTOBUF, ContentType::JSON),
::testing::Values(
string("https"),
string("http"))));
#else
INSTANTIATE_TEST_CASE_P(
ContentTypeAndSSLConfig,
SchedulerSSLTest,
::testing::Combine(
::testing::Values(ContentType::PROTOBUF, ContentType::JSON),
::testing::Values(
string("http"))));
#endif // USE_SSL_SOCKET
// Tests that a scheduler can subscribe, run a task, and then tear itself down.
TEST_P(SchedulerSSLTest, RunTaskAndTeardown)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = std::get<0>(GetParam());
v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
mesos.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
Future<Nothing> shutdown;
EXPECT_CALL(*executor, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::TEARDOWN);
mesos.send(call);
}
AWAIT_READY(shutdown);
AWAIT_READY(disconnected);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {