// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <mesos/slave/container_logger.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
#include <process/subprocess.hpp>
#include <stout/duration.hpp>
#include <stout/uuid.hpp>
#ifdef __linux__
#include "linux/cgroups.hpp"
#include "linux/fs.hpp"
#endif // __linux__
#include "messages/messages.hpp"
#include "slave/containerizer/docker.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
#include "tests/mock_docker.hpp"
using namespace mesos::internal::slave::paths;
using namespace mesos::internal::slave::state;
using namespace process;
using mesos::internal::master::Master;
using mesos::internal::slave::DockerContainerizer;
using mesos::internal::slave::DockerContainerizerProcess;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerLogger;
using mesos::slave::ContainerTermination;
using std::list;
using std::string;
using std::vector;
using testing::_;
using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
using testing::Invoke;
using testing::Return;
namespace process {
// We need to reinitialize libprocess in order to test against
// different configurations, such as when libprocess is initialized
// with SSL or IPv6 enabled.
void reinitialize(
const Option<string>& delegate,
const Option<string>& readonlyAuthenticationRealm,
const Option<string>& readwriteAuthenticationRealm);
} // namespace process {
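// A sketch of how a test might re-enter libprocess with SSL enabled
// before calling the forward-declared `reinitialize` (the realm names
// here are illustrative assumptions, not part of this file):
//
//   os::setenv("LIBPROCESS_SSL_ENABLED", "true");
//   process::reinitialize(
//       None(),
//       Option<string>("test-readonly-realm"),
//       Option<string>("test-readwrite-realm"));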
namespace mesos {
namespace internal {
namespace tests {
class DockerContainerizerTest : public MesosTest
{
public:
static string containerName(const ContainerID& containerId)
{
return slave::DOCKER_NAME_PREFIX + containerId.value();
}
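// A container may merely exist (e.g., created or already exited) or
// be running with a known pid; `exists` below polls for either state.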
enum ContainerState
{
EXISTS,
RUNNING
};
static bool exists(
const process::Shared<Docker>& docker,
const ContainerID& containerId,
ContainerState state = ContainerState::EXISTS)
{
Duration waited = Duration::zero();
string expectedName = containerName(containerId);
do {
Future<Docker::Container> inspect = docker->inspect(expectedName);
if (!inspect.await(Seconds(3))) {
return false;
}
if (inspect.isReady()) {
switch (state) {
case ContainerState::RUNNING:
if (inspect->pid.isSome()) {
return true;
}
// Retry looking for running pid until timeout.
break;
case ContainerState::EXISTS:
return true;
}
}
os::sleep(Milliseconds(200));
waited += Milliseconds(200);
} while (waited < Seconds(5));
return false;
}
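// Usage sketch (mirrors the calls made by the tests below): wait for
// the container to be running, and later assert that it is gone:
//
//   ASSERT_TRUE(exists(docker, containerId, ContainerState::RUNNING));
//   // ... kill or destroy the container ...
//   ASSERT_FALSE(exists(docker, containerId, ContainerState::RUNNING));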
static bool containsLine(
const vector<string>& lines,
const string& expectedLine)
{
foreach (const string& line, lines) {
if (line == expectedLine) {
return true;
}
}
return false;
}
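// Usage sketch (hypothetical path and uuid, as in the log tests below):
//
//   Try<string> read = os::read(path::join(directory, "stdout"));
//   vector<string> lines = strings::split(read.get(), "\n");
//   EXPECT_TRUE(containsLine(lines, "out" + uuid));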
virtual void TearDown()
{
Try<Owned<Docker>> docker = Docker::create(
tests::flags.docker,
tests::flags.docker_socket,
false);
ASSERT_SOME(docker);
Future<list<Docker::Container>> containers =
docker.get()->ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
// Clean up all Mesos-launched containers.
foreach (const Docker::Container& container, containers.get()) {
AWAIT_READY_FOR(docker.get()->rm(container.id, true), Seconds(30));
}
MesosTest::TearDown();
}
};
// Only enable executor launch tests on Linux, as other platforms
// require running a Linux VM and need special port forwarding
// to get host networking to work.
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
}
// This test verifies that a custom executor can be launched and
// registered with the slave with the docker bridge network enabled.
// We assume that the custom executor registers its public IP
// (instead of 0.0.0.0 or equivalent) with the slave, as that is the
// default behavior for libprocess.
//
// Currently this test fails on Ubuntu and CentOS since the slave
// binds to and advertises a 127.0.x.x address, which is unreachable
// by the executor on the bridge network.
// TODO(tnachen): Re-enable this test when we are able to fix MESOS-3123.
TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Launch_Executor_Bridged)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
Try<JSON::Array> array = JSON::parse<JSON::Array>(statusRunning->data());
ASSERT_SOME(array);
// Check if container information is exposed through the master's
// state endpoint.
Future<http::Response> response = http::get(
master.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
"frameworks[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Check if container information is exposed through the slave's
// state endpoint.
response = http::get(
slave.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
find = parse->find<JSON::Value>(
"frameworks[0].executors[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Now verify the ContainerStatus fields in the TaskStatus.
ASSERT_TRUE(statusRunning->has_container_status());
EXPECT_TRUE(statusRunning->container_status().has_container_id());
ASSERT_EQ(1, statusRunning->container_status().network_infos().size());
EXPECT_EQ(1, statusRunning->container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length)
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(
exists(docker, containerId.get(), ContainerState::RUNNING));
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusKilled));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.killTask(task.task_id());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled->state());
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
// Even though the task is killed, the executor should exit gracefully.
ASSERT_TRUE(termination.get()->has_status());
EXPECT_EQ(0, termination.get()->status());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
driver.stop();
driver.join();
}
// Ensures that the framework will receive a TASK_KILLING update
// before TASK_KILLED, if the capability is supported.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_TaskKillingCapability)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
// Start the framework with the task killing capability.
FrameworkInfo::Capability capability;
capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.add_capabilities()->CopyFrom(capability);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(
exists(docker, containerId.get(), ContainerState::RUNNING));
Future<TaskStatus> statusKilling, statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusKilling))
.WillOnce(FutureArg<1>(&statusKilled));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.killTask(task.task_id());
AWAIT_READY(statusKilling);
EXPECT_EQ(TASK_KILLING, statusKilling->state());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled->state());
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
driver.stop();
driver.join();
}
// This test exercises DockerContainerizer::usage().
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.resources = Option<string>("cpus:2;mem:1024");
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
// Run a CPU-intensive command so we can measure utime and stime later.
command.set_value("dd if=/dev/zero of=/dev/null");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
// We ignore all update calls to prevent resizing cgroup limits.
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Nothing()));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Verify the usage.
ResourceStatistics statistics;
Duration waited = Duration::zero();
do {
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
// TODO(tnachen): Replace await with AWAIT_COMPLETED once
// implemented.
ASSERT_TRUE(usage.await(Seconds(3)));
if (usage.isReady()) {
statistics = usage.get();
if (statistics.cpus_user_time_secs() > 0 &&
statistics.cpus_system_time_secs() > 0) {
break;
}
}
os::sleep(Milliseconds(200));
waited += Milliseconds(200);
} while (waited < Seconds(3));
// Usage includes the executor resources.
EXPECT_EQ(2.0 + slave::DEFAULT_EXECUTOR_CPUS, statistics.cpus_limit());
EXPECT_EQ((Gigabytes(1) + slave::DEFAULT_EXECUTOR_MEM).bytes(),
statistics.mem_limit_bytes());
EXPECT_LT(0, statistics.cpus_user_time_secs());
EXPECT_LT(0, statistics.cpus_system_time_secs());
EXPECT_GT(statistics.mem_rss_bytes(), 0u);
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
dockerContainerizer.destroy(containerId.get());
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
// Usage() should now fail since the container has been destroyed.
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
AWAIT_FAILED(usage);
driver.stop();
driver.join();
}
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(
exists(docker, containerId.get(), ContainerState::RUNNING));
string name = containerName(containerId.get());
Future<Docker::Container> inspect = docker->inspect(name);
AWAIT_READY(inspect);
Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
ASSERT_SOME(newResources);
Future<Nothing> update =
dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
Result<string> memoryHierarchy = cgroups::hierarchy("memory");
ASSERT_SOME(cpuHierarchy);
ASSERT_SOME(memoryHierarchy);
Option<pid_t> pid = inspect->pid;
ASSERT_SOME(pid);
Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
ASSERT_SOME(cpuCgroup);
Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
ASSERT_SOME(memoryCgroup);
Try<uint64_t> cpu = cgroups::cpu::shares(
cpuHierarchy.get(),
cpuCgroup.get());
ASSERT_SOME(cpu);
Try<Bytes> mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(128u, mem->megabytes());
newResources = Resources::parse("cpus:1;mem:144");
// Issue a second update that uses the cached pid instead of inspect.
update = dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
ASSERT_SOME(cpu);
mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(144u, mem->megabytes());
driver.stop();
driver.join();
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
SlaveID slaveId;
slaveId.set_value("s1");
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ContainerID reapedContainerId;
reapedContainerId.set_value(UUID::random().toString());
string container1 = containerName(containerId);
string container2 = containerName(reapedContainerId);
// Clean up artifacts if the containers still exist.
ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
Try<Docker::RunOptions> runOptions = Docker::RunOptions::create(
containerInfo,
commandInfo,
container1,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_SOME(runOptions);
docker->run(runOptions.get());
Try<Docker::RunOptions> orphanOptions = Docker::RunOptions::create(
containerInfo,
commandInfo,
container2,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_SOME(orphanOptions);
Future<Option<int>> orphanRun = docker->run(orphanOptions.get());
ASSERT_TRUE(
exists(docker, containerId, ContainerState::RUNNING));
ASSERT_TRUE(
exists(docker, reapedContainerId, ContainerState::RUNNING));
Future<Docker::Container> inspect = docker->inspect(container2);
AWAIT_READY(inspect);
SlaveState slaveState;
slaveState.id = slaveId;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
execState.latest = containerId;
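// Use a long-lived `docker wait` subprocess below to stand in for the
// checkpointed executor pid, so that recovery has a live pid to reap.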
Try<process::Subprocess> wait =
process::subprocess(tests::flags.docker + " wait " + container1);
ASSERT_SOME(wait);
FrameworkID frameworkId;
RunState runState;
runState.id = containerId;
runState.forkedPid = wait->pid();
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId);
ASSERT_FALSE(termination.isFailed());
// The reaped container should be cleaned up and unknown at this point.
Future<Option<ContainerTermination>> termination2 =
dockerContainerizer.wait(reapedContainerId);
AWAIT_READY(termination2);
EXPECT_NONE(termination2.get());
// Expect the orphan to be stopped!
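// (A `docker run` killed via SIGKILL reports an exit status of
// 128 + SIGKILL, i.e., 137, which is what is asserted here.)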
AWAIT_EXPECT_WEXITSTATUS_EQ(128 + SIGKILL, orphanRun);
}
// This test ensures that orphaned docker containers unknown to the current
// agent instance are properly cleaned up.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_KillOrphanContainers)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
SlaveID slaveId;
slaveId.set_value("s1");
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ContainerID orphanContainerId;
orphanContainerId.set_value(UUID::random().toString());
string container1 = containerName(containerId);
// Start the orphan container with the old slave id.
string container2 = containerName(orphanContainerId);
// Clean up artifacts if the containers still exist.
ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
Try<Docker::RunOptions> runOptions = Docker::RunOptions::create(
containerInfo,
commandInfo,
container1,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_SOME(runOptions);
docker->run(runOptions.get());
Try<Docker::RunOptions> orphanOptions = Docker::RunOptions::create(
containerInfo,
commandInfo,
container2,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_SOME(orphanOptions);
Future<Option<int>> orphanRun = docker->run(orphanOptions.get());
ASSERT_TRUE(
exists(docker, containerId, ContainerState::RUNNING));
ASSERT_TRUE(
exists(docker, orphanContainerId, ContainerState::RUNNING));
SlaveState slaveState;
slaveState.id = slaveId;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
execState.latest = containerId;
Try<process::Subprocess> wait =
process::subprocess(tests::flags.docker + " wait " + container1);
ASSERT_SOME(wait);
FrameworkID frameworkId;
RunState runState;
runState.id = containerId;
runState.forkedPid = wait->pid();
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId);
ASSERT_FALSE(termination.isFailed());
// The orphaned container should be correctly cleaned up.
Future<Option<ContainerTermination>> termination2 =
dockerContainerizer.wait(orphanContainerId);
AWAIT_READY(termination2);
EXPECT_NONE(termination2.get());
ASSERT_FALSE(exists(docker, orphanContainerId));
AWAIT_EXPECT_WEXITSTATUS_EQ(128 + SIGKILL, orphanRun);
}
// This test checks that the docker containerizer doesn't recover
// executors that were started by another containerizer (e.g., the
// Mesos containerizer).
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ExecutorID executorId;
executorId.set_value(UUID::random().toString());
ExecutorInfo executorInfo;
executorInfo.mutable_container()->set_type(ContainerInfo::MESOS);
ExecutorState executorState;
executorState.info = executorInfo;
executorState.latest = containerId;
RunState runState;
runState.id = containerId;
executorState.runs.put(containerId, runState);
FrameworkState frameworkState;
frameworkState.executors.put(executorId, executorState);
SlaveState slaveState;
FrameworkID frameworkId;
frameworkId.set_value(UUID::random().toString());
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<hashset<ContainerID>> containers = dockerContainerizer.containers();
AWAIT_READY(containers);
// A MesosContainerizer task shouldn't be recovered by
// DockerContainerizer.
EXPECT_EQ(0u, containers->size());
}
// This test checks that the docker containerizer doesn't recover
// containers with a malformed UUID.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverMalformedUUID)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.docker_kill_orphans = true;
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
SlaveID slaveId;
slaveId.set_value("s1");
ContainerID containerId;
containerId.set_value("malformedUUID");
string container = containerName(containerId);
// Clean up container if it still exists.
ASSERT_TRUE(docker->rm(container, true).await(Seconds(30)));
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
Try<Docker::RunOptions> runOptions = Docker::RunOptions::create(
containerInfo,
commandInfo,
container,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_SOME(runOptions);
Future<Option<int>> run = docker->run(runOptions.get());
ASSERT_TRUE(
exists(docker, containerId, ContainerState::RUNNING));
SlaveState slaveState;
slaveState.id = slaveId;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
FrameworkID frameworkId;
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
// The container should still exist and should not get killed
// by containerizer recovery.
ASSERT_TRUE(exists(docker, containerId));
}
#ifdef __linux__
// This test verifies that we can launch a docker container with a
// persistent volume.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role1");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64").get() + volume);
CommandInfo command;
command.set_value("echo abc > " +
path::join(flags.sandbox_directory, "path1", "file"));
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
// We use the filter explicitly here so that the resources will not
// be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
const string& volumePath = getPersistentVolumePath(
flags.work_dir,
volume);
EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file")));
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the persistent volume is unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
EXPECT_FALSE(strings::contains(
entry.target,
path::join(containerConfig->directory(), "path1")));
}
}
// This test checks that the docker containerizer is able to recover
// containers with persistent volumes and destroy them properly.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role1");
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Filters filters;
filters.set_refuse_seconds(0);
// NOTE: We set the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64").get() + volume);
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Recreate the containerizer and start the slave again.
slave.get()->terminate();
slave->reset();
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Wait until containerizer recovery is complete.
AWAIT_READY(_recover);
Future<Option<ContainerTermination>> termination =
dockerContainerizer->wait(containerId.get());
dockerContainerizer->destroy(containerId.get());
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the recovered container's persistent volume is
// unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
EXPECT_FALSE(strings::contains(
entry.target,
path::join(containerConfig->directory(), "path1")));
}
driver.stop();
driver.join();
}
// This test checks that the docker containerizer is able to clean up
// orphaned containers with persistent volumes.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role1");
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Filters filters;
filters.set_refuse_seconds(0);
// NOTE: We set the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64").get() + volume);
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Recreate the containerizer and start the slave again.
slave.get()->terminate();
slave->reset();
// Wipe the framework directory so that the slave will treat the
// above running task as an orphan. We don't want to wipe the whole
// meta directory since the Docker containerizer will skip recovery
// if the state is not found.
ASSERT_SOME(
os::rmdir(getFrameworkPath(
getMetaRootDir(flags.work_dir),
offer.slave_id(),
frameworkId.get())));
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Wait until containerizer recovery is complete.
AWAIT_READY(_recover);
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the orphaned container's persistent volume is
// unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
EXPECT_FALSE(strings::contains(
entry.target,
path::join(containerConfig->directory(), "path1")));
}
driver.stop();
driver.join();
slave->reset();
EXPECT_FALSE(exists(docker, containerId.get()));
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even when it has terminated, might not flush the logs, and we would
// end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
// NOTE: We prefix `echo` with `unbuffer` so that we can immediately
// flush the output of `echo`. This mitigates a race in Docker where
// it mangles reads from stdout/stderr and commits suicide.
// See MESOS-4676 for more information.
CommandInfo command;
command.set_value(
"unbuffer echo out" + uuid + " ; "
"unbuffer echo err" + uuid + " 1>&2");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
// NOTE: This image is exactly the result of running
// `docker run -t -i alpine /bin/sh -c "apk add --update expect"`.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/alpine-expect");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
// Now check that the proper output is in stderr and stdout (which
// might also contain other things, hence the use of a UUID).
Try<string> read =
os::read(path::join(containerConfig->directory(), "stderr"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
EXPECT_TRUE(containsLine(lines, "err" + uuid));
EXPECT_FALSE(containsLine(lines, "out" + uuid));
read = os::read(path::join(containerConfig->directory(), "stdout"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_TRUE(containsLine(lines, "out" + uuid));
EXPECT_FALSE(containsLine(lines, "err" + uuid));
driver.stop();
driver.join();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_shell(false);
// NOTE: By not setting CommandInfo::value we're testing that the
// container can still run because the image has a default
// entrypoint!
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
Try<string> read =
os::read(path::join(containerConfig->directory(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// Since we're not passing any command value, we expect the image's
// default entrypoint ('echo') to run with the image's default
// command ('inky') as its argument.
EXPECT_TRUE(containsLine(lines, "inky"));
read = os::read(path::join(containerConfig->directory(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
driver.stop();
driver.join();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We can set the value to just the 'uuid' since it should get
// passed as an argument to the entrypoint, i.e., 'echo uuid'.
command.set_value(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
// Now check that the proper output is in stderr and stdout.
Try<string> read =
os::read(path::join(containerConfig->directory(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// We expect the passed-in command value to override the image's
// default command, so we should see the value of 'uuid' in the
// output instead of the default command, which is 'inky'.
EXPECT_TRUE(containsLine(lines, uuid));
EXPECT_FALSE(containsLine(lines, "inky"));
read = os::read(path::join(containerConfig->directory(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
EXPECT_FALSE(containsLine(lines, uuid));
driver.stop();
driver.join();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We should also be able to skip setting the command value and only
// set the arguments; these should get passed through to the
// entrypoint as well!
command.add_arguments(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
// Now check that the proper output is in stderr and stdout.
Try<string> read =
os::read(path::join(containerConfig->directory(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// We expect the passed-in command arguments to override the image's
// default command, so we should see the value of 'uuid' in the
// output instead of the default command, which is 'inky'.
EXPECT_TRUE(containsLine(lines, uuid));
EXPECT_FALSE(containsLine(lines, "inky"));
read = os::read(path::join(containerConfig->directory(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
EXPECT_FALSE(containsLine(lines, uuid));
driver.stop();
driver.join();
}
// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
// This is owned by the containerizer, so we'll need one per containerizer.
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Enable checkpointing for the framework so that the executor can
// survive the slave restart below.
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
// Drop the first update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
// Stop the slave before the status update is received.
AWAIT_READY(statusUpdateMessage);
slave.get()->terminate();
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
// This is owned by the containerizer, so we'll need one per containerizer.
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage->body);
// The executor should inform the slave about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status->state());
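// The Docker container itself should have survived the slave restart.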
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer->wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
//
// TODO(benh): This test is currently disabled because the executor
// inside the image mesosphere/test-executor does not properly set the
// executor PID that it uses during registration, so when the new
// slave recovers it can't reconnect and instead destroys that
// container. In particular, it uses '0' for its IP, which we properly
// parse and can even use for sending other messages, but the
// current implementation of 'UPID::operator bool()' fails if the IP
// component of a PID is '0'.
TEST_F(DockerContainerizerTest,
DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
// This is owned by the containerizer, so we'll need one per containerizer.
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Enable checkpointing for the framework so that the executor can
// survive the slave restart below.
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
// We need to wait until the container's pid has been checkpointed so
// that when the next slave recovers it won't treat the executor as
// having gone lost! We know this has completed after
// Containerizer::launch returns and Slave::executorLaunched gets
// dispatched.
Future<Nothing> executorLaunched =
FUTURE_DISPATCH(_, &Slave::executorLaunched);
// The test-executor in the image immediately sends a TASK_RUNNING
// followed by TASK_FINISHED (no sleep/delay in between), so we need
// to drop the first TWO updates that come from the executor, rather
// than only the first update as above, where we can control the
// length of the task.
Future<StatusUpdateMessage> statusUpdateMessage1 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
// Also drop the second update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage2 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
AWAIT_READY(executorLaunched);
AWAIT_READY(statusUpdateMessage1);
AWAIT_READY(statusUpdateMessage2);
slave.get()->terminate();
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
// This is owned by the containerizer, so we'll need one per containerizer.
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage->body);
// The executor should inform the slave about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status->state());
ASSERT_TRUE(exists(docker, containerId.get()));
driver.stop();
driver.join();
}
// This test verifies that port mapping with the bridge network
// forwards the host port to the container port: we send data to the
// host port and receive it in the container, which listens on the
// mapped container port.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpus:1;mem:1024;ports:[10000-10000]";
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
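// Run `nc -l -p 1000` in the container: listen on the container port
// so that data sent to the mapped host port (10000) below is
// delivered to the task and echoed to its stdout.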
CommandInfo command;
command.set_shell(false);
command.set_value("nc");
command.add_arguments("-l");
command.add_arguments("-p");
command.add_arguments("1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
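// Map host port 10000 to container port 1000; with the Docker CLI
// this is roughly equivalent to passing `-p 10000:1000` to
// `docker run` in bridge mode.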
ContainerInfo::DockerInfo::PortMapping portMapping;
portMapping.set_host_port(10000);
portMapping.set_container_port(1000);
dockerInfo.add_port_mappings()->CopyFrom(portMapping);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<ContainerConfig> containerConfig;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<1>(&containerConfig),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(containerConfig);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(
exists(docker,
containerId.get(),
ContainerState::RUNNING));
string uuid = UUID::random().toString();
// Write the uuid to the Docker-mapped host port.
Try<process::Subprocess> s = process::subprocess(
"echo " + uuid + " | nc localhost 10000");
ASSERT_SOME(s);
AWAIT_READY_FOR(s->status(), Seconds(60));
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished->state());
// Now check that the proper output is in stdout.
Try<string> read =
os::read(path::join(containerConfig->directory(), "stdout"));
ASSERT_SOME(read);
const vector<string> lines = strings::split(read.get(), "\n");
// We expect the uuid sent to the host port to be written to stdout
// by the docker container running `nc -l`.
EXPECT_TRUE(containsLine(lines, uuid));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
// This test verifies that a task whose sandbox path contains ':' can
// still run successfully. This exercises a limitation of the Docker
// CLI, where the volume mapping parameter treats colons (:) as
// separators and would otherwise incorrectly split the sandbox
// directory.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("test:colon");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Promise<Nothing> promise;
Future<Nothing> fetch;
// We want to pause the fetch call to simulate a long fetch time.
EXPECT_CALL(*process, fetch(_))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(promise.future())));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(fetch);
dockerContainerizer.destroy(containerId.get());
// Resume docker launch.
promise.set(Nothing());
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed->state());
driver.stop();
driver.join();
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Future<Nothing> fetch;
EXPECT_CALL(*process, fetch(_))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(Nothing())));
Promise<Nothing> promise;
// We want to pause the pull call to simulate a long pull time.
EXPECT_CALL(*process, pull(_))
.WillOnce(Return(promise.future()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
// Wait until fetch is finished.
AWAIT_READY(fetch);
dockerContainerizer.destroy(containerId.get());
// Resume docker launch.
promise.set(Nothing());
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed->state());
driver.stop();
driver.join();
}
// Ensures the containerizer responds correctly (returns false) to
// a request to destroy an unknown container.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyUnknownContainer)
{
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<DockerContainerizer*> create =
DockerContainerizer::create(flags, &fetcher);
ASSERT_SOME(create);
DockerContainerizer* containerizer = create.get();
ContainerID containerId;
containerId.set_value(UUID::random().toString());
AWAIT_EXPECT_FALSE(containerizer->destroy(containerId));
}
// This test checks that when a docker containerizer update fails
// and the container fails before the executor starts, the executor
// is properly killed and cleaned up.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.add_capabilities()->set_type(
FrameworkInfo::Capability::PARTITION_AWARE);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusGone;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusGone));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
// Fail the update so we don't proceed to send the run-task message
// to the executor.
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Failure("Fail resource update")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusGone);
EXPECT_EQ(TASK_GONE, statusGone->state());
EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
statusGone->reason());
driver.stop();
driver.join();
}
// When the fetch fails we should send the scheduler a status update
// with a message that shows the actual error.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
EXPECT_CALL(*process, fetch(_))
.WillOnce(Return(Failure("some error from fetch")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed->state());
EXPECT_EQ("Failed to launch container: some error from fetch",
statusFailed->message());
// TODO(jaybuff): When MESOS-2035 is addressed we should validate
// that statusFailed->reason() is correctly set here.
driver.stop();
driver.join();
}
// When the docker pull fails we should send the scheduler a status
// update with a message that shows the actual error.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerPullFailure)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
EXPECT_CALL(*mockDocker, pull(_, _, _))
.WillOnce(Return(Failure("some error from docker pull")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed->state());
EXPECT_EQ("Failed to launch container: some error from docker pull",
statusFailed->message());
// TODO(jaybuff): When MESOS-2035 is addressed we should validate
// that statusFailed->reason() is correctly set here.
driver.stop();
driver.join();
}
// When the docker executor container fails to launch, the docker
// inspect future that is in a retry loop should be discarded.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerInspectDiscard)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Future<Docker::Container> inspect;
EXPECT_CALL(*mockDocker, inspect(_, _))
.WillOnce(FutureResult(&inspect,
Invoke((MockDocker*) docker.get(),
&MockDocker::_inspect)));
EXPECT_CALL(*mockDocker, run(_, _, _))
.WillOnce(Return(Failure("Run failed")));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, executorId, _, _))
.WillOnce(FutureSatisfy(&executorLost));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed->state());
AWAIT_READY(executorLost);
AWAIT_DISCARDED(inspect);
driver.stop();
driver.join();
}
// Ensures the containerizer responds correctly (returns None)
// to a request to wait on an unknown container.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_WaitUnknownContainer)
{
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<DockerContainerizer*> create =
DockerContainerizer::create(flags, &fetcher);
ASSERT_SOME(create);
DockerContainerizer* containerizer = create.get();
ContainerID containerId;
containerId.set_value(UUID::random().toString());
Future<Option<ContainerTermination>> wait = containerizer->wait(containerId);
AWAIT_READY(wait);
EXPECT_NONE(wait.get());
}
// This test ensures that a task will transition straight from `TASK_KILLING` to
// `TASK_KILLED`, even if the health check begins to fail during the kill policy
// grace period.
//
// TODO(gkleiman): this test takes about 7 seconds to run, consider using mock
// tasks and health checkers to speed it up.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_NoTransitionFromKillingToRunning)
{
Shared<Docker> docker(new MockDocker(
tests::flags.docker, tests::flags.docker_socket));
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags agentFlags = CreateSlaveFlags();
Fetcher fetcher(agentFlags);
Try<ContainerLogger*> logger =
ContainerLogger::create(agentFlags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer containerizer(
agentFlags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> agent =
StartSlave(detector.get(), &containerizer, agentFlags);
ASSERT_SOME(agent);
// Start the framework with the task killing capability.
FrameworkInfo::Capability capability;
capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.add_capabilities()->CopyFrom(capability);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_EQ(1u, offers->size());
const uint16_t testPort = getFreePort().get();
// Run a simple TCP server (netcat) until SIGTERM is received, then
// sleep for 15 seconds to let the health check fail.
const string command = strings::format(
"trap \"sleep 15\" SIGTERM && nc -lk -p %u -e echo",
testPort).get();
TaskInfo task = createTask(offers->front(), command);
// The docker container runs in host network mode.
//
// TODO(tnachen): Use local image to test if possible.
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
containerInfo.mutable_docker()->set_image("alpine");
containerInfo.mutable_docker()->set_network(
ContainerInfo::DockerInfo::HOST);
task.mutable_container()->CopyFrom(containerInfo);
// Set `grace_period_seconds` here because it takes some time to launch
// Netcat to serve requests.
HealthCheck healthCheck;
healthCheck.set_type(HealthCheck::TCP);
healthCheck.mutable_tcp()->set_port(testPort);
healthCheck.set_delay_seconds(0);
healthCheck.set_grace_period_seconds(15);
healthCheck.set_interval_seconds(0);
task.mutable_health_check()->CopyFrom(healthCheck);
// Set the kill policy grace period to 5 seconds.
KillPolicy killPolicy;
killPolicy.mutable_grace_period()->set_nanoseconds(Seconds(5).ns());
task.mutable_kill_policy()->CopyFrom(killPolicy);
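// Since the task sleeps for 15 seconds on SIGTERM but the grace
// period is only 5 seconds, the kill is escalated while the health
// check is failing; the expectations below assert that the task
// still moves straight from TASK_KILLING to TASK_KILLED.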
Future<ContainerID> containerId;
EXPECT_CALL(containerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusHealthy;
Future<TaskStatus> statusKilling;
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusHealthy))
.WillOnce(FutureArg<1>(&statusKilling))
.WillOnce(FutureArg<1>(&statusKilled));
driver.launchTasks(offers->front().id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY(statusHealthy);
EXPECT_EQ(TASK_RUNNING, statusHealthy->state());
EXPECT_TRUE(statusHealthy->has_healthy());
EXPECT_TRUE(statusHealthy->healthy());
driver.killTask(task.task_id());
AWAIT_READY(statusKilling);
EXPECT_EQ(TASK_KILLING, statusKilling->state());
EXPECT_FALSE(statusKilling->has_healthy());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled->state());
EXPECT_FALSE(statusKilled->has_healthy());
Future<Option<ContainerTermination>> termination =
containerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
// This test ensures that when `cgroups_enable_cfs` is set on the
// agent, a docker container launched through the docker
// containerizer has its CFS CPU quota set.
// The cgroups CPU quota is only available on Linux.
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_CGROUPS_CFS_CgroupsEnableCFS)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.cgroups_enable_cfs = true;
flags.resources = "cpus:1;mem:128";
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
// Find the cgroups cpu hierarchy of the container and verify that
// the quota is set.
string name = containerName(containerId.get());
Future<Docker::Container> inspect = docker->inspect(name);
AWAIT_READY(inspect);
Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
ASSERT_SOME(cpuHierarchy);
Option<pid_t> pid = inspect->pid;
ASSERT_SOME(pid);
Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
ASSERT_SOME(cpuCgroup);
Try<Duration> cfsQuota =
cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cpuCgroup.get());
ASSERT_SOME(cfsQuota);
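// With CFS enabled the expected quota is `cpus * CPU_CFS_PERIOD`;
// the task was launched with 1 cpu, so the quota should equal one
// full CFS period.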
const Duration expectedCpuQuota = mesos::internal::slave::CPU_CFS_PERIOD * 1;
EXPECT_EQ(expectedCpuQuota, cfsQuota.get());
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
#endif // __linux__
// Run a task as a non-root user, inheriting ownership from the
// framework-supplied default user. Tests that the sandbox "stdout"
// is correctly owned and writable by the task's user.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Non_Root_Sandbox)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Shared<Docker> docker(
new MockDocker(tests::flags.docker, tests::flags.docker_socket));
slave::Flags flags = CreateSlaveFlags();
flags.switch_user = true;
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
FrameworkInfo framework;
framework.set_name("default");
framework.set_user("nobody");
framework.set_principal(DEFAULT_CREDENTIAL.principal());
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
// Start the task without supplying an explicit command user. The
// task should then inherit the framework user for its ownership.
CommandInfo* command = task.mutable_command();
command->set_value("echo \"foo\" && sleep 1000");
ContainerInfo* containerInfo = task.mutable_container();
containerInfo->set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo* dockerInfo = containerInfo->mutable_docker();
dockerInfo->set_image("alpine");
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
// Check that the sandbox directory exists.
const string sandboxDirectory = slave::paths::getExecutorRunPath(
flags.work_dir,
slaveId,
frameworkId.get(),
statusRunning->executor_id(),
containerId.get());
ASSERT_TRUE(os::exists(sandboxDirectory));
// Check the sandbox "stdout" was written to.
const string stdoutPath = path::join(sandboxDirectory, "stdout");
ASSERT_TRUE(os::exists(stdoutPath));
// Check the sandbox "stdout" is owned by the framework default user.
struct stat stdoutStat;
ASSERT_GE(::stat(stdoutPath.c_str(), &stdoutStat), 0);
Result<uid_t> uid = os::getuid(framework.user());
ASSERT_SOME(uid);
ASSERT_EQ(stdoutStat.st_uid, uid.get());
// Validate that our task was able to write its output into the sandbox.
Result<string> stdout = os::read(stdoutPath);
ASSERT_SOME(stdout);
EXPECT_TRUE(strings::contains(stdout.get(), "foo"));
}
// This test verifies that the DNS configuration of a Docker container
// can be successfully set with the agent flag `--default_container_dns`.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DefaultDNS)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
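// Configure default DNS settings for Docker containers on bridge
// networking; these correspond to `docker run`'s `--dns`,
// `--dns-search`, and `--dns-opt` flags.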
Try<ContainerDNSInfo> parse = flags::parse<ContainerDNSInfo>(
R"~(
{
"docker": [
{
"network_mode": "BRIDGE",
"dns": {
"nameservers": [ "8.8.8.8", "8.8.4.4" ],
"search": [ "example1.com", "example2.com" ],
"options": [ "timeout:3", "attempts:2" ]
}
}
]
})~");
ASSERT_SOME(parse);
flags.default_container_dns = parse.get();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
// Find the DNS configuration of the container and verify that it is
// consistent with `flags.default_container_dns`.
string name = containerName(containerId.get());
Future<Docker::Container> inspect = docker->inspect(name);
AWAIT_READY(inspect);
vector<string> defaultDNS;
std::copy(
flags.default_container_dns->docker(0).dns().nameservers().begin(),
flags.default_container_dns->docker(0).dns().nameservers().end(),
std::back_inserter(defaultDNS));
EXPECT_EQ(inspect->dns, defaultDNS);
vector<string> defaultDNSSearch;
std::copy(
flags.default_container_dns->docker(0).dns().search().begin(),
flags.default_container_dns->docker(0).dns().search().end(),
std::back_inserter(defaultDNSSearch));
EXPECT_EQ(inspect->dnsSearch, defaultDNSSearch);
vector<string> defaultDNSOption;
std::copy(
flags.default_container_dns->docker(0).dns().options().begin(),
flags.default_container_dns->docker(0).dns().options().end(),
std::back_inserter(defaultDNSOption));
EXPECT_EQ(inspect->dnsOptions, defaultDNSOption);
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
}
// Fixture for testing IPv6 support for docker containers on host network.
//
// TODO(asridharan): Currently in the `SetUp` and `TearDown` methods
// of this class we re-initialize libprocess to take an IPv6 address.
// Ideally, we should move this into a more general test fixture in
// mesos.hpp to be used by any other tests for IPv6. This might need
// changes to `MesosTest` in order to allow for multiple inheritance.
class DockerContainerizerIPv6Test : public DockerContainerizerTest
{
protected:
virtual void SetUp()
{
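// Libprocess only picks up `LIBPROCESS_IP6` at (re)initialization
// time, so the environment variable must be set before the
// `reinitialize` call below.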
os::setenv("LIBPROCESS_IP6", "::1234");
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
DockerContainerizerTest::SetUp();
}
virtual void TearDown()
{
DockerContainerizerTest::TearDown();
os::unsetenv("LIBPROCESS_IP6");
process::reinitialize(
None(),
READONLY_HTTP_AUTHENTICATION_REALM,
READWRITE_HTTP_AUTHENTICATION_REALM);
}
};
// Launches a docker container on the host network. The host network
// is assumed to have both an IPv4 and an IPv6 address. The test
// passes if the Mesos state correctly exposes both addresses.
TEST_F(DockerContainerizerIPv6Test, ROOT_DOCKER_LaunchIPv6HostNetwork)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher(flags);
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
// Check that the slave has the IPv6 address stored in its PID.
EXPECT_SOME(slave.get()->pid.addresses.v6);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers->size());
const Offer& offer = offers.get()[0];
TaskInfo task = createTask(
offer.slave_id(),
offer.resources(),
SLEEP_COMMAND(10000));
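// No network mode is set in `DockerInfo` below, so the Docker
// containerizer defaults to host networking, which is what this test
// exercises.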
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(statusRunning->has_data());
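// The docker executor publishes the container's `docker inspect`
// output in the status update's `data` field, serialized as a JSON
// array.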
Try<JSON::Array> array = JSON::parse<JSON::Array>(statusRunning->data());
ASSERT_SOME(array);
// Check that container information is exposed through the master's
// state endpoint.
Future<http::Response> response = http::get(
master.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
Result<JSON::Value> find = parse->find<JSON::Value>(
"frameworks[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Check that container information is exposed through the slave's
// state endpoint.
response = http::get(
slave.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
parse = JSON::parse<JSON::Object>(response->body);
ASSERT_SOME(parse);
find = parse->find<JSON::Value>(
"frameworks[0].executors[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Now verify the ContainerStatus fields in the TaskStatus.
ASSERT_TRUE(statusRunning->has_container_status());
EXPECT_TRUE(statusRunning->container_status().has_container_id());
ASSERT_EQ(1, statusRunning->container_status().network_infos().size());
EXPECT_EQ(2, statusRunning->container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length)
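// Since the container runs on a dual-stack host network, we expect
// it to report both an IPv4 and an IPv6 address.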
Option<string> containerIPv4 = None();
Option<string> containerIPv6 = None();
foreach (const NetworkInfo::IPAddress& ipAddress,
statusRunning->container_status().network_infos(0).ip_addresses()) {
if (ipAddress.protocol() == NetworkInfo::IPv4) {
containerIPv4 = ipAddress.ip_address();
}
if (ipAddress.protocol() == NetworkInfo::IPv6) {
containerIPv6 = ipAddress.ip_address();
}
}
EXPECT_SOME(containerIPv4);
ASSERT_SOME(containerIPv6);
EXPECT_EQ("::1234", containerIPv6.get());
ASSERT_TRUE(exists(docker, containerId.get()));
Future<Option<ContainerTermination>> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
EXPECT_SOME(termination.get());
ASSERT_FALSE(
exists(docker, containerId.get(), ContainerState::RUNNING));
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {