// blob: 9c31eeae1a1b67af142a01e6c548b509ba06740c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/allocator/allocator.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include "common/protobuf_utils.hpp"
#include "master/master.hpp"
#include "master/detector/standalone.hpp"
#include "slave/slave.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
using namespace mesos::internal::protobuf;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::master::detector::StandaloneMasterDetector;
using process::Clock;
using process::Future;
using process::Message;
using process::Owned;
using process::PID;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::Return;
using testing::SaveArg;
namespace mesos {
namespace internal {
namespace tests {
// Test fixture shared by the master <-> agent reconciliation tests in
// this file. It adds nothing of its own; all cluster helpers
// (StartMaster, StartSlave, Metrics, ...) come from MesosTest.
class MasterSlaveReconciliationTest : public MesosTest
{
};
// This test verifies that a re-registering slave sends the terminal
// unacknowledged tasks for a terminal executor. This is required
// for the master to correctly reconcile its view with the slave's
// view of tasks. The test drops a terminal update to the master,
// terminates the executor, and then forces the slave to re-register.
TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminatedExecutor)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  Future<FrameworkID> frameworkId;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureArg<1>(&frameworkId));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Captured when the executor registers; used below to send the
  // terminal update on the executor's behalf.
  ExecutorDriver* execDriver = nullptr;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
    = FUTURE_PROTOBUF(
        StatusUpdateAcknowledgementMessage(),
        master.get()->pid,
        slave.get()->pid);

  driver.start();

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status->state());

  // Receiving a status update implies the framework registered; make
  // that explicit so 'frameworkId.get()' below cannot block.
  AWAIT_READY(frameworkId);

  // Make sure the acknowledgement reaches the slave.
  AWAIT_READY(statusUpdateAcknowledgementMessage);

  // Drop the TASK_FINISHED status update sent to the master.
  Future<StatusUpdateMessage> statusUpdateMessage =
    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);

  Future<ExitedExecutorMessage> executorExitedMessage =
    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);

  TaskStatus finishedStatus = status.get();
  finishedStatus.set_state(TASK_FINISHED);
  execDriver->sendStatusUpdate(finishedStatus);

  // Ensure the update was sent.
  AWAIT_READY(statusUpdateMessage);

  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

  // Now kill the executor.
  containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

  // Wait until the executor's exit has been reported, so that the
  // re-registration below is guaranteed to happen with a terminated
  // executor (the scenario under test) instead of racing with it.
  AWAIT_READY(executorExitedMessage);

  Future<TaskStatus> status2;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status2));

  // We drop the 'UpdateFrameworkMessage' from the master to slave to
  // stop the status update manager from retrying the update that was
  // already sent due to the new master detection.
  DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);

  // Simulate a new master to force the slave to re-register.
  detector.appoint(master.get()->pid);

  AWAIT_READY(status2);
  EXPECT_EQ(TASK_FINISHED, status2->state());

  driver.stop();
  driver.join();
}
// This test verifies that the master reconciles non-partition-aware
// tasks that are missing from a re-registering slave. In this case,
// we drop the RunTaskMessage, so the slave should send TASK_LOST.
TEST_F(MasterSlaveReconciliationTest, ReconcileLostTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  // We now launch a task and drop the corresponding RunTaskMessage on
  // the slave, to ensure that only the master knows about this task.
  Future<RunTaskMessage> runTaskMessage =
    DROP_PROTOBUF(RunTaskMessage(), _, _);

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(runTaskMessage);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Future<StatusUpdateMessage> statusUpdateMessage =
    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get()->pid);

  AWAIT_READY(slaveReregisteredMessage);

  // Make sure the slave generated the TASK_LOST.
  AWAIT_READY(statusUpdateMessage);

  AWAIT_READY(status);

  EXPECT_EQ(task.task_id(), status->task_id());
  EXPECT_EQ(TASK_LOST, status->state());
  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, status->reason());

  // Before we obtain the metrics, ensure that the master has finished
  // processing the status update so metrics have been updated.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  // Check metrics.
  JSON::Object stats = Metrics();
  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
  EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
  EXPECT_EQ(
      1u,
      stats.values["master/task_lost/source_slave/reason_reconciliation"]);

  driver.stop();
  driver.join();
}
// This test verifies that the master reconciles partition-aware tasks
// that are missing from a re-registering slave. In this case, we drop
// the RunTaskMessage, so the slave should send TASK_DROPPED.
TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  // A PARTITION_AWARE framework gets TASK_DROPPED instead of TASK_LOST.
  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.add_capabilities()->set_type(
      FrameworkInfo::Capability::PARTITION_AWARE);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  // We now launch a task and drop the corresponding RunTaskMessage on
  // the slave, to ensure that only the master knows about this task.
  Future<RunTaskMessage> runTaskMessage =
    DROP_PROTOBUF(RunTaskMessage(), _, _);

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(runTaskMessage);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  Future<StatusUpdateMessage> statusUpdateMessage =
    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get()->pid);

  AWAIT_READY(slaveReregisteredMessage);

  // Make sure the slave generated the TASK_DROPPED.
  AWAIT_READY(statusUpdateMessage);

  AWAIT_READY(status);

  EXPECT_EQ(task.task_id(), status->task_id());
  EXPECT_EQ(TASK_DROPPED, status->state());
  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, status->reason());

  // Before we obtain the metrics, ensure that the master has finished
  // processing the status update so metrics have been updated.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  // Check metrics.
  JSON::Object stats = Metrics();
  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
  EXPECT_EQ(
      1u,
      stats.values["master/task_dropped/source_slave/reason_reconciliation"]);

  driver.stop();
  driver.join();
}
// This test verifies that the master reconciles tasks that are
// missing from a re-registering slave. In this case, we trigger
// a race between the slave re-registration message and the launch
// message. There should be no TASK_LOST / TASK_DROPPED.
// This was motivated by MESOS-1696.
TEST_F(MasterSlaveReconciliationTest, ReconcileRace)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get()->pid);

  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  // Since the agent may have retried registration, we want to
  // ensure that any duplicate registrations are flushed before
  // we appoint the master again. Otherwise, the agent may
  // receive a stale registration message.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  // Trigger a re-registration of the slave and capture the message
  // so that we can spoof a race with a launch task message. The
  // catch-all DROP_PROTOBUFS swallows any retries; the more recent
  // DROP_PROTOBUF (gmock matches newest expectations first) captures
  // the first message.
  DROP_PROTOBUFS(ReregisterSlaveMessage(), slave.get()->pid, master.get()->pid);

  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
    DROP_PROTOBUF(
        ReregisterSlaveMessage(),
        slave.get()->pid,
        master.get()->pid);

  detector.appoint(master.get()->pid);

  AWAIT_READY(reregisterSlaveMessage);

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  // Captured when the executor registers; used below to finish the task.
  ExecutorDriver* executorDriver = nullptr;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(SaveArg<0>(&executorDriver));

  // Leave the task in TASK_STAGING.
  Future<Nothing> launchTask;
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(FutureSatisfy(&launchTask));

  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .Times(0);

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(launchTask);

  // Send the stale re-registration message, which does not contain
  // the task we just launched. This will trigger a reconciliation
  // by the master.
  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Prevent this from being dropped per the DROP_PROTOBUFS above.
  FUTURE_PROTOBUF(
      ReregisterSlaveMessage(),
      slave.get()->pid,
      master.get()->pid);

  process::post(
      slave.get()->pid,
      master.get()->pid,
      reregisterSlaveMessage.get());

  AWAIT_READY(slaveReregisteredMessage);

  // Neither the master nor the slave should send a TASK_LOST
  // as part of the reconciliation. We check this by calling
  // Clock::settle() to flush all pending events.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  // Now send TASK_FINISHED and make sure it's the only message
  // received by the scheduler.
  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  TaskStatus taskStatus;
  taskStatus.mutable_task_id()->CopyFrom(task.task_id());
  taskStatus.set_state(TASK_FINISHED);

  executorDriver->sendStatusUpdate(taskStatus);

  AWAIT_READY(status);
  ASSERT_EQ(TASK_FINISHED, status->state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that the slave reports pending tasks when
// re-registering; otherwise the master would report them as lost.
TEST_F(MasterSlaveReconciliationTest, SlaveReregisterPendingTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  // No TASK_LOST updates should occur!
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .Times(0);

  // We drop the _run dispatch to ensure the task remains
  // pending in the slave.
  Future<Nothing> _run = DROP_DISPATCH(slave.get()->pid, &Slave::_run);

  TaskInfo task1;
  task1.set_name("test task");
  task1.mutable_task_id()->set_value("1");
  task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task1.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  driver.launchTasks(offers.get()[0].id(), {task1});

  AWAIT_READY(_run);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get()->pid);

  AWAIT_READY(slaveReregisteredMessage);

  // Flush all pending events so that any (unexpected) status update
  // would be delivered before the test ends.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  driver.stop();
  driver.join();
}
// This test verifies that when the slave re-registers, the master
// does not send a TASK_LOST update for a task that has reached a
// terminal state but is waiting for an acknowledgement.
TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminalTask)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  EXPECT_CALL(exec, registered(_, _, _, _));

  // Send a terminal update right away.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  // Drop the status update from slave to the master, so that
  // the slave has a pending terminal update when it re-registers.
  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()->pid);

  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status))
    .WillRepeatedly(Return()); // Ignore retried update due to update framework.

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(_statusUpdate);

  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get()->pid);

  AWAIT_READY(slaveReregisteredMessage);

  // The master should not send a TASK_LOST after the slave
  // re-registers. We check this by calling Clock::settle() so that
  // the only update the scheduler receives is the retried
  // TASK_FINISHED update.
  // NOTE: The status update manager resends the status update when
  // it detects a new master.
  Clock::pause();
  Clock::settle();
  // Resume right away, as everywhere else in this file, so the clock
  // is not left paused for the rest of the test (including shutdown).
  Clock::resume();

  AWAIT_READY(status);
  ASSERT_EQ(TASK_FINISHED, status->state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that when the slave re-registers, we correctly
// send the information about actively running frameworks.
TEST_F(MasterSlaveReconciliationTest, SlaveReregisterFrameworks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get()->pid);
  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // The first offer is indexed below, so an empty offer list must
  // abort the test (ASSERT) rather than just record a failure.
  ASSERT_FALSE(offers->empty());

  TaskInfo task;
  task.set_name("test task");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  EXPECT_CALL(exec, registered(_, _, _, _));

  // Send an update right away.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status))
    .WillRepeatedly(Return()); // Ignore retried update due to update framework.

  driver.launchTasks(offers.get()[0].id(), {task});

  // Wait until TASK_RUNNING of the task is received.
  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status->state());

  Future<ReregisterSlaveMessage> reregisterSlave =
    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);

  // Simulate a spurious master change event (e.g., due to ZooKeeper
  // expiration) at the slave to force re-registration.
  detector.appoint(master.get()->pid);

  // Expect to receive the 'ReregisterSlaveMessage' containing the
  // active frameworks.
  AWAIT_READY(reregisterSlave);
  EXPECT_EQ(1u, reregisterSlave->frameworks().size());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {