blob: a26935299485a4669485c3aa1bdcfcb729252893 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/pid.hpp>
#include <process/owned.hpp>
#include <stout/lambda.hpp>
#include <stout/try.hpp>
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "master/allocator/mesos/allocator.hpp"
#include "master/master.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::allocator::MesosAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;
using process::Clock;
using process::Future;
using process::PID;
using process::Owned;
using std::string;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
using testing::SaveArg;
namespace mesos {
namespace internal {
namespace tests {
// NOTE: These tests are for the v1 HTTP API and are semantically similar to
// the corresponding tests in `src/tests/fault_tolerance_tests.cpp`.
//
// The fixture declares no members of its own; it only gives the tests in
// this file a distinct fixture name on top of `MesosTest`.
class HttpFaultToleranceTest : public MesosTest {};
// This test verifies that a framework attempting to resubscribe
// with a different principal during its failover timeout
// gets an error.
//
// The first scheduler subscribes with `v1::DEFAULT_FRAMEWORK_INFO` (plus a
// two-week failover timeout) and goes out of scope once its framework id has
// been recorded. The second scheduler reuses that framework id but sets its
// principal from `v1::DEFAULT_CREDENTIAL_2` and must receive an `Error` event
// followed by a disconnection instead of a `Subscribed` event.
TEST_F(HttpFaultToleranceTest, FrameworkPrincipalChangeFails)
{
  master::Flags flags = CreateMasterFlags();

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  v1::FrameworkID frameworkId;

  // Launch the first (i.e., failing) scheduler and wait until it receives
  // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
  {
    v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
    frameworkInfo.set_failover_timeout(Weeks(2).secs());

    auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

    // Subscribe as soon as the connection is reported.
    EXPECT_CALL(*scheduler, connected(_))
      .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));

    EXPECT_CALL(*scheduler, heartbeat(_))
      .WillRepeatedly(Return()); // Ignore heartbeats.

    Future<Event::Subscribed> subscribed;
    EXPECT_CALL(*scheduler, subscribed(_, _))
      .WillOnce(FutureArg<1>(&subscribed));

    v1::scheduler::TestMesos mesos(
        master.get()->pid,
        ContentType::JSON,
        scheduler);

    AWAIT_READY(subscribed);

    // Remember the assigned id so that the second scheduler can reuse it.
    frameworkId = subscribed->framework_id();
  }

  // Now launch the second (i.e., failover) scheduler using the framework id
  // recorded from the first scheduler but another set of valid credentials.
  // The scheduler should get an error instead of "Subscribed" message.
  // The master should disconnect the scheduler after sending an error.
  {
    v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
    frameworkInfo.mutable_id()->CopyFrom(frameworkId);
    frameworkInfo.set_principal(v1::DEFAULT_CREDENTIAL_2.principal());

    auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

    // We do not resubscribe after the master disconnects us.
    EXPECT_CALL(*scheduler, connected(_))
      .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo, frameworkId))
      .WillRepeatedly(Return());

    EXPECT_CALL(*scheduler, heartbeat(_))
      .WillRepeatedly(Return()); // Ignore heartbeats.

    // Fail the test if we got `Subscribed` before the master disconnects us.
    EXPECT_CALL(*scheduler, subscribed(_, _))
      .Times(AtMost(0));

    Future<Nothing> disconnected;
    EXPECT_CALL(*scheduler, disconnected(_))
      .WillOnce(FutureSatisfy(&disconnected));

    Future<Event::Error> error;
    EXPECT_CALL(*scheduler, error(_, _))
      .WillOnce(FutureArg<1>(&error));

    v1::scheduler::TestMesos mesos(
        master.get()->pid,
        ContentType::JSON,
        scheduler,
        None(),
        v1::DEFAULT_CREDENTIAL_2);

    AWAIT_READY(error);

    // Expected value first, matching the other assertions in this file.
    EXPECT_EQ("Changing framework's principal is not allowed.",
              error->message());

