blob: 67d60a885d65edbcbbf702bce83a54d1a5c0411f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/subprocess.hpp>
#include "linux/cgroups.hpp"
#include "messages/messages.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
#include "slave/containerizer/docker.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::slave::state;
using namespace mesos::internal::tests;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::internal::slave::DockerContainerizer;
using process::Future;
using process::Message;
using process::PID;
using process::UPID;
using std::vector;
using std::list;
using std::string;
using testing::_;
using testing::DoDefault;
using testing::Eq;
using testing::Invoke;
using testing::Return;
class DockerContainerizerTest : public MesosTest
{
public:
static bool exists(
const list<Docker::Container>& containers,
const ContainerID& containerId)
{
string expectedName = slave::DOCKER_NAME_PREFIX + stringify(containerId);
foreach (const Docker::Container& container, containers) {
// Docker inspect name contains an extra slash in the beginning.
if (strings::contains(container.name, expectedName)) {
return true;
}
}
return false;
}
};
class MockDockerContainerizer : public DockerContainerizer {
public:
MockDockerContainerizer(
const slave::Flags& flags,
const Docker& docker)
: DockerContainerizer(flags, docker)
{
EXPECT_CALL(*this, launch(_, _, _, _, _, _, _))
.WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launchExecutor));
EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
.WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch));
EXPECT_CALL(*this, update(_, _))
.WillRepeatedly(Invoke(this, &MockDockerContainerizer::_update));
}
MOCK_METHOD7(
launch,
process::Future<bool>(
const ContainerID&,
const ExecutorInfo&,
const std::string&,
const Option<std::string>&,
const SlaveID&,
const process::PID<slave::Slave>&,
bool checkpoint));
MOCK_METHOD8(
launch,
process::Future<bool>(
const ContainerID&,
const TaskInfo&,
const ExecutorInfo&,
const std::string&,
const Option<std::string>&,
const SlaveID&,
const process::PID<slave::Slave>&,
bool checkpoint));
MOCK_METHOD2(
update,
process::Future<Nothing>(
const ContainerID&,
const Resources&));
// Default 'launch' implementation (necessary because we can't just
// use &DockerContainerizer::launch with 'Invoke').
process::Future<bool> _launch(
const ContainerID& containerId,
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
return DockerContainerizer::launch(
containerId,
taskInfo,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint);
}
process::Future<bool> _launchExecutor(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
return DockerContainerizer::launch(
containerId,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint);
}
process::Future<Nothing> _update(
const ContainerID& containerId,
const Resources& resources)
{
return DockerContainerizer::update(
containerId,
resources);
}
};
// Only enable executor launch on linux as other platforms
// requires running linux VM and need special port forwarding
// to get host networking to work.
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launchExecutor)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_TRUE(exists(containers.get(), containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
containers = docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_FALSE(exists(containers.get(), containerId.get()));
Shutdown();
}
// This test verifies that a custom executor can be launched and
// registered with the slave with docker bridge network enabled.
// We're assuming that the custom executor is registering it's public
// ip instead of 0.0.0.0 or equivelent to the slave as that's the
// default behavior for libprocess.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("tnachen/test-executor");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launchExecutor)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_TRUE(exists(containers.get(), containerId.get()));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.stop();
driver.join();
AWAIT_READY(termination);
containers = docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_FALSE(exists(containers.get(), containerId.get()));
Shutdown();
}
#endif // __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_TRUE(containers.get().size() > 0);
ASSERT_TRUE(exists(containers.get(), containerId.get()));
driver.stop();
driver.join();
Shutdown();
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
Future<TaskStatus> statusKilled;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusKilled));
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
driver.killTask(task.task_id());
AWAIT_READY(statusKilled);
EXPECT_EQ(TASK_KILLED, statusKilled.get().state());
AWAIT_READY(termination);
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_FALSE(exists(containers.get(), containerId.get()));
driver.stop();
driver.join();
Shutdown();
}
// This test tests DockerContainerizer::usage().
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.resources = Option<string>("cpus:2;mem:1024");
Docker docker = Docker::create(tests::flags.docker).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
// Run a CPU intensive command, so we can measure utime and stime later.
command.set_value("dd if=/dev/zero of=/dev/null");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
// We ignore all update calls to prevent resizing cgroup limits.
EXPECT_CALL(dockerContainerizer, update(_, _))
.WillRepeatedly(Return(Nothing()));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
// Verify the usage.
ResourceStatistics statistics;
Duration waited = Duration::zero();
do {
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
AWAIT_READY(usage);
statistics = usage.get();
if (statistics.cpus_user_time_secs() > 0 &&
statistics.cpus_system_time_secs() > 0) {
break;
}
os::sleep(Milliseconds(200));
waited += Milliseconds(200);
} while (waited < Seconds(3));
EXPECT_EQ(2, statistics.cpus_limit());
EXPECT_EQ(Gigabytes(1).bytes(), statistics.mem_limit_bytes());
EXPECT_LT(0, statistics.cpus_user_time_secs());
EXPECT_LT(0, statistics.cpus_system_time_secs());
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId.get());
dockerContainerizer.destroy(containerId.get());
AWAIT_READY(termination);
// Usage() should fail again since the container is destroyed.
Future<ResourceStatistics> usage =
dockerContainerizer.usage(containerId.get());
AWAIT_FAILED(usage);
driver.stop();
driver.join();
Shutdown();
}
#ifdef __linux__
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(containerId);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
string containerName = slave::DOCKER_NAME_PREFIX + containerId.get().value();
Future<Docker::Container> container = docker.inspect(containerName);
AWAIT_READY(container);
Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
ASSERT_SOME(newResources);
Future<Nothing> update =
dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
Result<string> memoryHierarchy = cgroups::hierarchy("memory");
ASSERT_SOME(cpuHierarchy);
ASSERT_SOME(memoryHierarchy);
Option<pid_t> pid = container.get().pid;
ASSERT_SOME(pid);
Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
ASSERT_SOME(cpuCgroup);
Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
ASSERT_SOME(memoryCgroup);
Try<uint64_t> cpu = cgroups::cpu::shares(
cpuHierarchy.get(),
cpuCgroup.get());
ASSERT_SOME(cpu);
Try<Bytes> mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(128u, mem.get().megabytes());
newResources = Resources::parse("cpus:1;mem:144");
// Issue second update that uses the cached pid instead of inspect.
update = dockerContainerizer.update(containerId.get(), newResources.get());
AWAIT_READY(update);
cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
ASSERT_SOME(cpu);
mem = cgroups::memory::soft_limit_in_bytes(
memoryHierarchy.get(),
memoryCgroup.get());
ASSERT_SOME(mem);
EXPECT_EQ(1024u, cpu.get());
EXPECT_EQ(144u, mem.get().megabytes());
driver.stop();
driver.join();
Shutdown();
}
#endif //__linux__
// Disabling recover test as the docker rm in recover is async.
// Even though we wait for the container to finish, when the wait
// returns docker rm might still be in progress.
// TODO(tnachen): Re-enable test when we wait for the async kill
// to finish. One way to do this is to mock the Docker interface
// and let the mocked docker collect all the remove futures and
// at the end of the test wait for all of them before the test exits.
TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Recover)
{
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
ContainerID containerId;
containerId.set_value("c1");
ContainerID reapedContainerId;
reapedContainerId.set_value("c2");
Resources resources = Resources::parse("cpus:1;mem:512").get();
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
CommandInfo commandInfo;
commandInfo.set_value("sleep 1000");
Future<Nothing> d1 =
docker.run(
containerInfo,
commandInfo,
slave::DOCKER_NAME_PREFIX + stringify(containerId),
flags.work_dir,
flags.docker_sandbox_directory,
resources);
Future<Nothing> d2 =
docker.run(
containerInfo,
commandInfo,
slave::DOCKER_NAME_PREFIX + stringify(reapedContainerId),
flags.work_dir,
flags.docker_sandbox_directory,
resources);
AWAIT_READY(d1);
AWAIT_READY(d2);
SlaveState slaveState;
FrameworkState frameworkState;
ExecutorID execId;
execId.set_value("e1");
ExecutorState execState;
ExecutorInfo execInfo;
execState.info = execInfo;
execState.latest = containerId;
Try<process::Subprocess> wait =
process::subprocess(
tests::flags.docker + " wait " +
slave::DOCKER_NAME_PREFIX +
stringify(containerId));
ASSERT_SOME(wait);
Try<process::Subprocess> reaped =
process::subprocess(
tests::flags.docker + " wait " +
slave::DOCKER_NAME_PREFIX +
stringify(reapedContainerId));
ASSERT_SOME(reaped);
FrameworkID frameworkId;
RunState runState;
runState.id = containerId;
runState.forkedPid = wait.get().pid();
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
slaveState.frameworks.put(frameworkId, frameworkState);
Future<Nothing> recover = dockerContainerizer.recover(slaveState);
AWAIT_READY(recover);
Future<containerizer::Termination> termination =
dockerContainerizer.wait(containerId);
ASSERT_FALSE(termination.isFailed());
AWAIT_FAILED(dockerContainerizer.wait(reapedContainerId));
dockerContainerizer.destroy(containerId);
AWAIT_READY(termination);
AWAIT_READY(reaped.get().status());
}
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_value("echo out" + uuid + " ; echo err" + uuid + " 1>&2");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout (which
// might also contain other things, hence the use of a UUID).
Try<string> read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
EXPECT_TRUE(strings::contains(read.get(), "err" + uuid));
EXPECT_FALSE(strings::contains(read.get(), "out" + uuid));
read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
EXPECT_TRUE(strings::contains(read.get(), "out" + uuid));
EXPECT_FALSE(strings::contains(read.get(), "err" + uuid));
driver.stop();
driver.join();
Shutdown();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_shell(false);
// NOTE: By not setting CommandInfo::value we're testing that we
// will still be able to run the container because it has a default
// entrypoint!
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
// Since we're not passing any command value, we're expecting the
// default entry point to be run which is 'echo' with the default
// command from the image which is 'inky'.
EXPECT_TRUE(strings::contains(read.get(), "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
EXPECT_FALSE(strings::contains(read.get(), "inky"));
driver.stop();
driver.join();
Shutdown();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We can set the value to just the 'uuid' since it should get
// passed as an argument to the entrypoint, i.e., 'echo uuid'.
command.set_value(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
// We expect the passed in command value to override the image's
// default command, thus we should see the value of 'uuid' in the
// output instead of the default command which is 'inky'.
EXPECT_TRUE(strings::contains(read.get(), uuid));
EXPECT_FALSE(strings::contains(read.get(), "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
EXPECT_FALSE(strings::contains(read.get(), "inky"));
EXPECT_FALSE(strings::contains(read.get(), uuid));
driver.stop();
driver.join();
Shutdown();
}
// The following test uses a Docker image (mesosphere/inky) that has
// an entrypoint "echo" and a default command "inky".
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
string uuid = UUID::random().toString();
CommandInfo command;
command.set_shell(false);
// We should also be able to skip setting the comamnd value and just
// set the arguments and those should also get passed through to the
// entrypoint!
command.add_arguments(uuid);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/inky");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stderr and stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
// We expect the passed in command arguments to override the image's
// default command, thus we should see the value of 'uuid' in the
// output instead of the default command which is 'inky'.
EXPECT_TRUE(strings::contains(read.get(), uuid));
EXPECT_FALSE(strings::contains(read.get(), "inky"));
read = os::read(path::join(directory.get(), "stderr"));
ASSERT_SOME(read);
EXPECT_FALSE(strings::contains(read.get(), "inky"));
EXPECT_FALSE(strings::contains(read.get(), uuid));
driver.stop();
driver.join();
Shutdown();
}
// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
// Setup recovery slave flags.
flags.checkpoint = true;
flags.recover = "reconnect";
flags.strict = true;
Docker docker = Docker::create(tests::flags.docker, false).get();
// We put the containerizer on the heap so we can more easily
// control it's lifetime, i.e., when we invoke the destructor.
MockDockerContainerizer* dockerContainerizer1 =
new MockDockerContainerizer(flags, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
// Enable checkpointing for the framework.
FrameworkInfo frameworkInfo;
frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_value("sleep 1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
Invoke(dockerContainerizer1,
&MockDockerContainerizer::_launch)));
// Drop the first update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(containerId);
// Stop the slave before the status update is received.
AWAIT_READY(statusUpdateMessage);
Stop(slave1.get());
delete dockerContainerizer1;
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
new MockDockerContainerizer(flags, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
UPID executorPid = reregisterExecutorMessage.get().from;
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage.get().body);
// Executor should inform about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status.get().state());
// Make sure the container is still running.
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_TRUE(exists(containers.get(), containerId.get()));
driver.stop();
driver.join();
Shutdown();
delete dockerContainerizer2;
}
// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up we make sure the executor
// re-registers and the slave properly sends the update.
//
// TODO(benh): This test is currently disabled because the executor
// inside the image mesosphere/test-executor does not properly set the
// executor PID that is uses during registration, so when the new
// slave recovers it can't reconnect and instead destroys that
// container. In particular, it uses '0' for it's IP which we properly
// parse and can even properly use for sending other messages, but the
// current implementation of 'UPID::operator bool ()' fails if the IP
// component of a PID is '0'.
TEST_F(DockerContainerizerTest,
DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
// Setup recovery slave flags.
flags.checkpoint = true;
flags.recover = "reconnect";
flags.strict = true;
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer* dockerContainerizer1 =
new MockDockerContainerizer(flags, docker);
Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
ASSERT_SOME(slave1);
// Enable checkpointing for the framework.
FrameworkInfo frameworkInfo;
frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
frameworkInfo.set_checkpoint(true);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
ExecutorInfo executorInfo;
ExecutorID executorId;
executorId.set_value("e1");
executorInfo.mutable_executor_id()->CopyFrom(executorId);
CommandInfo command;
command.set_value("test-executor");
executorInfo.mutable_command()->CopyFrom(command);
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("mesosphere/test-executor");
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
executorInfo.mutable_container()->CopyFrom(containerInfo);
task.mutable_executor()->CopyFrom(executorInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<SlaveID> slaveId;
EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<4>(&slaveId),
Invoke(dockerContainerizer1,
&MockDockerContainerizer::_launchExecutor)));
// We need to wait until the container's pid has been been
// checkpointed so that when the next slave recovers it won't treat
// the executor as having gone lost! We know this has completed
// after Containerizer::launch returns and the
// Slave::executorLaunched gets dispatched.
Future<Nothing> executorLaunched =
FUTURE_DISPATCH(_, &Slave::executorLaunched);
// The test-executor in the image immediately sends a TASK_RUNNING
// followed by TASK_FINISHED (no sleep/delay in between) so we need
// to drop the first TWO updates that come from the executor rather
// than only the first update like above where we can control how
// the length of the task.
Future<StatusUpdateMessage> statusUpdateMessage1 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
// Drop the first update from the executor.
Future<StatusUpdateMessage> statusUpdateMessage2 =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY(containerId);
AWAIT_READY(slaveId);
AWAIT_READY(executorLaunched);
AWAIT_READY(statusUpdateMessage1);
AWAIT_READY(statusUpdateMessage2);
Stop(slave1.get());
delete dockerContainerizer1;
Future<Message> reregisterExecutorMessage =
FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
MockDockerContainerizer* dockerContainerizer2 =
new MockDockerContainerizer(flags, docker);
Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
ASSERT_SOME(slave2);
// Ensure the executor re-registers.
AWAIT_READY(reregisterExecutorMessage);
UPID executorPid = reregisterExecutorMessage.get().from;
ReregisterExecutorMessage reregister;
reregister.ParseFromString(reregisterExecutorMessage.get().body);
// Executor should inform about the unacknowledged update.
ASSERT_EQ(1, reregister.updates_size());
const StatusUpdate& update = reregister.updates(0);
ASSERT_EQ(task.task_id(), update.status().task_id());
ASSERT_EQ(TASK_RUNNING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
ASSERT_EQ(TASK_RUNNING, status.get().state());
// Make sure the container is still running.
Future<list<Docker::Container> > containers =
docker.ps(true, slave::DOCKER_NAME_PREFIX);
AWAIT_READY(containers);
ASSERT_TRUE(exists(containers.get(), containerId.get()));
driver.stop();
driver.join();
Shutdown();
delete dockerContainerizer2;
}
// This test verifies that port mapping with bridge network is
// exposing the host port to the container port, by sending data
// to the host port and receiving it in the container by listening
// to the mapped container port.
TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.resources = "cpus:1;mem:1024;ports:[10000-10000]";
Docker docker = Docker::create(tests::flags.docker, false).get();
MockDockerContainerizer dockerContainerizer(flags, docker);
Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task;
task.set_name("");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(offer.resources());
CommandInfo command;
command.set_shell(false);
command.set_value("nc");
command.add_arguments("-l");
command.add_arguments("-p");
command.add_arguments("1000");
ContainerInfo containerInfo;
containerInfo.set_type(ContainerInfo::DOCKER);
ContainerInfo::DockerInfo dockerInfo;
dockerInfo.set_image("busybox");
dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
ContainerInfo::DockerInfo::PortMapping portMapping;
portMapping.set_host_port(10000);
portMapping.set_container_port(1000);
dockerInfo.add_port_mappings()->CopyFrom(portMapping);
containerInfo.mutable_docker()->CopyFrom(dockerInfo);
task.mutable_command()->CopyFrom(command);
task.mutable_container()->CopyFrom(containerInfo);
vector<TaskInfo> tasks;
tasks.push_back(task);
Future<ContainerID> containerId;
Future<string> directory;
EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
.WillOnce(DoAll(FutureArg<0>(&containerId),
FutureArg<3>(&directory),
Invoke(&dockerContainerizer,
&MockDockerContainerizer::_launch)));
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
AWAIT_READY(directory);
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
string uuid = UUID::random().toString();
// Write uuid to docker mapped host port.
Try<process::Subprocess> s = process::subprocess(
"echo " + uuid + " | nc localhost 10000");
ASSERT_SOME(s);
AWAIT_READY_FOR(s.get().status(), Seconds(60));
string container = slave::DOCKER_NAME_PREFIX + stringify(containerId.get());
AWAIT_READY_FOR(docker.kill(container, false), Seconds(30));
AWAIT_READY_FOR(statusFinished, Seconds(60));
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
// Now check that the proper output is in stdout.
Try<string> read = os::read(path::join(directory.get(), "stdout"));
ASSERT_SOME(read);
// We expect the uuid that is sent to host port to be written
// to stdout by the docker container running nc -l.
EXPECT_TRUE(strings::contains(read.get(), uuid));
driver.stop();
driver.join();
Shutdown();
}