// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <mesos/slave/container_logger.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
#include <process/subprocess.hpp>
#include <stout/duration.hpp>
#ifdef __linux__
#include "linux/cgroups.hpp"
#include "linux/fs.hpp"
#endif // __linux__
#include "messages/messages.hpp"
#include "slave/containerizer/docker.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
using namespace mesos::internal::slave::paths;
using namespace mesos::internal::slave::state;
using namespace process;
using mesos::internal::master::Master;
using mesos::internal::slave::DockerContainerizer;
using mesos::internal::slave::DockerContainerizerProcess;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::slave::ContainerLogger;
using std::list;
using std::string;
using std::vector;
using testing::_;
using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
using testing::Invoke;
using testing::Return;
namespace mesos {
namespace internal {
namespace tests {
class DockerContainerizerTest : public MesosTest
{
public:
static string containerName(
const SlaveID& slaveId,
const ContainerID& containerId)
{
return slave::DOCKER_NAME_PREFIX + slaveId.value() +
slave::DOCKER_NAME_SEPERATOR + containerId.value();
}
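// For example, assuming DOCKER_NAME_PREFIX is "mesos-" and the
// separator is ".", a slave "s1" running container "c1" yields the
// Docker name "mesos-s1.c1". (The literal prefix and separator here
// are illustrative; the actual constants are defined in
// slave/containerizer/docker.hpp.)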
enum ContainerState
{
EXISTS,
RUNNING
};
static bool exists(
const process::Shared<Docker>& docker,
const SlaveID& slaveId,
const ContainerID& containerId,
ContainerState state = ContainerState::EXISTS)
{
Duration waited = Duration::zero();
string expectedName = containerName(slaveId, containerId);
do {
Future<Docker::Container> inspect = docker->inspect(expectedName);
if (!inspect.await(Seconds(3))) {
return false;
}
if (inspect.isReady()) {
switch (state) {
case ContainerState::RUNNING:
if (inspect.get().pid.isSome()) {
return true;
}
// Retry looking for running pid until timeout.
break;
case ContainerState::EXISTS:
return true;
}
}
os::sleep(Milliseconds(200));
waited += Milliseconds(200);
} while (waited < Seconds(5));
return false;
}
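// Note that `exists` polls `docker inspect` every 200ms for up to 5
// seconds (each inspect itself awaited for up to 3 seconds), so a
// wedged Docker daemon surfaces as a `false` return rather than a
// hung test.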
static bool containsLine(
const vector<string>& lines,
const string& expectedLine)
{
foreach (const string& line, lines) {
if (line == expectedLine) {
return true;
}
}
return false;
}
virtual void TearDown()
{
Try<Owned<Docker>> docker = Docker::create(
tests::flags.docker,
tests::flags.docker_socket,
false);
ASSERT_SOME(docker);
Future<list<Docker::Container>> containers =
docker.get()->ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
// Clean up all Mesos-launched containers.
foreach (const Docker::Container& container, containers.get()) {
AWAIT_READY_FOR(docker.get()->rm(container.id, true), Seconds(30));
}
}
};
// Only enable the executor launch test on Linux, as other platforms
// require running a Linux VM and need special port forwarding
// to get host networking to work.
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
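// The DoAll action below captures the ContainerID argument into the
// `containerId` future while still delegating to the real launch
// path via Invoke, so the test observes the launch without stubbing
// it out.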
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launchExecutor)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
}
// This test verifies that a custom executor can be launched and
// registered with the slave with Docker bridge networking enabled.
// We assume the custom executor registers its public IP (rather
// than 0.0.0.0 or equivalent) with the slave, since that is the
// default behavior for libprocess.
//
// Currently this test fails on Ubuntu and CentOS since the slave
// binds to and advertises a 127.0.x.x address, which is unreachable
// by the executor on a bridge network.
// TODO(tnachen): Re-enable this test when we are able to fix MESOS-3123.
TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Launch_Executor_Bridged)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launchExecutor)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
ASSERT_TRUE(statusRunning.get().has_data());
Try<JSON::Array> array = JSON::parse<JSON::Array>(statusRunning.get().data());
ASSERT_SOME(array);
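// The Docker executor attaches the `docker inspect` output for the
// container to the status update's data field, which is why it
// parses as a JSON array.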
// Check that container information is exposed through the master's
// state endpoint.
Future<http::Response> response = http::get(
master.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
ASSERT_SOME(parse);
Result<JSON::Value> find = parse.get().find<JSON::Value>(
"frameworks[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Check that container information is exposed through the slave's
// state endpoint.
response = http::get(
slave.get()->pid,
"state",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
parse = JSON::parse<JSON::Object>(response.get().body);
ASSERT_SOME(parse);
find = parse.get().find<JSON::Value>(
"frameworks[0].executors[0].tasks[0].container.docker.privileged");
EXPECT_SOME_FALSE(find);
// Now verify that the TaskStatus contains the container IP address.
ASSERT_TRUE(statusRunning.get().has_container_status());
EXPECT_EQ(1, statusRunning.get().container_status().network_infos().size());
EXPECT_EQ(1, statusRunning.get().container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length)
ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
ASSERT_TRUE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusKilled));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.killTask(task.task_id());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled.get().state());
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
driver.stop();
driver.join();
}
// Ensures that the framework will receive a TASK_KILLING update
// before TASK_KILLED, if the capability is supported.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_TaskKillingCapability)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
// Start the framework with the task killing capability.
FrameworkInfo::Capability capability;
capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.add_capabilities()->CopyFrom(capability);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
ASSERT_TRUE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
Future<TaskStatus> statusKilling, statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusKilling))
.WillOnce(FutureArg<1>(&statusKilled));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.killTask(task.task_id());
AWAIT_READY(statusKilling);
EXPECT_EQ(TASK_KILLING, statusKilling->state());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled->state());
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
driver.stop();
driver.join();
}
// This test exercises DockerContainerizer::usage().
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.resources = Option<string>("cpus:2;mem:1024");
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
// Run a CPU intensive command, so we can measure utime and stime later.
command.set_value("dd if=/dev/zero of=/dev/null");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
// We ignore all update calls to prevent resizing cgroup limits.
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Nothing()));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
// Verify the usage.
ResourceStatistics statistics;
Duration waited = Duration::zero();
do {
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
// TODO(tnachen): Replace await with AWAIT_COMPLETED once
// implemented.
ASSERT_TRUE(usage.await(Seconds(3)));
if (usage.isReady()) {
statistics = usage.get();
if (statistics.cpus_user_time_secs() > 0 &&
statistics.cpus_system_time_secs() > 0) {
break;
}
}
os::sleep(Milliseconds(200));
waited += Milliseconds(200);
} while (waited < Seconds(3));
// Usage includes the executor resources.
EXPECT_EQ(2.0 + slave::DEFAULT_EXECUTOR_CPUS, statistics.cpus_limit());
EXPECT_EQ((Gigabytes(1) + slave::DEFAULT_EXECUTOR_MEM).bytes(),
statistics.mem_limit_bytes());
EXPECT_LT(0, statistics.cpus_user_time_secs());
EXPECT_LT(0, statistics.cpus_system_time_secs());
EXPECT_GT(statistics.mem_rss_bytes(), 0u);
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
dockerContainerizer.destroy(containerId.get());
AWAIT_READY(termination);
// usage() should now fail since the container has been destroyed.
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
AWAIT_FAILED(usage);
driver.stop();
driver.join();
}
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
ASSERT_TRUE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
string name = containerName(slaveId, containerId.get());
Future<Docker::Container> inspect = docker->inspect(name);
AWAIT_READY(inspect);
Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
ASSERT_SOME(newResources);
Future<Nothing> update =
dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
Result<string> memoryHierarchy = cgroups::hierarchy("memory");
ASSERT_SOME(cpuHierarchy);
ASSERT_SOME(memoryHierarchy);
Option<pid_t> pid = inspect.get().pid;
ASSERT_SOME(pid);
Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
ASSERT_SOME(cpuCgroup);
Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
ASSERT_SOME(memoryCgroup);
Try<uint64_t> cpu = cgroups::cpu::shares(
cpuHierarchy.get(),
cpuCgroup.get());
ASSERT_SOME(cpu);
Try<Bytes> mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
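// One CPU maps to 1024 cgroup CPU shares, so the `cpus:1` update
// should yield 1024 shares; the soft memory limit should match the
// requested 128MB.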
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(128u, mem.get().megabytes());
newResources = Resources::parse("cpus:1;mem:144");
// Issue a second update, which uses the cached pid instead of inspect.
update = dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
ASSERT_SOME(cpu);
mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(144u, mem.get().megabytes());
driver.stop();
driver.join();
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
SlaveID slaveId;
slaveId.set_value("s1");
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ContainerID reapedContainerId;
reapedContainerId.set_value(UUID::random().toString());
string container1 = containerName(slaveId, containerId);
string container2 = containerName(slaveId, reapedContainerId);
// Clean up artifacts if the containers still exist.
ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
docker->run(
containerInfo,
commandInfo,
container1,
flags.work_dir,
flags.sandbox_directory,
resources);
Future<Option<int>> orphanRun =
docker->run(
containerInfo,
commandInfo,
container2,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_TRUE(
exists(docker, slaveId, containerId, ContainerState::RUNNING));
ASSERT_TRUE(
exists(docker, slaveId, reapedContainerId, ContainerState::RUNNING));
Future<Docker::Container> inspect = docker->inspect(container2);
AWAIT_READY(inspect);
SlaveState slaveState;
slaveState.id = slaveId;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
execState.latest = containerId;
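// Spawn `docker wait` on the container and checkpoint its pid below
// as the run's forked pid; recovery will reap this process, treating
// its exit as the container's termination.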
Try<process::Subprocess> wait =
process::subprocess(tests::flags.docker + " wait " + container1);
ASSERT_SOME(wait);
FrameworkID frameworkId;
RunState runState;
runState.id = containerId;
runState.forkedPid = wait.get().pid();
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId);
ASSERT_FALSE(termination.isFailed());
AWAIT_FAILED(dockerContainerizer.wait(reapedContainerId));
// Expect the orphan to be stopped!
AWAIT_READY(orphanRun);
ASSERT_SOME(orphanRun.get());
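// `docker run` exits with 128 + <signal number> when the container
// is killed by a signal, so a SIGKILL-ed orphan reports exit status
// 137 here.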
EXPECT_TRUE(WIFEXITED(orphanRun->get())) << orphanRun->get();
EXPECT_EQ(128 + SIGKILL, WEXITSTATUS(orphanRun->get())) << orphanRun->get();
}
// This test ensures that orphaned docker containers unknown to the current
// agent instance are properly cleaned up.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_KillOrphanContainers)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
SlaveID slaveId;
slaveId.set_value("s1");
SlaveID oldSlaveId;
oldSlaveId.set_value("old-agent-id");
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ContainerID orphanContainerId;
orphanContainerId.set_value(UUID::random().toString());
string container1 = containerName(slaveId, containerId);
// Start the orphan container with the old slave id.
string container2 = containerName(oldSlaveId, orphanContainerId);
// Clean up artifacts if the containers still exist.
ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
docker->run(
containerInfo,
commandInfo,
container1,
flags.work_dir,
flags.sandbox_directory,
resources);
Future<Option<int>> orphanRun =
docker->run(
containerInfo,
commandInfo,
container2,
flags.work_dir,
flags.sandbox_directory,
resources);
ASSERT_TRUE(
exists(docker, slaveId, containerId, ContainerState::RUNNING));
ASSERT_TRUE(
exists(docker, oldSlaveId, orphanContainerId, ContainerState::RUNNING));
SlaveState slaveState;
slaveState.id = slaveId;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
execState.latest = containerId;
Try<process::Subprocess> wait =
process::subprocess(tests::flags.docker + " wait " + container1);
ASSERT_SOME(wait);
FrameworkID frameworkId;
RunState runState;
runState.id = containerId;
runState.forkedPid = wait.get().pid();
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId);
ASSERT_FALSE(termination.isFailed());
// The orphaned container should be correctly cleaned up.
AWAIT_FAILED(dockerContainerizer.wait(orphanContainerId));
ASSERT_FALSE(exists(docker, oldSlaveId, orphanContainerId));
AWAIT_READY(orphanRun);
ASSERT_SOME(orphanRun.get());
EXPECT_TRUE(WIFEXITED(orphanRun->get())) << orphanRun->get();
EXPECT_EQ(128 + SIGKILL, WEXITSTATUS(orphanRun->get())) << orphanRun->get();
}
// This test checks that the Docker containerizer doesn't recover
// executors that were started by another containerizer (e.g., the
// Mesos containerizer).
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
{
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
ContainerID containerId;
containerId.set_value(UUID::random().toString());
ExecutorID executorId;
executorId.set_value(UUID::random().toString());
ExecutorInfo executorInfo;
executorInfo.mutable_container()->set_type(ContainerInfo::MESOS);
ExecutorState executorState;
executorState.info = executorInfo;
executorState.latest = containerId;
RunState runState;
runState.id = containerId;
executorState.runs.put(containerId, runState);
FrameworkState frameworkState;
frameworkState.executors.put(executorId, executorState);
SlaveState slaveState;
FrameworkID frameworkId;
frameworkId.set_value(UUID::random().toString());
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<hashset<ContainerID>> containers = dockerContainerizer.containers();
AWAIT_READY(containers);
// A MesosContainerizer task shouldn't be recovered by
// DockerContainerizer.
EXPECT_EQ(0u, containers.get().size());
}
#ifdef __linux__
// This test verifies that we can launch a Docker container with
// a persistent volume.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
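// "path1" is the volume's container path, i.e. the directory inside
// the task sandbox where the volume is mounted; the CREATE operation
// passed to acceptOffers below materializes the volume on the agent
// before the task launches.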
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64;").get() + volume);
CommandInfo command;
command.set_value("echo abc > " +
path::join(flags.sandbox_directory, "path1", "file"));
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
// We set the filter explicitly here so that the resources will not
// be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
ASSERT_FALSE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
const string& volumePath = getPersistentVolumePath(
flags.work_dir,
volume);
EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file")));
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the persistent volume is unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
EXPECT_FALSE(
strings::contains(entry.target, path::join(directory.get(), "path1")));
}
}
// This test checks that the Docker containerizer is able to recover
// containers with persistent volumes and destroy them properly.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Filters filters;
filters.set_refuse_seconds(0);
// NOTE: We set the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64;").get() + volume);
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
// Recreate containerizer and start slave again.
slave.get()->terminate();
slave->reset();
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Wait until containerizer recovery is complete.
AWAIT_READY(_recover);
Future<containerizer::Termination> termination =
dockerContainerizer->wait(containerId.get());
dockerContainerizer->destroy(containerId.get());
AWAIT_READY(termination);
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the recovered container's persistent volume is
// unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
EXPECT_FALSE(
strings::contains(entry.target, path::join(directory.get(), "path1")));
}
driver.stop();
driver.join();
}
// This test checks the docker containerizer is able to clean up
// orphaned containers with persistent volumes.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpu:2;mem:2048;disk(role1):2048";
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Filters filters;
filters.set_refuse_seconds(0);
// NOTE: We set the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
frameworkInfo.principal());
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(
Resources::parse("cpus:1;mem:64;").get() + volume);
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})},
filters);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
// Recreate containerizer and start slave again.
slave.get()->terminate();
slave->reset();
// Wipe the framework directory so that the slave will treat the
// above running task as an orphan. We don't want to wipe the whole
// meta directory, since the Docker containerizer will skip recovery
// if no state is found.
ASSERT_SOME(
os::rmdir(getFrameworkPath(
getMetaRootDir(flags.work_dir),
offer.slave_id(),
frameworkId.get())));
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
// Wait until containerizer recovery is complete.
AWAIT_READY(_recover);
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
EXPECT_SOME(table);
// Verify that the orphaned container's persistent volume is
// unmounted.
foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
EXPECT_FALSE(
strings::contains(entry.target, path::join(directory.get(), "path1")));
}
driver.stop();
driver.join();
slave->reset();
EXPECT_FALSE(exists(docker, offer.slave_id(), containerId.get()));
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the Docker container because stopping a
// container, even one that has already terminated, might not flush
// the logs, and we would end up not getting stdout/stderr in our
// tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
// NOTE: We prefix `echo` with `unbuffer` so that we can immediately
// flush the output of `echo`. This mitigates a race in Docker where
// it mangles reads from stdout/stderr and commits suicide.
// See MESOS-4676 for more information.
CommandInfo command;
command.set_value(
"unbuffer echo out" + uuid + " ; "
"unbuffer echo err" + uuid + " 1>&2");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
// NOTE: This is an image that is exactly
// `docker run -t -i alpine /bin/sh -c "apk add --update expect"`.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/alpine-expect");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout (which
// might also contain other things, hence the use of a UUID).
Try<string> read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
EXPECT_TRUE(containsLine(lines, "err" + uuid));
EXPECT_FALSE(containsLine(lines, "out" + uuid));
read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_TRUE(containsLine(lines, "out" + uuid));
EXPECT_FALSE(containsLine(lines, "err" + uuid));
driver.stop();
driver.join();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the Docker container because stopping a
// container, even one that has already terminated, might not flush
// the logs, and we would end up not getting stdout/stderr in our
// tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_shell(false);
// NOTE: By not setting CommandInfo::value we're testing that we
// will still be able to run the container because it has a default
// entrypoint!
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// Since we're not passing any command value, we expect the image's
// default entrypoint ('echo') to be run with its default command
// ('inky') as the argument.
EXPECT_TRUE(containsLine(lines, "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
driver.stop();
driver.join();
}

// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We can set the value to just the 'uuid' since it should get
// passed as an argument to the entrypoint, i.e., 'echo uuid'.
command.set_value(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// We expect the passed-in command value to override the image's
// default command, so we should see the value of 'uuid' in the
// output instead of the default command 'inky'.
EXPECT_TRUE(containsLine(lines, uuid));
EXPECT_FALSE(containsLine(lines, "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
EXPECT_FALSE(containsLine(lines, uuid));
driver.stop();
driver.join();
}

// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We should also be able to skip setting the command value and just
// set the arguments, and those should also get passed through to the
// entrypoint!
command.add_arguments(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
vector<string> lines = strings::split(read.get(), "\n");
// We expect the passed-in command arguments to override the image's
// default command, so we should see the value of 'uuid' in the
// output instead of the default command 'inky'.
EXPECT_TRUE(containsLine(lines, uuid));
EXPECT_FALSE(containsLine(lines, "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
lines = strings::split(read.get(), "\n");
EXPECT_FALSE(containsLine(lines, "inky"));
EXPECT_FALSE(containsLine(lines, uuid));
driver.stop();
driver.join();
}

// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
// This is owned by the containerizer, so we'll need one per containerizer.
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Enable checkpointing for the framework.
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launch)));
// Drop the first update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
// Stop the slave before the status update is received.
AWAIT_READY(statusUpdateMessage);
slave.get()->terminate();
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
// This is owned by the containerizer, so we'll need one per containerizer.
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
UPID executorPid = reregisterExecutorMessage.get().from;
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage.get().body);
// Executor should inform about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status.get().state());
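// The container should still be running: recovery reconnects to the
// executor rather than destroying its container.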
ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
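// Stopping the driver tears down the framework, which should destroy
// the running container; wait for the termination below so cleanup
// completes before the test exits.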
Future<containerizer::Termination> termination =
dockerContainerizer->wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
}

// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
//
// TODO(benh): This test is currently disabled because the executor
// inside the image mesosphere/test-executor does not properly set the
// executor PID that it uses during registration, so when the new
// slave recovers it can't reconnect and instead destroys that
// container. In particular, it uses '0' for its IP, which we parse
// correctly and can even use for sending other messages, but the
// current implementation of 'UPID::operator bool()' fails if the IP
// component of a PID is '0'.
TEST_F(DockerContainerizerTest,
DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
// This is owned by the containerizer, so we'll need one per containerizer.
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
Owned<MockDockerContainerizer> dockerContainerizer(
new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Enable checkpointing for the framework.
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<ContainerID> containerId;
Future<SlaveID> slaveId;
EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<4>(&slaveId),
Invoke(dockerContainerizer.get(),
&MockDockerContainerizer::_launchExecutor)));
// We need to wait until the container's pid has been checkpointed so
// that when the next slave recovers it won't treat the executor as
// having gone lost! We know this has completed after
// Containerizer::launch returns and Slave::executorLaunched gets
// dispatched.
Future<Nothing> executorLaunched =
FUTURE_DISPATCH(_, &Slave::executorLaunched);
// The test-executor in the image immediately sends a TASK_RUNNING
// followed by TASK_FINISHED (no sleep/delay in between) so we need
// to drop the first TWO updates that come from the executor rather
// than only the first update like above, where we can control the
// length of the task.
Future<StatusUpdateMessage> statusUpdateMessage1 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
// Drop the first update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage2 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(containerId);
AWAIT_READY(slaveId);
AWAIT_READY(executorLaunched);
AWAIT_READY(statusUpdateMessage1);
AWAIT_READY(statusUpdateMessage2);
slave.get()->terminate();
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
// This is owned by the containerizer, so we'll need one per containerizer.
logger = ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
dockerContainerizer.reset(new MockDockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker));
slave = StartSlave(detector.get(), dockerContainerizer.get(), flags);
ASSERT_SOME(slave);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
UPID executorPid = reregisterExecutorMessage.get().from;
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage.get().body);
// Executor should inform about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status.get().state());
ASSERT_TRUE(exists(docker, slaveId.get(), containerId.get()));
driver.stop();
driver.join();
}

// This test verifies that port mapping with bridge network exposes
// the host port to the container port, by sending data to the host
// port and receiving it in the container, which listens on the
// mapped container port.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpus:1;mem:1024;ports:[10000-10000]";
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
// We skip stopping the docker container because stopping a container,
// even one that has already terminated, might not flush the logs, and
// we would end up not getting stdout/stderr in our tests.
EXPECT_CALL(*mockDocker, stop(_, _, _))
.WillRepeatedly(Return(Nothing()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
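// Listen with netcat on the container port (1000); the uuid we send
// to the mapped host port (10000) below should arrive here.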
CommandInfo command;
command.set_shell(false);
command.set_value("nc");
command.add_arguments("-l");
command.add_arguments("-p");
command.add_arguments("1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
ContainerInfo::DockerInfo::PortMapping portMapping;
portMapping.set_host_port(10000);
portMapping.set_container_port(1000);
dockerInfo.add_port_mappings()->CopyFrom(portMapping);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
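// Wait until the container is actually running (i.e., 'docker
// inspect' reports a pid) before writing to the mapped host port.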
ASSERT_TRUE(
exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
string uuid = UUID::random().toString();
// Write the uuid to the docker-mapped host port.
Try<process::Subprocess> s = process::subprocess(
"echo " + uuid + " | nc localhost 10000");
ASSERT_SOME(s);
AWAIT_READY_FOR(s.get().status(), Seconds(60));
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
const vector<string> lines = strings::split(read.get(), "\n");
// We expect the uuid that was sent to the host port to be written to
// stdout by the docker container running 'nc -l'.
EXPECT_TRUE(containsLine(lines, uuid));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
}

// This test verifies that a sandbox with ':' in its path can still
// run successfully. This guards against a limitation of the Docker
// CLI, where the volume mapping parameter treats colons (:) as
// separators and would incorrectly split the sandbox directory.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
MockDockerContainerizer dockerContainerizer(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
SlaveID slaveId = offer.slave_id();
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("test:colon");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
}

TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Promise<Nothing> promise;
Future<Nothing> fetch;
// We want to pause the fetch call to simulate a long fetch time.
EXPECT_CALL(*process, fetch(_, _))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(promise.future())));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(fetch);
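// Destroy the container while the fetch is still pending; once we
// satisfy the fetch promise below, the aborted launch should surface
// as TASK_FAILED.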
dockerContainerizer.destroy(containerId.get());
// Resume docker launch.
promise.set(Nothing());
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
driver.stop();
driver.join();
}

TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Future<Nothing> fetch;
EXPECT_CALL(*process, fetch(_, _))
.WillOnce(DoAll(FutureSatisfy(&fetch),
Return(Nothing())));
Promise<Nothing> promise;
// We want to pause the pull call to simulate a long pull time.
EXPECT_CALL(*process, pull(_))
.WillOnce(Return(promise.future()));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
// Wait until fetch is finished.
AWAIT_READY(fetch);
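// Destroy the container while the image pull is still pending;
// satisfying the pull promise below lets the aborted launch complete
// and fail the task.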
dockerContainerizer.destroy(containerId.get());
// Resume docker launch.
promise.set(Nothing());
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
driver.stop();
driver.join();
}

// This test checks that when a docker containerizer update fails and
// the container fails before the executor started, the executor is
// properly killed and cleaned up.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusLost));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
// Fail the update so we don't proceed to send run task to the executor.
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Failure("Fail resource update")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusLost);
EXPECT_EQ(TASK_LOST, statusLost.get().state());
EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
statusLost.get().reason());
driver.stop();
driver.join();
}

// When the fetch fails we should send the scheduler a status update
// with a message that shows the actual error.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
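// Simulate a fetch failure; the fetcher's error message should be
// propagated to the scheduler in the status update.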
EXPECT_CALL(*process, fetch(_, _))
.WillOnce(Return(Failure("some error from fetch")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
EXPECT_EQ("Failed to launch container: some error from fetch",
statusFailed.get().message());
// TODO(jaybuff): When MESOS-2035 is addressed we should validate
// that statusFailed.get().reason() is correctly set here.
driver.stop();
driver.join();
}

// When the docker pull fails we should send the scheduler a status
// update with a message that shows the actual error.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerPullFailure)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("ls");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("alpine");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
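// Simulate a 'docker pull' failure; the pull's error message should
// be propagated to the scheduler in the status update.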
EXPECT_CALL(*mockDocker, pull(_, _, _))
.WillOnce(Return(Failure("some error from docker pull")));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
EXPECT_EQ("Failed to launch container: some error from docker pull",
statusFailed.get().message());
// TODO(jaybuff): When MESOS-2035 is addressed we should validate
// that statusFailed.get().reason() is correctly set here.
driver.stop();
driver.join();
}

// When the docker executor container fails to launch, the docker
// inspect future that is in a retry loop should be discarded.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerInspectDiscard)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockDocker* mockDocker =
new MockDocker(tests::flags.docker, tests::flags.docker_socket);
Shared<Docker> docker(mockDocker);
slave::Flags flags = CreateSlaveFlags();
Fetcher fetcher;
Try<ContainerLogger*> logger =
ContainerLogger::create(flags.container_logger);
ASSERT_SOME(logger);
// The docker containerizer will free the process, so we must
// allocate on the heap.
MockDockerContainerizerProcess* process =
new MockDockerContainerizerProcess(
flags,
&fetcher,
Owned<ContainerLogger>(logger.get()),
docker);
MockDockerContainerizer dockerContainerizer(
(Owned<DockerContainerizerProcess>(process)));
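// Capture the future returned by the mocked 'inspect' call so we can
// verify below that it is discarded once the launch fails.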
Future<Docker::Container> inspect;
EXPECT_CALL(*mockDocker, inspect(_, _))
.WillOnce(FutureResult(&inspect,
Invoke((MockDocker*) docker.get(),
&MockDocker::_inspect)));
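// Fail the 'docker run' so that the executor container never starts
// and the pending inspect retry loop must be discarded.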
EXPECT_CALL(*mockDocker, run(_, _, _, _, _, _, _, _, _, _))
.WillOnce(Return(Failure("Run failed")));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &dockerContainerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("/bin/test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
// TODO(tnachen): Use local image to test if possible.
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
Future<TaskStatus> statusFailed;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusFailed));
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launchExecutor)));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(statusFailed);
EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
AWAIT_DISCARDED(inspect);
driver.stop();
driver.join();
}

} // namespace tests {
} // namespace internal {
} // namespace mesos {