    AWAIT_READY(disconnected);
  }
}
// This test verifies that a framework attempting to subscribe
// after its failover timeout has elapsed is disallowed.
//
// Outline:
//   1. A first scheduler subscribes with a two-week failover timeout and
//      goes out of scope once its framework id has been recorded.
//   2. The clock is advanced by that failover timeout and the test waits for
//      a dispatch of `Master::frameworkFailoverTimeout`.
//   3. A second scheduler subscribes with the recorded framework id and is
//      expected to receive an `Error` event.
TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterFailoverTimeout)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_failover_timeout(Weeks(2).secs());
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
// Registered before the first scheduler exists so that the dispatch of
// `deactivateFramework` cannot be missed.
Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
_, &master::allocator::MesosAllocatorProcess::deactivateFramework);
v1::FrameworkID frameworkId;
ContentType contentType = ContentType::PROTOBUF;
// Launch the first (i.e., failing) scheduler and wait until it receives
// a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
{
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos schedulerLibrary(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
// Initial subscription: no framework id is set on the call.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
schedulerLibrary.send(call);
}
AWAIT_READY(subscribed);
frameworkId = subscribed->framework_id();
}
// Wait until master schedules the framework for removal.
AWAIT_READY(deactivateFramework);
// Simulate framework failover timeout.
Clock::pause();
Clock::settle();
// Convert the configured timeout (a number of seconds) into a `Duration`.
Try<Duration> failoverTimeout =
Duration::create(frameworkInfo.failover_timeout());
ASSERT_SOME(failoverTimeout);
// Set up the interception before advancing the clock.
Future<Nothing> frameworkFailoverTimeout =
FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
Clock::advance(failoverTimeout.get());
Clock::resume();
// Wait until master actually marks the framework as completed.
AWAIT_READY(frameworkFailoverTimeout);
// Now launch the second (i.e., failover) scheduler using the
// framework id recorded from the first scheduler.
{
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
v1::scheduler::TestMesos schedulerLibrary(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
// Framework should get `Error` event because the framework with this id
// is marked as completed.
Future<Nothing> error;
EXPECT_CALL(*scheduler, error(_, _))
.WillOnce(FutureSatisfy(&error));
// A disconnection may or may not be observed before the test ends.
EXPECT_CALL(*scheduler, disconnected(_))
.Times(AtMost(1));
// Resubscription attempt: the recorded framework id is set both on the
// call and inside the framework info.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
schedulerLibrary.send(call);
}
AWAIT_READY(error);
}
}
// This test verifies that a framework attempting to subscribe after teardown
// is disallowed.
//
// Outline:
//   1. A first scheduler subscribes, records its framework id and sends a
//      `TEARDOWN` call.
//   2. The test waits for `removeFramework` to be dispatched to the allocator
//      and for the first scheduler to observe a disconnection.
//   3. A second scheduler subscribes with the recorded framework id and is
//      expected to receive an `Error` event.
TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterTeardown)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
v1::FrameworkID frameworkId;
ContentType contentType = ContentType::PROTOBUF;
// Launch the first scheduler, wait until it receives a `SUBSCRIBED` event
// and then tear the framework down from that same scheduler.
{
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
v1::scheduler::TestMesos schedulerLibrary(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
// Initial subscription: no framework id is set on the call.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
schedulerLibrary.send(call);
}
AWAIT_READY(subscribed);
frameworkId = subscribed->framework_id();
// Set up both observers before sending the teardown that triggers them.
Future<Nothing> removeFramework = FUTURE_DISPATCH(
_, &master::allocator::MesosAllocatorProcess::removeFramework);
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
// Teardown the scheduler now.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::TEARDOWN);
schedulerLibrary.send(call);
}
// Wait until `removeFramework` has been dispatched to the allocator.
AWAIT_READY(removeFramework);
// Wait for `removeFramework()` to be completed on the master.
Clock::pause();
Clock::settle();
Clock::resume();
// The scheduler should eventually realize the disconnection.
AWAIT_READY(disconnected);
}
// Now launch the second scheduler using the framework id recorded
// from the first (torn down) scheduler.
{
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
v1::scheduler::TestMesos schedulerLibrary(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
// Framework should get `Error` event because the framework
// with this id is marked as completed.
Future<Nothing> error;
EXPECT_CALL(*scheduler, error(_, _))
.WillOnce(FutureSatisfy(&error));
// A disconnection may or may not be observed before the test ends.
EXPECT_CALL(*scheduler, disconnected(_))
.Times(AtMost(1));
// Resubscription attempt: the recorded framework id is set both on the
// call and inside the framework info.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
schedulerLibrary.send(call);
}
AWAIT_READY(error);
}
}
// This test checks that a failed over scheduler gets the retried status update
// when the original instance dies without acknowledging the update.
//
// Outline:
//   1. The first scheduler subscribes, accepts an offer and launches a task
//      whose executor replies with a `TASK_RUNNING` update.
//   2. The first scheduler receives the update but never sends an
//      acknowledgement call for it.
//   3. A second scheduler subscribes with the same framework id; the first
//      one is expected to see an error and a disconnection.
//   4. The clock is advanced by `slave::STATUS_UPDATE_RETRY_INTERVAL_MIN` and
//      the second scheduler is expected to receive an update.
TEST_F(HttpFaultToleranceTest, SchedulerFailoverStatusUpdate)
{
master::Flags flags = CreateMasterFlags();
flags.authenticate_frameworks = false;
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
ContentType contentType = ContentType::PROTOBUF;
v1::scheduler::TestMesos schedulerLibrary(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
// Initial subscription: no framework id is set on the call.
{
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
schedulerLibrary.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
// Executor side: subscribe on connect and answer the launch with a
// `TASK_RUNNING` update.
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
Future<Nothing> acknowledged;
EXPECT_CALL(*executor, acknowledged(_, _))
.WillOnce(FutureSatisfy(&acknowledged));
Future<Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
const v1::Offer& offer = offers->offers(0);
v1::TaskInfo taskInfo =
evolve(createTask(devolve(offer), "", executorId));
// Accept the first offer with a single `LAUNCH` operation for `taskInfo`.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
schedulerLibrary.send(call);
}
AWAIT_READY(acknowledged);
AWAIT_READY(update);
EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
EXPECT_EQ(executorId, devolve(update->status().executor_id()));
EXPECT_TRUE(update->status().has_executor_id());
EXPECT_TRUE(update->status().has_uuid());
// Failover the scheduler without acknowledging the status update.
auto scheduler2 = std::make_shared<v1::MockHTTPScheduler>();
Future<Nothing> connected2;
EXPECT_CALL(*scheduler2, connected(_))
.WillOnce(FutureSatisfy(&connected2));
// Failover to another scheduler instance.
v1::scheduler::TestMesos schedulerLibrary2(
master.get()->pid,
contentType,
scheduler2);
AWAIT_READY(connected2);
// The previously connected scheduler instance should receive an
// error/disconnected event.
Future<Nothing> error;
EXPECT_CALL(*scheduler, error(_, _))
.WillOnce(FutureSatisfy(&error));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
// The `subscribed` future declared above is reused to capture the second
// scheduler's `Subscribed` event.
EXPECT_CALL(*scheduler2, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler2, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
// Scheduler2 should receive the retried status update.
Future<Nothing> update2;
EXPECT_CALL(*scheduler2, update(_, _))
.WillOnce(FutureSatisfy(&update2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
// Resubscribe from the second scheduler with the existing framework id,
// set both on the call and inside the framework info.
{
Call call;
call.mutable_framework_id()->CopyFrom(frameworkId);
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
schedulerLibrary2.send(call);
}
AWAIT_READY(error);
AWAIT_READY(disconnected);
AWAIT_READY(subscribed);
EXPECT_EQ(frameworkId, subscribed->framework_id());
// NOTE(review): the clock is paused here and never resumed inside this
// test body; presumably the test harness resumes it afterwards - confirm.
Clock::pause();
// Now advance time enough for the reliable timeout to kick in and
// another status update to be sent.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(update2);
// Tolerate (but do not require) executor shutdown/disconnection callbacks
// from here on.
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// This test ensures that the failed over scheduler receives the executor to
// framework message.
//
// Outline:
//   1. The first scheduler subscribes, accepts an offer and launches a task;
//      the executor library pointer is captured when the executor subscribes.
//   2. A second scheduler subscribes with the same framework id; the first
//      one is expected to see an error and a disconnection.
//   3. The executor sends a `MESSAGE` call carrying "hello world" and the
//      second scheduler is expected to receive exactly that payload.
TEST_F(HttpFaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
{
  master::Flags flags = CreateMasterFlags();
  flags.authenticate_frameworks = false;

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
  auto executor = std::make_shared<v1::MockHTTPExecutor>();

  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
  TestContainerizer containerizer(executorId, executor);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Ignore future invocations.

  ContentType contentType = ContentType::PROTOBUF;

  v1::scheduler::TestMesos schedulerLibrary(
      master.get()->pid,
      contentType,
      scheduler);

  AWAIT_READY(connected);

  Future<Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  Future<Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  // Initial subscription: no framework id is set on the call.
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);

    schedulerLibrary.send(call);
  }

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  ASSERT_FALSE(offers->offers().empty());

  EXPECT_CALL(*executor, connected(_))
    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));

  // Filled in by the executor's `subscribed` callback below. Initialized so
  // that it never holds an indeterminate value if that callback is missed.
  v1::executor::Mesos* executorLib = nullptr;

  EXPECT_CALL(*executor, subscribed(_, _))
    .WillOnce(SaveArg<0>(&executorLib));

  Future<Nothing> launch;
  EXPECT_CALL(*executor, launch(_, _))
    .WillOnce(FutureSatisfy(&launch));

  const v1::Offer& offer = offers->offers(0);

  v1::TaskInfo taskInfo =
    evolve(createTask(devolve(offer), "", executorId));

  // Accept the first offer with a single `LAUNCH` operation for `taskInfo`.
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::ACCEPT);

    Call::Accept* accept = call.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* operation = accept->add_operations();
    operation->set_type(v1::Offer::Operation::LAUNCH);
    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);

    schedulerLibrary.send(call);
  }

  AWAIT_READY(launch);

  // The executor library pointer is dereferenced further down; stop here
  // with a clear failure if the `subscribed` callback never provided it.
  ASSERT_TRUE(executorLib != nullptr);

  auto scheduler2 = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> connected2;
  EXPECT_CALL(*scheduler2, connected(_))
    .WillOnce(FutureSatisfy(&connected2));

  // Failover to another scheduler instance.
  v1::scheduler::TestMesos schedulerLibrary2(
      master.get()->pid,
      contentType,
      scheduler2);

  AWAIT_READY(connected2);

  // The previously connected scheduler instance should receive an
  // error/disconnected event.
  Future<Nothing> error;
  EXPECT_CALL(*scheduler, error(_, _))
    .WillOnce(FutureSatisfy(&error));

  Future<Nothing> disconnected;
  EXPECT_CALL(*scheduler, disconnected(_))
    .WillOnce(FutureSatisfy(&disconnected));

  // The `subscribed` future declared above is reused to capture the second
  // scheduler's `Subscribed` event.
  EXPECT_CALL(*scheduler2, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*scheduler2, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  // Resubscribe from the second scheduler with the existing framework id,
  // set both on the call and inside the framework info.
  {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);

    schedulerLibrary2.send(call);
  }

  AWAIT_READY(error);
  AWAIT_READY(disconnected);
  AWAIT_READY(subscribed);

  EXPECT_EQ(frameworkId, subscribed->framework_id());

  Future<Event::Message> message;
  EXPECT_CALL(*scheduler2, message(_, _))
    .WillOnce(FutureArg<1>(&message));

  // Send a message from the executor; the payload pointer gets its own name
  // so that it does not shadow the `message` future above.
  {
    v1::executor::Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.mutable_executor_id()->CopyFrom(evolve(executorId));
    call.set_type(v1::executor::Call::MESSAGE);

    v1::executor::Call::Message* executorMessage = call.mutable_message();
    executorMessage->set_data("hello world");

    executorLib->send(call);
  }

  AWAIT_READY(message);
  ASSERT_EQ("hello world", message->data());

  // Tolerate (but do not require) executor shutdown/disconnection callbacks
  // from here on.
  EXPECT_CALL(*executor, shutdown(_))
    .Times(AtMost(1));

  EXPECT_CALL(*executor, disconnected(_))
    .Times(AtMost(1));
}
// Checks the scheduler -> executor direction after a failover: once a second
// scheduler instance has taken over the framework, a `MESSAGE` call sent
// through it must reach the executor launched by the first instance.
TEST_F(HttpFaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
{
  master::Flags flags = CreateMasterFlags();
  flags.authenticate_frameworks = false;

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
  auto executor = std::make_shared<v1::MockHTTPExecutor>();

  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
  TestContainerizer containerizer(executorId, executor);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  // --- Bring up the original scheduler instance. ---

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Later reconnections are of no interest.

  ContentType contentType = ContentType::PROTOBUF;

  v1::scheduler::TestMesos library(
      master.get()->pid,
      contentType,
      scheduler);

  AWAIT_READY(connected);

  Future<Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Heartbeats are irrelevant here.

  Future<Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  {
    // First-time subscription, hence no framework id on the call.
    Call subscribeCall;
    subscribeCall.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = subscribeCall.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);

    library.send(subscribeCall);
  }

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  ASSERT_FALSE(offers->offers().empty());

  // --- Launch a task so that an executor is up and subscribed. ---

  EXPECT_CALL(*executor, connected(_))
    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));

  EXPECT_CALL(*executor, subscribed(_, _));

  Future<Nothing> launched;
  EXPECT_CALL(*executor, launch(_, _))
    .WillOnce(FutureSatisfy(&launched));

  const v1::Offer& offer = offers->offers(0);

  v1::TaskInfo taskInfo =
    evolve(createTask(devolve(offer), "", executorId));

  {
    // A single `LAUNCH` operation against the first offer.
    Call acceptCall;
    acceptCall.mutable_framework_id()->CopyFrom(frameworkId);
    acceptCall.set_type(Call::ACCEPT);

    Call::Accept* accept = acceptCall.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* launchOperation = accept->add_operations();
    launchOperation->set_type(v1::Offer::Operation::LAUNCH);
    launchOperation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);

    library.send(acceptCall);
  }

  AWAIT_READY(launched);

  // --- Fail over to a second scheduler instance. ---

  auto failoverScheduler = std::make_shared<v1::MockHTTPScheduler>();

  Future<Nothing> failoverConnected;
  EXPECT_CALL(*failoverScheduler, connected(_))
    .WillOnce(FutureSatisfy(&failoverConnected));

  v1::scheduler::TestMesos failoverLibrary(
      master.get()->pid,
      contentType,
      failoverScheduler);

  AWAIT_READY(failoverConnected);

  // The original instance is expected to be told about the takeover via an
  // error event followed by a disconnection.
  Future<Nothing> error;
  EXPECT_CALL(*scheduler, error(_, _))
    .WillOnce(FutureSatisfy(&error));

  Future<Nothing> disconnected;
  EXPECT_CALL(*scheduler, disconnected(_))
    .WillOnce(FutureSatisfy(&disconnected));

  // `subscribed` is reused for the new instance's `Subscribed` event.
  EXPECT_CALL(*failoverScheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*failoverScheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Heartbeats are irrelevant here.

  {
    // Resubscription carries the known framework id in both places.
    Call resubscribeCall;
    resubscribeCall.mutable_framework_id()->CopyFrom(frameworkId);
    resubscribeCall.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = resubscribeCall.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);

    failoverLibrary.send(resubscribeCall);
  }

  AWAIT_READY(error);
  AWAIT_READY(disconnected);
  AWAIT_READY(subscribed);

  EXPECT_EQ(frameworkId, subscribed->framework_id());

  // --- Send a message through the new instance and check delivery. ---

  Future<v1::executor::Event::Message> received;
  EXPECT_CALL(*executor, message(_, _))
    .WillOnce(FutureArg<1>(&received));

  {
    Call messageCall;
    messageCall.mutable_framework_id()->CopyFrom(frameworkId);
    messageCall.set_type(Call::MESSAGE);

    Call::Message* payload = messageCall.mutable_message();
    payload->mutable_agent_id()->CopyFrom(offer.agent_id());
    payload->mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
    payload->set_data("hello world");

    failoverLibrary.send(messageCall);
  }

  AWAIT_READY(received);
  ASSERT_EQ("hello world", received->data());

  // Executor shutdown/disconnection may or may not be observed from here on.
  EXPECT_CALL(*executor, shutdown(_))
    .Times(AtMost(1));

  EXPECT_CALL(*executor, disconnected(_))
    .Times(AtMost(1));
}
// Checks that tearing the framework down from its scheduler results in the
// executor being told to shut down.
TEST_F(HttpFaultToleranceTest, SchedulerExit)
{
  master::Flags flags = CreateMasterFlags();
  flags.authenticate_frameworks = false;

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
  auto executor = std::make_shared<v1::MockHTTPExecutor>();

  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
  TestContainerizer containerizer(executorId, executor);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  // --- Bring up the scheduler. ---

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected))
    .WillRepeatedly(Return()); // Later reconnections are of no interest.

  ContentType contentType = ContentType::PROTOBUF;

  v1::scheduler::TestMesos library(
      master.get()->pid,
      contentType,
      scheduler);

  AWAIT_READY(connected);

  Future<Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Heartbeats are irrelevant here.

  Future<Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  {
    // First-time subscription, hence no framework id on the call.
    Call subscribeCall;
    subscribeCall.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = subscribeCall.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);

    library.send(subscribeCall);
  }

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  ASSERT_FALSE(offers->offers().empty());

  // --- Launch a task so that an executor is up and subscribed. ---

  EXPECT_CALL(*executor, connected(_))
    .WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));

  EXPECT_CALL(*executor, subscribed(_, _));

  Future<Nothing> launched;
  EXPECT_CALL(*executor, launch(_, _))
    .WillOnce(FutureSatisfy(&launched));

  const v1::Offer& offer = offers->offers(0);

  v1::TaskInfo taskInfo =
    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));

  {
    // A single `LAUNCH` operation against the first offer.
    Call acceptCall;
    acceptCall.mutable_framework_id()->CopyFrom(frameworkId);
    acceptCall.set_type(Call::ACCEPT);

    Call::Accept* accept = acceptCall.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* launchOperation = accept->add_operations();
    launchOperation->set_type(v1::Offer::Operation::LAUNCH);
    launchOperation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);

    library.send(acceptCall);
  }

  AWAIT_READY(launched);

  // --- Tear the framework down and watch the executor. ---

  // The scheduler may or may not observe a disconnection afterwards.
  EXPECT_CALL(*scheduler, disconnected(_))
    .Times(AtMost(1));

  Future<Nothing> executorShutdown;
  EXPECT_CALL(*executor, shutdown(_))
    .WillOnce(FutureSatisfy(&executorShutdown));

  {
    Call teardownCall;
    teardownCall.mutable_framework_id()->CopyFrom(frameworkId);
    teardownCall.set_type(Call::TEARDOWN);

    library.send(teardownCall);
  }

  // The executor must receive an `Event::Shutdown` once the scheduler has
  // torn the framework down.
  AWAIT_READY(executorShutdown);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {