// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstdlib>
#include <ctime>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include "internal/devolve.hpp"
#include "master/detector/standalone.hpp"
#include "slave/constants.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/mesos/isolators/network/ports.hpp"
#include "tests/mesos.hpp"
using mesos::internal::slave::NetworkPortsIsolatorProcess;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using process::Clock;
using process::Future;
using process::Owned;
using mesos::v1::scheduler::Event;
using std::string;
using std::vector;
using testing::DoAll;
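// The socket diagnostics types used below (e.g. `socket::Info`) come
// from the routing library's diagnosis support.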
using namespace routing::diagnosis;
namespace mesos {
namespace internal {
namespace tests {
class NetworkPortsIsolatorTest : public MesosTest
{
public:
virtual void SetUp()
{
MesosTest::SetUp();
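// Seed the random number generator used by the port selection
// helpers below.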
std::srand(std::time(0));
}
// Wait until a status update is received and subsequently acknowledged.
// If we don't wait for the acknowledgement, then advancing the clock can
// cause the agent to time out on receiving the acknowledgement, at which
// point it will re-send and the test will intercept an unexpected duplicate
// status update.
template <typename Update>
void awaitStatusUpdateAcked(Future<Update>& status)
{
Future<Nothing> ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
AWAIT_READY(status);
AWAIT_READY(ack);
}
// Expect that the TaskStatus is reporting a successful health check.
void expectHealthyStatus(const TaskStatus& status)
{
EXPECT_EQ(TASK_RUNNING, status.state());
EXPECT_EQ(
TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
status.reason());
ASSERT_TRUE(status.has_healthy());
EXPECT_TRUE(status.healthy());
}
// Expect that the TaskStatus is a container limitation that tells us
// about a single ports resource.
void expectPortsLimitation(
const TaskStatus& status,
const Option<uint64_t>& port = None())
{
EXPECT_EQ(TaskStatus::REASON_CONTAINER_LIMITATION, status.reason());
ASSERT_TRUE(status.has_limitation()) << JSON::protobuf(status);
Resources limit = Resources(status.limitation().resources());
EXPECT_EQ(1u, limit.size());
ASSERT_SOME(limit.ports());
if (port.isSome()) {
ASSERT_EQ(1, limit.ports()->range().size());
EXPECT_EQ(port.get(), limit.ports()->range(0).begin());
EXPECT_EQ(port.get(), limit.ports()->range(0).end());
}
}
};
// Select a random port from the first ports range of the given resources.
template <typename R>
static uint16_t selectRandomPort(const R& resources)
{
auto ports = resources.ports()->range(0);
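// Port ranges are inclusive at both ends, so there are
// (end - begin + 1) candidate ports to choose from.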
return ports.begin() + std::rand() % (ports.end() - ports.begin() + 1);
}
// Select a random port that is not the same as the one given.
template <typename R>
static uint16_t selectOtherPort(const R& resources, uint16_t port)
{
uint16_t selected;
do {
selected = selectRandomPort(resources);
} while (selected == port);
return selected;
}
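// Add a TCP health check against the given port to the task. This is
// templated so that it works for both TaskInfo and v1::TaskInfo.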
template <typename T>
static void addTcpHealthCheck(T& taskInfo, uint16_t port)
{
auto* checkInfo = taskInfo.mutable_health_check();
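// Resolve the TCP enumerator from whichever HealthCheck type matches
// the TaskInfo flavor (unversioned or v1) we were given.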
checkInfo->set_type(std::remove_pointer<decltype(checkInfo)>::type::TCP);
checkInfo->set_delay_seconds(0);
checkInfo->set_grace_period_seconds(10);
checkInfo->set_interval_seconds(1);
checkInfo->mutable_tcp()->set_port(port);
}
// This test verifies that we can correctly detect sockets that
// a process is listening on. We take advantage of the fact that
// libprocess always implicitly listens on a socket, so we can
// query our current PID for listening sockets and verify that
// result against the libprocess address.
TEST(NetworkPortsIsolatorUtilityTest, QueryProcessSockets)
{
Try<hashmap<uint32_t, socket::Info>> listeners =
NetworkPortsIsolatorProcess::getListeningSockets();
ASSERT_SOME(listeners);
EXPECT_GT(listeners->size(), 0u);
foreachvalue (const socket::Info& info, listeners.get()) {
EXPECT_SOME(info.sourceIP);
EXPECT_SOME(info.sourcePort);
}
Try<std::vector<uint32_t>> socketInodes =
NetworkPortsIsolatorProcess::getProcessSockets(getpid());
ASSERT_SOME(socketInodes);
EXPECT_GT(socketInodes->size(), 0u);
vector<socket::Info> socketInfos;
// Collect the Info for our own listening sockets.
foreach (uint32_t inode, socketInodes.get()) {
if (listeners->contains(inode)) {
socketInfos.push_back(listeners->at(inode));
}
}
// libprocess always listens on a socket, so the fact that we
// are running implies that we should at least find out about
// the libprocess socket.
EXPECT_GT(socketInfos.size(), 0u);
bool matched = false;
process::network::inet::Address processAddress = process::address();
foreach (const auto& info, socketInfos) {
// We can only match on the port, since libprocess will typically
// indicate that it is listening on the ANY address (i.e. 0.0.0.0)
// but the socket diagnostics will publish the actual address of a
// network interface.
if (ntohs(info.sourcePort.get()) == processAddress.port) {
matched = true;
}
}
// Verify that we matched the libprocess address in the set of
// listening sockets for this process.
EXPECT_TRUE(matched) << "Unmatched libprocess address "
<< processAddress;
}
// This test verifies that the `network/ports` isolator returns
// an error unless the `linux` launcher is being used.
TEST_F(NetworkPortsIsolatorTest, ROOT_IsolatorFlags)
{
StandaloneMasterDetector detector;
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
Try<Owned<cluster::Slave>> slave = Owned<cluster::Slave>();
flags.launcher = "posix";
slave = StartSlave(&detector, flags);
ASSERT_ERROR(slave);
flags.launcher = "linux";
slave = StartSlave(&detector, flags);
ASSERT_SOME(slave);
}
// libprocess always listens on a port once it is initialized, and
// there is no way to ensure that task resources are allocated for
// that port. This test verifies that a task that uses the command
// executor will always be killed due to the libprocess port, even
// when it doesn't open any ports itself.
TEST_F(NetworkPortsIsolatorTest, ROOT_CommandExecutorPorts)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
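// Check all listening sockets, not just those in the agent's `ports`
// resource range, so that the command executor's libprocess port is
// subject to enforcement.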
flags.check_agent_port_range_only = false;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task = createTask(
offer.slave_id(),
Resources::parse("cpus:1;mem:32").get(),
"sleep 10000");
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
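// Trigger a ports check by advancing the clock past the watch
// interval, then settle so that any resulting limitation is processed.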
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<TaskStatus> failedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&failedStatus));
// Even though the task itself never listened on any ports, we expect
// that it gets killed because the isolator detects the libprocess
// port the command executor is listening on.
AWAIT_READY(failedStatus);
EXPECT_EQ(task.task_id(), failedStatus->task_id());
EXPECT_EQ(TASK_FAILED, failedStatus->state());
EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
expectPortsLimitation(failedStatus.get());
driver.stop();
driver.join();
}
// This test verifies that a task that correctly listens on
// ports for which it holds resources is allowed to run and is
// not killed by the `network/ports` isolator.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_AllocatedPorts)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
// Watch only the agent ports resources range because we want this
// test to trigger on the nc command, not on the command executor.
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
Resources resources(offers.get()[0].resources());
// Make sure we have a `ports` resource.
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(taskPort));
addTcpHealthCheck(task, taskPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<TaskStatus> killedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&killedStatus));
driver.killTask(task.task_id());
AWAIT_READY(killedStatus);
EXPECT_EQ(task.task_id(), killedStatus->task_id());
EXPECT_EQ(TASK_KILLED, killedStatus->state());
driver.stop();
driver.join();
}
// This test verifies that if the agent has an empty ports
// resource, and the check_agent_port_range_only flag is enabled,
// a task using an arbitrary port is allowed to start up and
// become healthy. This is correct because it effectively reduces
// the set of ports we are protecting to zero.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_NoPortsResource)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
// Omit the default ports from the agent resources.
flags.resources = R"(
[
{
"name": "cpus",
"type": "SCALAR",
"scalar": {
"value": 2
}
},
{
"name": "gpus",
"type": "SCALAR",
"scalar": {
"value": 0
}
},
{
"name": "mem",
"type": "SCALAR",
"scalar": {
"value": 1024
}
},
{
"name": "disk",
"type": "SCALAR",
"scalar": {
"value": 1024
}
},
{
"name": "ports",
"type": "RANGES",
"ranges": {
}
}
]
)";
// Watch only the agent ports resources range because we want this
// test to trigger on the nc command, not on the command executor.
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
Resources resources(offers.get()[0].resources());
// Make sure we do not have a `ports` resource in the offer.
ASSERT_NONE(resources.ports());
// Select a random task port from the default range.
resources = Resources::parse(
"ports",
stringify(slave::DEFAULT_PORTS),
flags.default_role).get();
uint16_t taskPort = selectRandomPort(resources);
resources = Resources::parse("cpus:1;mem:32").get();
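// Note that the task requests no ports at all. Since the agent's ports
// range is empty, the isolator has nothing to protect, so listening on
// taskPort should be tolerated.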
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(taskPort));
addTcpHealthCheck(task, taskPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<TaskStatus> killedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&killedStatus));
driver.killTask(task.task_id());
AWAIT_READY(killedStatus);
EXPECT_EQ(task.task_id(), killedStatus->task_id());
EXPECT_EQ(TASK_KILLED, killedStatus->state());
driver.stop();
driver.join();
}
// This test verifies that the isolator correctly defaults the agent ports
// resource when the operator doesn't specify any agent resources. We verify
// that a task that uses an unallocated port in the agent's offer is detected
// and killed by a container limitation.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_DefaultPortsResource)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
// Clear resources set by CreateSlaveFlags() to force the agent and
// isolator to apply built-in defaults.
flags.resources = None();
// Watch only the agent ports resources range because we want this
// test to trigger on the nc command, not on the command executor.
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
// Make sure we have a `ports` resource.
Resources resources(offer.resources());
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
uint16_t usedPort = selectOtherPort(resources, taskPort);
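// Allocate taskPort to the task but have it listen on usedPort, which
// it does not hold, so the isolator should raise a limitation.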
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(task, usedPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<TaskStatus> failedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&failedStatus));
// We expect that the task will get killed by the isolator.
AWAIT_READY(failedStatus);
EXPECT_EQ(task.task_id(), failedStatus->task_id());
EXPECT_EQ(TASK_FAILED, failedStatus->state());
EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
expectPortsLimitation(failedStatus.get(), usedPort);
driver.stop();
driver.join();
}
// This test verifies that a task that listens on a port for which it has
// no resources is detected and killed by a container limitation.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_UnallocatedPorts)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
// Watch only the agent ports resources range because we want this
// test to trigger on the nc command, not on the command executor.
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
// Make sure we have a `ports` resource.
Resources resources(offer.resources());
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
uint16_t usedPort = selectOtherPort(resources, taskPort);
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Launch a task that uses a port that it hasn't been allocated. Use
// "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(task, usedPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<TaskStatus> failedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&failedStatus));
// We expect that the task will get killed by the isolator.
AWAIT_READY(failedStatus);
EXPECT_EQ(task.task_id(), failedStatus->task_id());
EXPECT_EQ(TASK_FAILED, failedStatus->state());
EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
expectPortsLimitation(failedStatus.get(), usedPort);
driver.stop();
driver.join();
}
// This test verifies that a task that listens on a port for which
// it has no resources is detected and will not be killed by
// a container limitation if enforce_container_ports is false.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_NoPortEnforcement)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
// Watch only the agent ports resources range because we want this
// test to trigger on the nc command, not on the command executor.
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = false;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
DEFAULT_FRAMEWORK_INFO,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
// Make sure we have a `ports` resource.
Resources resources(offer.resources());
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
uint16_t usedPort = selectOtherPort(resources, taskPort);
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Launch a task that uses a port that it hasn't been allocated. Use
// "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(task, usedPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
// Since container ports are not being enforced, we expect that the task
// should still be running after the check and that we should be able to
// explicitly kill it.
Future<TaskStatus> killedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&killedStatus));
driver.killTask(task.task_id());
AWAIT_READY(killedStatus);
EXPECT_EQ(task.task_id(), killedStatus->task_id());
EXPECT_EQ(TASK_KILLED, killedStatus->state());
driver.stop();
driver.join();
}
// Test that after we recover a task, the isolator notices that it
// is using the wrong ports and kills it.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverBadTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// Start the agent without any `network/ports` isolation.
slave::Flags flags = CreateSlaveFlags();
flags.launcher = "linux";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
frameworkInfo,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
Resources resources(offers.get()[0].resources());
// Make sure we have a `ports` resource.
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
uint16_t usedPort = selectOtherPort(resources, taskPort);
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Launch a task that uses a port that it hasn't been allocated. Use
// "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(task, usedPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
// Restart the agent.
slave.get()->terminate();
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Add `network/ports` isolation to the restarted agent. This tests that when
// the isolator goes through recovery we will notice the nc command listening
// and terminate it.
flags.isolation = "network/ports";
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
// Wait for the slave to reregister.
AWAIT_READY(slaveReregisteredMessage);
// Now force a ports check, which should terminate the nc command.
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Future<TaskStatus> failedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&failedStatus));
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
// We expect that the task will get killed by the isolator.
AWAIT_READY(failedStatus);
EXPECT_EQ(task.task_id(), failedStatus->task_id());
EXPECT_EQ(TASK_FAILED, failedStatus->state());
EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
expectPortsLimitation(failedStatus.get(), usedPort);
driver.stop();
driver.join();
}
// Test that the isolator doesn't kill well-behaved tasks on recovery.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverGoodTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.launcher = "linux";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
frameworkInfo,
master.get()->pid,
DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
const Offer& offer = offers.get()[0];
Resources resources(offers.get()[0].resources());
// Make sure we have a `ports` resource.
ASSERT_SOME(resources.ports());
ASSERT_LE(1, resources.ports()->range().size());
uint16_t taskPort = selectRandomPort(resources);
resources = Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
TaskInfo task = createTask(
offer.slave_id(),
resources,
"nc -k -l " + stringify(taskPort));
addTcpHealthCheck(task, taskPort);
Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
Future<TaskStatus> healthStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus))
.WillOnce(FutureArg<1>(&healthStatus));
driver.launchTasks(offer.id(), {task});
awaitStatusUpdateAcked(startingStatus);
EXPECT_EQ(task.task_id(), startingStatus->task_id());
EXPECT_EQ(TASK_STARTING, startingStatus->state());
awaitStatusUpdateAcked(runningStatus);
EXPECT_EQ(task.task_id(), runningStatus->task_id());
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
awaitStatusUpdateAcked(healthStatus);
ASSERT_EQ(task.task_id(), healthStatus->task_id());
expectHealthyStatus(healthStatus.get());
// Restart the agent.
slave.get()->terminate();
// Add `network/ports` isolation to the restarted agent. This tests that
// when the isolator goes through recovery we will notice the nc command
// listening and will let it continue running.
flags.isolation = "network/ports";
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
// Wait for the slave to reregister.
AWAIT_READY(slaveReregisteredMessage);
// We should not get any status updates because the task should
// stay running. We wait for a check to run and settle any
// messages that result from that to ensure we don't miss any
// triggered limitations.
EXPECT_CALL(sched, statusUpdate(&driver, _)).Times(0);
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::pause();
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
// Since the task is still running, we should be able to kill it
// and receive the expected status update state.
Future<TaskStatus> killedStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&killedStatus))
.RetiresOnSaturation();
driver.killTask(task.task_id());
AWAIT_READY(killedStatus);
EXPECT_EQ(task.task_id(), killedStatus->task_id());
EXPECT_EQ(TASK_KILLED, killedStatus->state());
driver.stop();
driver.join();
}
// Verify that a nested container that listens on ports it does
// not hold resources for is detected and killed.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_TaskGroup)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.isolation = "network/ports";
flags.launcher = "linux";
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
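// Use the default executor so that the task runs in a nested container.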
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
"cpus:0.1;mem:32;disk:32",
v1::ExecutorInfo::DEFAULT,
frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0, offers->offers().size());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
uint16_t taskPort = selectRandomPort(v1::Resources(offer.resources()));
uint16_t usedPort =
selectOtherPort(v1::Resources(offer.resources()), taskPort);
v1::Resources resources = v1::Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
v1::TaskInfo taskInfo = v1::createTask(
agentId,
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(taskInfo, usedPort);
Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateHealth;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateStarting),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateHealth),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())));
v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
awaitStatusUpdateAcked(updateStarting);
ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
awaitStatusUpdateAcked(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
// This update is sent when the first healthcheck succeeds.
awaitStatusUpdateAcked(updateHealth);
ASSERT_EQ(taskInfo.task_id(), updateHealth->status().task_id());
expectHealthyStatus(devolve(updateHealth->status()));
Future<Event::Update> updateFinished;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateFinished),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())));
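// We should also see a failure event once the executor terminates.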
Future<Nothing> failure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureSatisfy(&failure));
Clock::pause();
Clock::settle();
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
// Wait for the final status update, which should tell us the
// task has failed due to the ports limitation.
AWAIT_READY(updateFinished);
AWAIT_READY(failure);
ASSERT_EQ(v1::TASK_FAILED, updateFinished->status().state())
<< JSON::protobuf(updateFinished->status());
EXPECT_EQ(taskInfo.task_id(), updateFinished->status().task_id())
<< JSON::protobuf(updateFinished->status());
// Depending on event ordering, the status source can be SOURCE_AGENT or
// SOURCE_EXECUTOR. It doesn't matter who sends it, since we expect the
// contents of the update to be identical.
EXPECT_NE(v1::TaskStatus::SOURCE_MASTER, updateFinished->status().source())
<< JSON::protobuf(updateFinished->status());
expectPortsLimitation(devolve(updateFinished->status()), usedPort);
}
// Test that after we recover a nested container task, the isolator
// notices that it is using the wrong ports and kills it.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverNestedBadTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
const string slaveId = "RecoverNestedBadTask";
slave::Flags flags = CreateSlaveFlags();
flags.launcher = "linux";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), slaveId, flags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
"cpus:0.1;mem:32;disk:32",
v1::ExecutorInfo::DEFAULT,
frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0, offers->offers().size());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
uint16_t taskPort = selectRandomPort(v1::Resources(offer.resources()));
uint16_t usedPort =
selectOtherPort(v1::Resources(offer.resources()), taskPort);
v1::Resources resources = v1::Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
v1::TaskInfo taskInfo = v1::createTask(
agentId,
resources,
"nc -k -l " + stringify(usedPort));
addTcpHealthCheck(taskInfo, usedPort);
Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateHealth;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateStarting),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateHealth),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())));
v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
awaitStatusUpdateAcked(updateStarting);
ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
awaitStatusUpdateAcked(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
// This update is sent when the first healthcheck succeeds.
awaitStatusUpdateAcked(updateHealth);
ASSERT_EQ(taskInfo.task_id(), updateHealth->status().task_id());
expectHealthyStatus(devolve(updateHealth->status()));
// Restart the agent.
slave.get()->terminate();
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Add `network/ports` isolation to the restarted agent. This tests that when
// the isolator goes through recovery we will notice the nc command listening
// and terminate it.
flags.isolation = "network/ports";
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
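// Restart the agent with the same ID so that it recovers the
// checkpointed executor and its nested container.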
slave = cluster::Slave::create(detector.get(), flags, slaveId);
ASSERT_SOME(slave);
slave.get()->start();
// Wait for the slave to reregister.
AWAIT_READY(slaveReregisteredMessage);
Future<Event::Update> updateFinished;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateFinished),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())));
Future<Nothing> failure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureSatisfy(&failure));
// Now force a ports check, which should terminate the nc command.
Clock::pause();
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
// Wait for the final status update, which should tell us the
// task has failed due to the ports limitation.
AWAIT_READY(updateFinished);
AWAIT_READY(failure);
ASSERT_EQ(v1::TASK_FAILED, updateFinished->status().state())
<< JSON::protobuf(updateFinished->status());
EXPECT_EQ(taskInfo.task_id(), updateFinished->status().task_id())
<< JSON::protobuf(updateFinished->status());
// Depending on event ordering, the status source can be SOURCE_AGENT or
// SOURCE_EXECUTOR. It doesn't matter who sends it, since we expect the
// contents of the update to be identical.
EXPECT_NE(v1::TaskStatus::SOURCE_MASTER, updateFinished->status().source())
<< JSON::protobuf(updateFinished->status());
EXPECT_EQ(
v1::TaskStatus::REASON_CONTAINER_LIMITATION,
updateFinished->status().reason())
<< JSON::protobuf(updateFinished->status());
expectPortsLimitation(devolve(updateFinished->status()), usedPort);
}
// This test verifies that the `network/ports` isolator does not kill a
// well-behaved nested container when it recovers it after an agent restart.
TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverNestedGoodTask)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
const string slaveId = "RecoverNestedGoodTask";
slave::Flags flags = CreateSlaveFlags();
flags.launcher = "linux";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), slaveId, flags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
"cpus:0.1;mem:32;disk:32",
v1::ExecutorInfo::DEFAULT,
frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0, offers->offers().size());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
uint16_t taskPort = selectRandomPort(v1::Resources(offer.resources()));
v1::Resources resources = v1::Resources::parse(
"cpus:1;mem:32;"
"ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
// Use "nc -k" so nc keeps running after accepting the healthcheck connection.
v1::TaskInfo taskInfo = v1::createTask(
agentId,
resources,
"nc -k -l " + stringify(taskPort));
addTcpHealthCheck(taskInfo, taskPort);
Future<Event::Update> updateStarting;
Future<Event::Update> updateRunning;
Future<Event::Update> updateHealth;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateStarting),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateRunning),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.WillOnce(DoAll(FutureArg<1>(&updateHealth),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())));
v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
executorInfo,
v1::createTaskGroupInfo({taskInfo}));
mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
awaitStatusUpdateAcked(updateStarting);
ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state());
EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
awaitStatusUpdateAcked(updateRunning);
ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state());
EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
// This update is sent when the first healthcheck succeeds.
awaitStatusUpdateAcked(updateHealth);
ASSERT_EQ(taskInfo.task_id(), updateHealth->status().task_id());
expectHealthyStatus(devolve(updateHealth->status()));
// Restart the agent.
slave.get()->terminate();
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
// Add `network/ports` isolation to the restarted agent. This tests that
// when the isolator goes through recovery we will notice the nc command
// listening and will let it continue running.
flags.isolation = "network/ports";
flags.check_agent_port_range_only = true;
flags.enforce_container_ports = true;
slave = cluster::Slave::create(detector.get(), flags, slaveId);
ASSERT_SOME(slave);
slave.get()->start();
// Wait for the slave to reregister.
AWAIT_READY(slaveReregisteredMessage);
// We expect that the task will continue to run, so the health check
// status won't change and we will not get any status updates from it.
EXPECT_CALL(*scheduler, update(_, _)).Times(0);
// Now force a ports check so that the isolator has an opportunity to
// kill the task, and verify that it correctly leaves it alone.
Clock::pause();
Future<Nothing> check =
FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
Clock::advance(flags.container_ports_watch_interval);
AWAIT_READY(check);
Clock::settle();
Clock::resume();
Future<Event::Update> updateKilled;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(FutureArg<1>(&updateKilled),
v1::scheduler::SendAcknowledge(
frameworkId,
offer.agent_id())))
.RetiresOnSaturation();
mesos.send(v1::createCallKill(frameworkId, taskInfo.task_id()));
// Since the task is still running, we should be able to kill it
// and receive the expected status update state.
awaitStatusUpdateAcked(updateKilled);
ASSERT_EQ(v1::TASK_KILLED, updateKilled->status().state());
EXPECT_EQ(taskInfo.task_id(), updateKilled->status().task_id());
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {