| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <gmock/gmock.h> |
| |
| #include <gtest/gtest.h> |
| |
| #include <mesos/slave/container_logger.hpp> |
| |
| #include <mesos/v1/mesos.hpp> |
| |
| #include <process/collect.hpp> |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/io.hpp> |
| #include <process/owned.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/uuid.hpp> |
| |
| #ifdef __linux__ |
| #include "linux/cgroups.hpp" |
| #include "linux/fs.hpp" |
| #endif // __linux__ |
| |
| #include "messages/messages.hpp" |
| |
| #include "slave/containerizer/docker.hpp" |
| #include "slave/containerizer/fetcher.hpp" |
| |
| #include "slave/containerizer/mesos/isolators/cgroups/constants.hpp" |
| |
| #include "slave/paths.hpp" |
| #include "slave/slave.hpp" |
| #include "slave/state.hpp" |
| |
| #include "tests/flags.hpp" |
| #include "tests/mesos.hpp" |
| #include "tests/mock_docker.hpp" |
| |
| #include "tests/containerizer/docker_common.hpp" |
| |
| using namespace mesos::internal::slave::paths; |
| using namespace mesos::internal::slave::state; |
| |
| using namespace process; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::DockerContainerizer; |
| using mesos::internal::slave::DockerContainerizerProcess; |
| using mesos::internal::slave::Fetcher; |
| using mesos::internal::slave::Slave; |
| |
| using mesos::master::detector::MasterDetector; |
| |
| using mesos::slave::ContainerConfig; |
| using mesos::slave::ContainerLogger; |
| using mesos::slave::ContainerTermination; |
| |
| using std::list; |
| using std::string; |
| using std::vector; |
| |
| using testing::_; |
| using testing::DoAll; |
| using testing::DoDefault; |
| using testing::Eq; |
| using testing::Invoke; |
| using testing::Return; |
| |
| namespace process { |
| |
| // We need to reinitialize libprocess in order to test against |
| // different configurations, such as when libprocess is initialized |
| // with SSL or IPv6 enabled. |
| void reinitialize( |
| const Option<string>& delegate, |
| const Option<string>& readonlyAuthenticationRealm, |
| const Option<string>& readwriteAuthenticationRealm); |
| |
| } // namespace process { |
| |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
// Name of the "inky" docker image pulled by the test fixture. Windows
// uses an image from a different repository.
#ifndef __WINDOWS__
static constexpr char DOCKER_INKY_IMAGE[] = "mesosphere/inky";
#else
static constexpr char DOCKER_INKY_IMAGE[] = "akagup/inky";
#endif // __WINDOWS__
| |
| |
| static |
| ContainerInfo createDockerInfo(const string& imageName) |
| { |
| ContainerInfo containerInfo; |
| |
| containerInfo.set_type(ContainerInfo::DOCKER); |
| containerInfo.mutable_docker()->set_image(imageName); |
| |
| return containerInfo; |
| } |
| |
| |
| class DockerContainerizerTest : public MesosTest |
| { |
| public: |
| static string containerName(const ContainerID& containerId) |
| { |
| return slave::DOCKER_NAME_PREFIX + containerId.value(); |
| } |
| |
| enum ContainerState |
| { |
| EXISTS, |
| RUNNING |
| }; |
| |
| static bool exists( |
| const process::Shared<Docker>& docker, |
| const ContainerID& containerId, |
| ContainerState state = ContainerState::EXISTS, |
| bool retry = true) |
| { |
| Duration waited = Duration::zero(); |
| string expectedName = containerName(containerId); |
| |
| #ifdef __WINDOWS__ |
| constexpr Duration waitInspect = Seconds(10); |
| constexpr Duration waitInterval = Milliseconds(500); |
| constexpr Duration waitMax = Seconds(15); |
| #else |
| constexpr Duration waitInspect = Seconds(3); |
| constexpr Duration waitInterval = Milliseconds(200); |
| constexpr Duration waitMax = Seconds(5); |
| #endif // __WINDOWS__ |
| |
| do { |
| Future<Docker::Container> inspect = docker->inspect(expectedName); |
| |
| if (!inspect.await(waitInspect)) { |
| return false; |
| } |
| |
| if (inspect.isReady()) { |
| switch (state) { |
| case ContainerState::RUNNING: |
| if (inspect->pid.isSome()) { |
| return true; |
| } |
| // Retry looking for running pid until timeout. |
| break; |
| case ContainerState::EXISTS: |
| return true; |
| } |
| } |
| |
| os::sleep(waitInterval); |
| waited += waitInterval; |
| } while (retry && waited < waitMax); |
| |
| return false; |
| } |
| |
| static bool containsLine( |
| const vector<string>& lines, |
| const string& expectedLine) |
| { |
| foreach (const string& line, lines) { |
| if (line == expectedLine) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| void SetUp() override |
| { |
| MesosTest::SetUp(); |
| |
| Future<std::tuple<Nothing, Nothing>> pulls = process::collect( |
| pullDockerImage(DOCKER_TEST_IMAGE), |
| pullDockerImage(DOCKER_INKY_IMAGE)); |
| |
| // The pull should only need to happen once since we don't delete the |
| // image. So, we only log the warning once. |
| LOG_FIRST_N(WARNING, 1) << "Pulling " << string(DOCKER_TEST_IMAGE) |
| << " and " << string(DOCKER_INKY_IMAGE) << ". " |
| << "This might take a while..."; |
| |
| // The Windows images are ~200 MB, while the Linux images are ~2MB, so |
| // hopefully this is enough time for the Windows images. There should |
| // be some parallelism too, since we're pulling them simultaneously and |
| // they share the same base Windows layer. |
| AWAIT_READY_FOR(pulls, Minutes(10)); |
| } |
| |
| void TearDown() override |
| { |
| Try<Owned<Docker>> docker = Docker::create( |
| tests::flags.docker, |
| tests::flags.docker_socket, |
| false); |
| |
| ASSERT_SOME(docker); |
| |
| Future<vector<Docker::Container>> containers = |
| docker.get()->ps(true, slave::DOCKER_NAME_PREFIX); |
| |
| AWAIT_READY(containers); |
| |
| // Cleanup all mesos launched containers. |
| foreach (const Docker::Container& container, containers.get()) { |
| AWAIT_READY_FOR(docker.get()->rm(container.id, true), Seconds(30)); |
| } |
| |
| MesosTest::TearDown(); |
| } |
| }; |
| |
| |
// Only enable executor launch on Linux, as other platforms require
// running a Linux VM and need special port forwarding to get host
// networking to work.
| #ifdef __linux__ |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| ExecutorID executorId; |
| executorId.set_value("e1"); |
| |
| CommandInfo command; |
| command.set_value("/bin/test-executor"); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| command, |
| executorId); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_executor()->mutable_container()->CopyFrom( |
| createDockerInfo("tnachen/test-executor")); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| AWAIT_READY_FOR(statusFinished, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, statusFinished->state()); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| |
| |
| // This test verifies that a custom executor can be launched and |
| // registered with the slave with docker bridge network enabled. |
| // We're assuming that the custom executor is registering its public |
| // ip instead of 0.0.0.0 or equivalent to the slave as that's the |
| // default behavior for libprocess. |
| // |
| // Currently this test fails on ubuntu and centos since the slave is |
| // binding and advertising 127.0.x.x address and unreachable by executor |
| // in bridge network. |
| // TODO(tnachen): Re-enable this test when we are able to fix MESOS-3123. |
| TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_Launch_Executor_Bridged) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| ExecutorID executorId; |
| executorId.set_value("e1"); |
| |
| CommandInfo command; |
| command.set_value("/bin/test-executor"); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| command, |
| executorId); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| ContainerInfo containerInfo = createDockerInfo("alpine"); |
| |
| containerInfo.mutable_docker()->set_network( |
| ContainerInfo::DockerInfo::BRIDGE); |
| |
| task.mutable_executor()->mutable_container()->CopyFrom(containerInfo); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| AWAIT_READY_FOR(statusFinished, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, statusFinished->state()); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| #endif // __linux__ |
| |
| |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| Try<JSON::Array> array = JSON::parse<JSON::Array>(statusRunning->data()); |
| ASSERT_SOME(array); |
| |
| // Check if container information is exposed through master's state endpoint. |
| Future<http::Response> response = http::get( |
| master.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Value> find = parse->find<JSON::Value>( |
| "frameworks[0].tasks[0].container.docker.privileged"); |
| |
| EXPECT_SOME_FALSE(find); |
| |
| // Check if container information is exposed through slave's state endpoint. |
| response = http::get( |
| slave.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); |
| |
| parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| find = parse->find<JSON::Value>( |
| "frameworks[0].executors[0].tasks[0].container.docker.privileged"); |
| |
| EXPECT_SOME_FALSE(find); |
| |
| // Now verify the ContainerStatus fields in the TaskStatus. |
| ASSERT_TRUE(statusRunning->has_container_status()); |
| EXPECT_TRUE(statusRunning->container_status().has_container_id()); |
| ASSERT_EQ(1, statusRunning->container_status().network_infos().size()); |
| EXPECT_EQ(1, statusRunning->container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length) |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| |
| |
| // This test verifies that docker executor will terminate a task after it |
| // reaches `max_completion_time`. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_MaxCompletionTime) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| const Offer& offer = offers.get()[0]; |
| |
| TaskInfo task = |
| createTask(offer.slave_id(), offer.resources(), SLEEP_COMMAND(1000)); |
| |
| // Set a `max_completion_time` for 10 seconds on Windows and 2 seconds on |
| // other platforms. Hopefully this should not block test too long and still |
| // keep it reliable. |
| #ifdef __WINDOWS__ |
| task.mutable_max_completion_time()->set_nanoseconds(Seconds(10).ns()); |
| #else |
| task.mutable_max_completion_time()->set_nanoseconds(Seconds(2).ns()); |
| #endif // __WINDOWS__ |
| |
| ContainerInfo containerInfo; |
| containerInfo.set_type(ContainerInfo::DOCKER); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFailed; |
| |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFailed)); |
| |
| Future<Nothing> executorTerminated = |
| FUTURE_DISPATCH(_, &Slave::executorTerminated); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| AWAIT_READY(statusFailed); |
| EXPECT_EQ(TASK_FAILED, statusFailed->state()); |
| |
| EXPECT_EQ( |
| TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED, statusFailed->reason()); |
| |
| AWAIT_READY(executorTerminated); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| ASSERT_TRUE( |
| exists(docker, containerId.get(), ContainerState::RUNNING)); |
| |
| Future<TaskStatus> statusKilled; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusKilled)); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.killTask(task.task_id()); |
| |
| AWAIT_READY(statusKilled); |
| EXPECT_EQ(TASK_KILLED, statusKilled->state()); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| // Even though the task is killed, the executor should exit gracefully. |
| ASSERT_TRUE(termination.get()->has_status()); |
| EXPECT_EQ(0, termination.get()->status()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Ensures that the framework will receive a TASK_KILLING update |
| // before TASK_KILLED, if the capability is supported. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_TaskKillingCapability) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| // Start the framework with the task killing capability. |
| FrameworkInfo::Capability capability; |
| capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE); |
| |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.add_capabilities()->CopyFrom(capability); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| ASSERT_TRUE( |
| exists(docker, containerId.get(), ContainerState::RUNNING)); |
| |
| Future<TaskStatus> statusKilling, statusKilled; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusKilling)) |
| .WillOnce(FutureArg<1>(&statusKilled)); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.killTask(task.task_id()); |
| |
| AWAIT_READY(statusKilling); |
| EXPECT_EQ(TASK_KILLING, statusKilling->state()); |
| |
| AWAIT_READY(statusKilled); |
| EXPECT_EQ(TASK_KILLED, statusKilled->state()); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test tests DockerContainerizer::usage(). |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.resources = Option<string>("cpus:1;mem:1024"); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| CommandInfo command; |
| // Run a CPU intensive command, so we can measure utime and stime later. |
| #ifdef __WINDOWS__ |
| command.set_value("for /L %n in (1, 0, 2) do rem"); |
| #else |
| command.set_value("dd if=/dev/zero of=/dev/null"); |
| #endif // __WINDOWS__ |
| |
| Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(2); |
| memLimit.set_value(2048); |
| |
| google::protobuf::Map<string, Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| command, |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| // We ignore all update calls to prevent resizing cgroup limits. |
| EXPECT_CALL(dockerContainerizer, update(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Verify the usage. |
| ResourceStatistics statistics; |
| Duration waited = Duration::zero(); |
| do { |
| Future<ResourceStatistics> usage = |
| dockerContainerizer.usage(containerId.get()); |
| // TODO(tnachen): Replace await with AWAIT_COMPLETED once |
| // implemented. |
| ASSERT_TRUE(usage.await(Seconds(3))); |
| |
| if (usage.isReady()) { |
| statistics = usage.get(); |
| |
| if (statistics.cpus_user_time_secs() > 0 && |
| statistics.cpus_system_time_secs() > 0) { |
| break; |
| } |
| } |
| |
| os::sleep(Milliseconds(200)); |
| waited += Milliseconds(200); |
| } while (waited < Seconds(3)); |
| |
| EXPECT_EQ(1, statistics.cpus_soft_limit()); |
| EXPECT_EQ(2, statistics.cpus_limit()); |
| EXPECT_EQ(Gigabytes(1).bytes(), statistics.mem_soft_limit_bytes()); |
| EXPECT_EQ(Gigabytes(2).bytes(), statistics.mem_limit_bytes()); |
| |
| #ifndef __WINDOWS__ |
| // These aren't provided by the Windows Container APIs, so skip them. |
| EXPECT_LT(0, statistics.cpus_user_time_secs()); |
| EXPECT_LT(0, statistics.cpus_system_time_secs()); |
| EXPECT_GT(statistics.mem_rss_bytes(), 0u); |
| #endif // __WINDOWS__ |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| dockerContainerizer.destroy(containerId.get()); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| // Usage() should fail again since the container is destroyed. |
| Future<ResourceStatistics> usage = |
| dockerContainerizer.usage(containerId.get()); |
| |
| AWAIT_FAILED(usage); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Starts two docker containers but lists only one of them in the SlaveState |
| // handed to `recover()`. After recovery the listed container can still be |
| // waited on, the other one is unknown to the containerizer, and its |
| // `docker run` is expected to have been stopped. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover) |
| { |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer containerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| docker); |
| |
| // `containerId` will be part of the recovered state; `reapedContainerId` |
| // will not. |
| ContainerID containerId; |
| containerId.set_value(id::UUID::random().toString()); |
| |
| ContainerID reapedContainerId; |
| reapedContainerId.set_value(id::UUID::random().toString()); |
| |
| const string recoveredName = containerName(containerId); |
| const string reapedName = containerName(reapedContainerId); |
| |
| // Remove leftovers of both containers, should any still exist. |
| ASSERT_TRUE(docker->rm(recoveredName, true).await(Seconds(30))); |
| ASSERT_TRUE(docker->rm(reapedName, true).await(Seconds(30))); |
| |
| const Resources resources = Resources::parse("cpus:1;mem:512").get(); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
| |
| CommandInfo sleepCommand; |
| sleepCommand.set_value(SLEEP_COMMAND(1000)); |
| |
| // Launch the container that will be recovered. |
| Try<Docker::RunOptions> recoveredOptions = Docker::RunOptions::create( |
| dockerInfo, |
| sleepCommand, |
| recoveredName, |
| flags.work_dir, |
| flags.sandbox_directory, |
| resources); |
| ASSERT_SOME(recoveredOptions); |
| |
| docker->run(recoveredOptions.get()); |
| |
| // Launch the container that is left out of the recovered state. |
| Try<Docker::RunOptions> reapedOptions = Docker::RunOptions::create( |
| dockerInfo, |
| sleepCommand, |
| reapedName, |
| flags.work_dir, |
| flags.sandbox_directory, |
| resources); |
| ASSERT_SOME(reapedOptions); |
| |
| Future<Option<int>> reapedRun = docker->run(reapedOptions.get()); |
| |
| // Both containers have to be running before recovery is attempted. |
| ASSERT_TRUE(exists(docker, containerId, ContainerState::RUNNING)); |
| ASSERT_TRUE(exists(docker, reapedContainerId, ContainerState::RUNNING)); |
| |
| Future<Docker::Container> reapedInspect = docker->inspect(reapedName); |
| AWAIT_READY(reapedInspect); |
| |
| // A `docker wait` subprocess supplies the pid recorded as the forked pid |
| // of the run. |
| Try<process::Subprocess> dockerWait = |
| process::subprocess(tests::flags.docker + " wait " + recoveredName); |
| ASSERT_SOME(dockerWait); |
| |
| // Assemble the state: run -> executor -> framework -> agent. |
| RunState recoveredRun; |
| recoveredRun.id = containerId; |
| recoveredRun.forkedPid = dockerWait->pid(); |
| |
| ExecutorInfo executorInfo; |
| |
| ExecutorState executorState; |
| executorState.info = executorInfo; |
| executorState.latest = containerId; |
| executorState.runs.put(containerId, recoveredRun); |
| |
| ExecutorID executorId; |
| executorId.set_value("e1"); |
| |
| FrameworkState frameworkState; |
| frameworkState.executors.put(executorId, executorState); |
| |
| SlaveID slaveId; |
| slaveId.set_value("s1"); |
| |
| FrameworkID frameworkId; |
| |
| SlaveState slaveState; |
| slaveState.id = slaveId; |
| slaveState.frameworks.put(frameworkId, frameworkState); |
| |
| Future<Nothing> recovered = containerizer.recover(slaveState); |
| AWAIT_READY(recovered); |
| |
| // Waiting on the recovered container must not fail. |
| Future<Option<ContainerTermination>> recoveredTermination = |
| containerizer.wait(containerId); |
| |
| ASSERT_FALSE(recoveredTermination.isFailed()); |
| |
| // The container missing from the state should be cleaned up and unknown |
| // by now, so waiting on it yields `None`. |
| Future<Option<ContainerTermination>> reapedTermination = |
| containerizer.wait(reapedContainerId); |
| |
| AWAIT_READY(reapedTermination); |
| EXPECT_NONE(reapedTermination.get()); |
| |
| // The `docker run` of that container is expected to have been stopped. |
| assertDockerKillStatus(reapedRun); |
| } |
| |
| |
| // Verifies that a docker container which is not part of the state passed to |
| // `recover()` (an orphan from the point of view of the current agent |
| // instance) is cleaned up during recovery. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_KillOrphanContainers) |
| { |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer containerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| docker); |
| |
| // `containerId` is listed in the recovered state, `orphanContainerId` |
| // is not. |
| ContainerID containerId; |
| containerId.set_value(id::UUID::random().toString()); |
| |
| ContainerID orphanContainerId; |
| orphanContainerId.set_value(id::UUID::random().toString()); |
| |
| const string knownName = containerName(containerId); |
| const string orphanName = containerName(orphanContainerId); |
| |
| // Remove leftovers of both containers, should any still exist. |
| ASSERT_TRUE(docker->rm(knownName, true).await(Seconds(30))); |
| ASSERT_TRUE(docker->rm(orphanName, true).await(Seconds(30))); |
| |
| const Resources resources = Resources::parse("cpus:1;mem:512").get(); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
| |
| CommandInfo sleepCommand; |
| sleepCommand.set_value(SLEEP_COMMAND(1000)); |
| |
| // Launch the container that the recovered state refers to. |
| Try<Docker::RunOptions> knownOptions = Docker::RunOptions::create( |
| dockerInfo, |
| sleepCommand, |
| knownName, |
| flags.work_dir, |
| flags.sandbox_directory, |
| resources); |
| ASSERT_SOME(knownOptions); |
| |
| docker->run(knownOptions.get()); |
| |
| // Launch the orphan. |
| Try<Docker::RunOptions> orphanOptions = Docker::RunOptions::create( |
| dockerInfo, |
| sleepCommand, |
| orphanName, |
| flags.work_dir, |
| flags.sandbox_directory, |
| resources); |
| ASSERT_SOME(orphanOptions); |
| |
| Future<Option<int>> orphanRun = docker->run(orphanOptions.get()); |
| |
| // Both containers have to be running before recovery is attempted. |
| ASSERT_TRUE(exists(docker, containerId, ContainerState::RUNNING)); |
| ASSERT_TRUE(exists(docker, orphanContainerId, ContainerState::RUNNING)); |
| |
| // A `docker wait` subprocess supplies the pid recorded as the forked pid |
| // of the run. |
| Try<process::Subprocess> dockerWait = |
| process::subprocess(tests::flags.docker + " wait " + knownName); |
| ASSERT_SOME(dockerWait); |
| |
| // Assemble the state: run -> executor -> framework -> agent. |
| RunState knownRun; |
| knownRun.id = containerId; |
| knownRun.forkedPid = dockerWait->pid(); |
| |
| ExecutorInfo executorInfo; |
| |
| ExecutorState executorState; |
| executorState.info = executorInfo; |
| executorState.latest = containerId; |
| executorState.runs.put(containerId, knownRun); |
| |
| ExecutorID executorId; |
| executorId.set_value("e1"); |
| |
| FrameworkState frameworkState; |
| frameworkState.executors.put(executorId, executorState); |
| |
| SlaveID slaveId; |
| slaveId.set_value("s1"); |
| |
| FrameworkID frameworkId; |
| |
| SlaveState slaveState; |
| slaveState.id = slaveId; |
| slaveState.frameworks.put(frameworkId, frameworkState); |
| |
| Future<Nothing> recovered = containerizer.recover(slaveState); |
| AWAIT_READY(recovered); |
| |
| // Waiting on the known container must not fail. |
| Future<Option<ContainerTermination>> knownTermination = |
| containerizer.wait(containerId); |
| |
| ASSERT_FALSE(knownTermination.isFailed()); |
| |
| // The orphan must be unknown to the containerizer and must no longer |
| // exist in docker. |
| Future<Option<ContainerTermination>> orphanTermination = |
| containerizer.wait(orphanContainerId); |
| |
| AWAIT_READY(orphanTermination); |
| EXPECT_NONE(orphanTermination.get()); |
| ASSERT_FALSE( |
| exists(docker, orphanContainerId, ContainerState::EXISTS, false)); |
| |
| assertDockerKillStatus(orphanRun); |
| } |
| |
| |
| // Verifies that `recover()` on the docker containerizer ignores executors |
| // whose container type is MESOS, i.e. executors that were launched by a |
| // different containerizer. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker) |
| { |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer containerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| docker); |
| |
| // Assemble the state bottom-up: run -> executor -> framework -> agent. |
| ContainerID containerId; |
| containerId.set_value(id::UUID::random().toString()); |
| |
| RunState mesosRun; |
| mesosRun.id = containerId; |
| |
| // The executor claims to live in a MESOS container. |
| ExecutorInfo mesosExecutor; |
| mesosExecutor.mutable_container()->set_type(ContainerInfo::MESOS); |
| |
| ExecutorState executorState; |
| executorState.info = mesosExecutor; |
| executorState.latest = containerId; |
| executorState.runs.put(containerId, mesosRun); |
| |
| ExecutorID executorId; |
| executorId.set_value(id::UUID::random().toString()); |
| |
| FrameworkState frameworkState; |
| frameworkState.executors.put(executorId, executorState); |
| |
| FrameworkID frameworkId; |
| frameworkId.set_value(id::UUID::random().toString()); |
| |
| SlaveState slaveState; |
| slaveState.frameworks.put(frameworkId, frameworkState); |
| |
| Future<Nothing> recovered = containerizer.recover(slaveState); |
| AWAIT_READY(recovered); |
| |
| // Nothing may have been picked up: a MesosContainerizer task is not |
| // the DockerContainerizer's to recover. |
| Future<hashset<ContainerID>> known = containerizer.containers(); |
| AWAIT_READY(known); |
| |
| EXPECT_TRUE(known->empty()); |
| } |
| |
| |
| // This test checks the docker containerizer doesn't recover containers |
| // with malformed uuid. |
| // |
| // A docker container whose ContainerID value is not a UUID is started, |
| // recovery is run with orphan killing enabled and a state that does not |
| // mention the container, and the container must still exist afterwards. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverMalformedUUID) |
| { |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // Orphan killing is switched on explicitly: the point of the test is that |
| // the container survives recovery regardless. |
| flags.docker_kill_orphans = true; |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| SlaveID slaveId; |
| slaveId.set_value("s1"); |
| ContainerID containerId; |
| containerId.set_value("malformedUUID"); |
| |
| string container = containerName(containerId); |
| |
| // Clean up container if it still exists. |
| ASSERT_TRUE(docker->rm(container, true).await(Seconds(30))); |
| |
| Resources resources = Resources::parse("cpus:1;mem:512").get(); |
| |
| CommandInfo commandInfo; |
| commandInfo.set_value(SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| Try<Docker::RunOptions> runOptions = Docker::RunOptions::create( |
| createDockerInfo(DOCKER_TEST_IMAGE), |
| commandInfo, |
| container, |
| flags.work_dir, |
| flags.sandbox_directory, |
| resources); |
| |
| // Fail the test cleanly instead of dereferencing an errored `Try` below. |
| ASSERT_SOME(runOptions); |
| |
| Future<Option<int>> run = docker->run(runOptions.get()); |
| |
| ASSERT_TRUE( |
| exists(docker, containerId, ContainerState::RUNNING)); |
| |
| // Build a state that contains an executor but no run for `containerId`. |
| SlaveState slaveState; |
| slaveState.id = slaveId; |
| FrameworkState frameworkState; |
| |
| ExecutorID execId; |
| execId.set_value("e1"); |
| |
| ExecutorState execState; |
| ExecutorInfo execInfo; |
| execState.info = execInfo; |
| |
| FrameworkID frameworkId; |
| frameworkState.executors.put(execId, execState); |
| slaveState.frameworks.put(frameworkId, frameworkState); |
| |
| Future<Nothing> recover = dockerContainerizer.recover(slaveState); |
| AWAIT_READY(recover); |
| |
| // The container should still exist and should not get killed |
| // by containerizer recovery. |
| ASSERT_TRUE(exists(docker, containerId)); |
| } |
| |
| |
| // TODO(akagup): Persistent volumes aren't implemented on Windows, but these |
| // tests should be enabled once we implement them. See MESOS-5461. |
| #ifdef __linux__ |
| // This test verifies that we can launch a docker container with |
| // persistent volume. |
| // |
| // The task writes "abc" into a file inside the volume (mounted at `path1` |
| // in the sandbox). Once the task has finished, the test checks that the file |
| // is present in the volume's directory under the agent work dir and that no |
| // mount whose target contains `<sandbox>/path1` is left behind. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // NOTE: The resource name is `cpus` (as requested by the task below), |
| // not `cpu`. |
| flags.resources = "cpus:2;mem:2048;disk(role1):2048"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| // The framework subscribes with `role1`, the role of the agent's disk. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_roles(0, "role1"); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| Resource volume = createPersistentVolume( |
| Megabytes(64), |
| "role1", |
| "id1", |
| "path1", |
| None(), |
| None(), |
| frameworkInfo.principal()); |
| |
| // Write a marker file into the volume's container path. |
| CommandInfo command; |
| command.set_value("echo abc > " + |
| path::join(flags.sandbox_directory, "path1", "file")); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| Resources::parse("cpus:1;mem:64").get() + volume, |
| command); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| // We use the filter explicitly here so that the resources will not |
| // be filtered for 5 seconds (the default). |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| // Capture the container id and config while still performing the launch. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)) |
| .WillRepeatedly(DoDefault()); |
| |
| // Create the volume and launch the task in a single accept call. |
| driver.acceptOffers( |
| {offers->front().id()}, |
| {CREATE(volume), LAUNCH({task})}, |
| filters); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished->state()); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| |
| // Hold the path by value rather than binding a reference to a temporary. |
| const string volumePath = getPersistentVolumePath( |
| flags.work_dir, |
| volume); |
| |
| EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file"))); |
| |
| // The table is dereferenced below, so a read failure has to end the test |
| // here. |
| Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); |
| ASSERT_SOME(table); |
| |
| // Verify that the persistent volume is unmounted. |
| foreach (const fs::MountInfoTable::Entry& entry, table->entries) { |
| EXPECT_FALSE(strings::contains( |
| entry.target, |
| path::join(containerConfig->directory(), "path1"))); |
| } |
| } |
| |
| |
| // This test checks the docker containerizer is able to recover containers |
| // with persistent volumes and destroy it properly. |
| // |
| // A long-running task with a persistent volume is launched, the agent is |
| // restarted with a fresh containerizer, and after recovery the container is |
| // destroyed. No mount whose target contains `<sandbox>/path1` may remain. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // NOTE: The resource name is `cpus` (as requested by the task below), |
| // not `cpu`. |
| flags.resources = "cpus:2;mem:2048;disk(role1):2048"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| // Held in an `Owned` so it can be replaced when the agent is restarted. |
| Owned<MockDockerContainerizer> dockerContainerizer( |
| new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_roles(0, "role1"); |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| // NOTE: We set filter explicitly here so that the resources will |
| // not be filtered for 5 seconds (the default). |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| Resource volume = createPersistentVolume( |
| Megabytes(64), |
| "role1", |
| "id1", |
| "path1", |
| None(), |
| None(), |
| frameworkInfo.principal()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| Resources::parse("cpus:1;mem:64").get() + volume, |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| // Capture the container id and config while still performing the launch. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(dockerContainerizer.get(), |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| // Create the volume and launch the task in a single accept call. |
| driver.acceptOffers( |
| {offers->front().id()}, |
| {CREATE(volume), LAUNCH({task})}, |
| filters); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Recreate containerizer and start slave again. |
| slave.get()->terminate(); |
| slave->reset(); |
| |
| logger = ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(logger); |
| |
| dockerContainerizer.reset(new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| // Install the expectation before the agent is started so that the |
| // `_recover` dispatch cannot happen before we are listening for it. |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| slave = StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until containerizer recover is complete. |
| AWAIT_READY(_recover); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer->destroy(containerId.get()); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| // The table is dereferenced below, so a read failure has to end the test |
| // here. |
| Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); |
| ASSERT_SOME(table); |
| |
| // Verify that the recovered container's persistent volume is |
| // unmounted. |
| foreach (const fs::MountInfoTable::Entry& entry, table->entries) { |
| EXPECT_FALSE(strings::contains( |
| entry.target, |
| path::join(containerConfig->directory(), "path1"))); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test checks the docker containerizer is able to clean up |
| // orphaned containers with persistent volumes. |
| // |
| // A long-running task with a persistent volume is launched, the agent is |
| // stopped, the framework's meta directory is removed, and the agent is |
| // restarted with a fresh containerizer. After recovery no mount whose target |
| // contains `<sandbox>/path1` may remain and the container must be gone. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // NOTE: The resource name is `cpus` (as requested by the task below), |
| // not `cpu`. |
| flags.resources = "cpus:2;mem:2048;disk(role1):2048"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| // Held in an `Owned` so it can be replaced when the agent is restarted. |
| Owned<MockDockerContainerizer> dockerContainerizer( |
| new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_roles(0, "role1"); |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| // NOTE: We set filter explicitly here so that the resources will |
| // not be filtered for 5 seconds (the default). |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| Resource volume = createPersistentVolume( |
| Megabytes(64), |
| "role1", |
| "id1", |
| "path1", |
| None(), |
| None(), |
| frameworkInfo.principal()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| Resources::parse("cpus:1;mem:64").get() + volume, |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| // Capture the container id and config while still performing the launch. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(dockerContainerizer.get(), |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| // Create the volume and launch the task in a single accept call. |
| driver.acceptOffers( |
| {offers->front().id()}, |
| {CREATE(volume), LAUNCH({task})}, |
| filters); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Recreate containerizer and start slave again. |
| slave.get()->terminate(); |
| slave->reset(); |
| |
| // Wipe the framework directory so that the slave will treat the |
| // above running task as an orphan. We don't want to wipe the whole |
| // meta directory since Docker Containerizer will skip recover if |
| // state is not found. |
| ASSERT_SOME( |
| os::rmdir(getFrameworkPath( |
| getMetaRootDir(flags.work_dir), |
| offers->front().slave_id(), |
| frameworkId.get()))); |
| |
| logger = ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(logger); |
| |
| dockerContainerizer.reset(new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| // Install the expectation before the agent is started so that the |
| // `_recover` dispatch cannot happen before we are listening for it. |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| slave = StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until containerizer recover is complete. |
| AWAIT_READY(_recover); |
| |
| // The table is dereferenced below, so a read failure has to end the test |
| // here. |
| Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); |
| ASSERT_SOME(table); |
| |
| // Verify that the orphaned container's persistent volume is |
| // unmounted. |
| foreach (const fs::MountInfoTable::Entry& entry, table->entries) { |
| EXPECT_FALSE(strings::contains( |
| entry.target, |
| path::join(containerConfig->directory(), "path1"))); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| |
| slave->reset(); |
| |
| // The orphaned container itself must no longer exist in docker. |
| EXPECT_FALSE( |
| exists(docker, containerId.get(), ContainerState::EXISTS, false)); |
| } |
| #endif // __linux__ |
| |
| |
| // Runs a task that echoes one UUID-tagged line to stdout and another to |
| // stderr, then checks that each line shows up in the matching sandbox file |
| // and only there. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer containerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| docker); |
| |
| // Stopping a container, even one that already terminated, might not |
| // flush its logs, which would leave stdout/stderr empty for this test. |
| // Hence every `stop` is turned into a no-op. |
| EXPECT_CALL(*mockDocker, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &containerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is of interest. |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Tag both output lines so they can be told apart from anything else |
| // that ends up in the sandbox files. |
| const string tag = id::UUID::random().toString(); |
| |
| CommandInfo echoCommand; |
| #ifdef __WINDOWS__ |
| // `echo` in `cmd.exe` prints spaces in its argument literally |
| // (`echo X<SPACE>` outputs X<SPACE>), so the command contains none. |
| // Powershell is not used since `Write-Error` is verbose and makes the |
| // script return a failure. |
| echoCommand.set_value( |
| "echo out" + tag + "&" |
| "(echo err" + tag + ")1>&2"); |
| #else |
| // NOTE: `unbuffer` makes `echo` flush its output immediately. This |
| // mitigates a race in Docker where it mangles reads from stdout/stderr |
| // and kills itself. See MESOS-4676 for more information. |
| echoCommand.set_value( |
| "unbuffer echo out" + tag + " ; " |
| "unbuffer echo err" + tag + " 1>&2"); |
| #endif // __WINDOWS__ |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| echoCommand); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| #ifdef __WINDOWS__ |
| const ContainerInfo dockerInfo = |
| createDockerInfo(DOCKER_TEST_IMAGE); |
| #else |
| // NOTE: This is an image that is exactly |
| // `docker run -t -i alpine /bin/sh -c "apk add --update expect"`. |
| const ContainerInfo dockerInfo = |
| createDockerInfo("mesosphere/alpine-expect"); |
| #endif // __WINDOWS__ |
| |
| task.mutable_container()->CopyFrom(dockerInfo); |
| |
| // Capture the container id and config while still performing the launch. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(containerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&containerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> starting; |
| Future<TaskStatus> running; |
| Future<TaskStatus> finished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&starting)) |
| .WillOnce(FutureArg<1>(&running)) |
| .WillOnce(FutureArg<1>(&finished)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers->front().id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(starting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, starting->state()); |
| |
| AWAIT_READY_FOR(running, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, running->state()); |
| |
| AWAIT_READY_FOR(finished, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, finished->state()); |
| |
| // stderr must contain the "err" line and must not contain the "out" line. |
| Try<string> stderrText = |
| os::read(path::join(containerConfig->directory(), "stderr")); |
| ASSERT_SOME(stderrText); |
| |
| const vector<string> stderrLines = strings::split(stderrText.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stderrLines, "err" + tag)); |
| EXPECT_FALSE(containsLine(stderrLines, "out" + tag)); |
| |
| // stdout must contain the "out" line and must not contain the "err" line. |
| Try<string> stdoutText = |
| os::read(path::join(containerConfig->directory(), "stdout")); |
| ASSERT_SOME(stdoutText); |
| |
| const vector<string> stdoutLines = strings::split(stdoutText.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stdoutLines, "out" + tag)); |
| EXPECT_FALSE(containsLine(stdoutLines, "err" + tag)); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Launches a task from the mesosphere/inky image without giving it any |
| // command value or arguments. The image has "echo" as its entrypoint |
| // and "inky" as its default command, so "inky" must end up on stdout. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* dockerMock = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> sharedDocker(dockerMock); |
| |
| slave::Flags agentFlags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(agentFlags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(agentFlags.container_logger); |
| |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer mockContainerizer( |
| agentFlags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| sharedDocker); |
| |
| // Turn stopping the docker container into a no-op: stopping a |
| // container even when it terminated might not flush the logs, and |
| // then the stdout/stderr checks below would have nothing to read. |
| EXPECT_CALL(*dockerMock, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = |
| StartSlave(masterDetector.get(), &mockContainerizer, agentFlags); |
| ASSERT_SOME(agent); |
| |
| MockScheduler schedulerMock; |
| MesosSchedulerDriver schedulerDriver( |
| &schedulerMock, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(schedulerMock, registered(&schedulerDriver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is captured; later ones are ignored. |
| Future<vector<Offer>> receivedOffers; |
| EXPECT_CALL(schedulerMock, resourceOffers(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&receivedOffers)) |
| .WillRepeatedly(Return()); |
| |
| schedulerDriver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(receivedOffers); |
| ASSERT_FALSE(receivedOffers->empty()); |
| |
| const Offer& offer = receivedOffers->front(); |
| |
| // NOTE: CommandInfo::value is deliberately left unset. The container |
| // should still run because the image has a default entrypoint. |
| CommandInfo commandInfo; |
| commandInfo.set_shell(false); |
| |
| TaskInfo taskInfo = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| commandInfo); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| taskInfo.mutable_container()->CopyFrom( |
| createDockerInfo(DOCKER_INKY_IMAGE)); |
| |
| // Capture the container ID and config handed to the containerizer |
| // and then forward the call to the real launch implementation. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll( |
| FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&mockContainerizer, &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> startingUpdate; |
| Future<TaskStatus> runningUpdate; |
| Future<TaskStatus> finishedUpdate; |
| EXPECT_CALL(schedulerMock, statusUpdate(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&startingUpdate)) |
| .WillOnce(FutureArg<1>(&runningUpdate)) |
| .WillOnce(FutureArg<1>(&finishedUpdate)) |
| .WillRepeatedly(DoDefault()); |
| |
| schedulerDriver.launchTasks(offer.id(), {taskInfo}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, startingUpdate->state()); |
| |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, runningUpdate->state()); |
| |
| AWAIT_READY_FOR(finishedUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, finishedUpdate->state()); |
| |
| const string sandbox = containerConfig->directory(); |
| |
| // With no command value passed, the default entrypoint 'echo' runs |
| // with the image's default command 'inky', which lands on stdout. |
| Try<string> stdoutContents = os::read(path::join(sandbox, "stdout")); |
| ASSERT_SOME(stdoutContents); |
| |
| const vector<string> stdoutLines = |
| strings::split(stdoutContents.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stdoutLines, "inky")); |
| |
| // Nothing of it may show up on stderr. |
| Try<string> stderrContents = os::read(path::join(sandbox, "stderr")); |
| ASSERT_SOME(stderrContents); |
| |
| const vector<string> stderrLines = |
| strings::split(stderrContents.get(), "\n"); |
| |
| EXPECT_FALSE(containsLine(stderrLines, "inky")); |
| |
| schedulerDriver.stop(); |
| schedulerDriver.join(); |
| } |
| |
| |
| // Launches a task from the mesosphere/inky image (entrypoint "echo", |
| // default command "inky") with CommandInfo::value set to a random |
| // UUID, and checks that the value replaces the default command. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* dockerMock = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> sharedDocker(dockerMock); |
| |
| slave::Flags agentFlags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(agentFlags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(agentFlags.container_logger); |
| |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer mockContainerizer( |
| agentFlags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| sharedDocker); |
| |
| // Turn stopping the docker container into a no-op: stopping a |
| // container even when it terminated might not flush the logs, and |
| // then the stdout/stderr checks below would have nothing to read. |
| EXPECT_CALL(*dockerMock, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = |
| StartSlave(masterDetector.get(), &mockContainerizer, agentFlags); |
| ASSERT_SOME(agent); |
| |
| MockScheduler schedulerMock; |
| MesosSchedulerDriver schedulerDriver( |
| &schedulerMock, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(schedulerMock, registered(&schedulerDriver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is captured; later ones are ignored. |
| Future<vector<Offer>> receivedOffers; |
| EXPECT_CALL(schedulerMock, resourceOffers(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&receivedOffers)) |
| .WillRepeatedly(Return()); |
| |
| schedulerDriver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(receivedOffers); |
| ASSERT_FALSE(receivedOffers->empty()); |
| |
| const Offer& offer = receivedOffers->front(); |
| |
| const string token = id::UUID::random().toString(); |
| |
| // Setting the value to just the UUID is enough: it should be passed |
| // as an argument to the entrypoint, i.e., 'echo <uuid>'. |
| CommandInfo commandInfo; |
| commandInfo.set_shell(false); |
| commandInfo.set_value(token); |
| |
| TaskInfo taskInfo = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| commandInfo); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| taskInfo.mutable_container()->CopyFrom( |
| createDockerInfo(DOCKER_INKY_IMAGE)); |
| |
| // Capture the container ID and config handed to the containerizer |
| // and then forward the call to the real launch implementation. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll( |
| FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&mockContainerizer, &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> startingUpdate; |
| Future<TaskStatus> runningUpdate; |
| Future<TaskStatus> finishedUpdate; |
| EXPECT_CALL(schedulerMock, statusUpdate(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&startingUpdate)) |
| .WillOnce(FutureArg<1>(&runningUpdate)) |
| .WillOnce(FutureArg<1>(&finishedUpdate)) |
| .WillRepeatedly(DoDefault()); |
| |
| schedulerDriver.launchTasks(offer.id(), {taskInfo}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, startingUpdate->state()); |
| |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, runningUpdate->state()); |
| |
| AWAIT_READY_FOR(finishedUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, finishedUpdate->state()); |
| |
| const string sandbox = containerConfig->directory(); |
| |
| // The passed in command value should override the image's default |
| // command, so stdout must contain the UUID rather than 'inky'. |
| Try<string> stdoutContents = os::read(path::join(sandbox, "stdout")); |
| ASSERT_SOME(stdoutContents); |
| |
| const vector<string> stdoutLines = |
| strings::split(stdoutContents.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stdoutLines, token)); |
| EXPECT_FALSE(containsLine(stdoutLines, "inky")); |
| |
| // Neither string may show up on stderr. |
| Try<string> stderrContents = os::read(path::join(sandbox, "stderr")); |
| ASSERT_SOME(stderrContents); |
| |
| const vector<string> stderrLines = |
| strings::split(stderrContents.get(), "\n"); |
| |
| EXPECT_FALSE(containsLine(stderrLines, "inky")); |
| EXPECT_FALSE(containsLine(stderrLines, token)); |
| |
| schedulerDriver.stop(); |
| schedulerDriver.join(); |
| } |
| |
| |
| // Launches a task from the mesosphere/inky image (entrypoint "echo", |
| // default command "inky") passing a random UUID only through |
| // CommandInfo::arguments, and checks that it reaches the entrypoint. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* dockerMock = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> sharedDocker(dockerMock); |
| |
| slave::Flags agentFlags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(agentFlags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(agentFlags.container_logger); |
| |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer mockContainerizer( |
| agentFlags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| sharedDocker); |
| |
| // Turn stopping the docker container into a no-op: stopping a |
| // container even when it terminated might not flush the logs, and |
| // then the stdout/stderr checks below would have nothing to read. |
| EXPECT_CALL(*dockerMock, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = |
| StartSlave(masterDetector.get(), &mockContainerizer, agentFlags); |
| ASSERT_SOME(agent); |
| |
| MockScheduler schedulerMock; |
| MesosSchedulerDriver schedulerDriver( |
| &schedulerMock, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(schedulerMock, registered(&schedulerDriver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is captured; later ones are ignored. |
| Future<vector<Offer>> receivedOffers; |
| EXPECT_CALL(schedulerMock, resourceOffers(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&receivedOffers)) |
| .WillRepeatedly(Return()); |
| |
| schedulerDriver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(receivedOffers); |
| ASSERT_FALSE(receivedOffers->empty()); |
| |
| const Offer& offer = receivedOffers->front(); |
| |
| const string token = id::UUID::random().toString(); |
| |
| // The command value is skipped altogether; only the arguments are |
| // set, and those should also get passed through to the entrypoint. |
| CommandInfo commandInfo; |
| commandInfo.set_shell(false); |
| commandInfo.add_arguments(token); |
| |
| TaskInfo taskInfo = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| commandInfo); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| taskInfo.mutable_container()->CopyFrom( |
| createDockerInfo(DOCKER_INKY_IMAGE)); |
| |
| // Capture the container ID and config handed to the containerizer |
| // and then forward the call to the real launch implementation. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll( |
| FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&mockContainerizer, &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> startingUpdate; |
| Future<TaskStatus> runningUpdate; |
| Future<TaskStatus> finishedUpdate; |
| EXPECT_CALL(schedulerMock, statusUpdate(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&startingUpdate)) |
| .WillOnce(FutureArg<1>(&runningUpdate)) |
| .WillOnce(FutureArg<1>(&finishedUpdate)) |
| .WillRepeatedly(DoDefault()); |
| |
| schedulerDriver.launchTasks(offer.id(), {taskInfo}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, startingUpdate->state()); |
| |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, runningUpdate->state()); |
| |
| AWAIT_READY_FOR(finishedUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, finishedUpdate->state()); |
| |
| const string sandbox = containerConfig->directory(); |
| |
| // The passed in command arguments should override the image's |
| // default command, so stdout must contain the UUID and not 'inky'. |
| Try<string> stdoutContents = os::read(path::join(sandbox, "stdout")); |
| ASSERT_SOME(stdoutContents); |
| |
| const vector<string> stdoutLines = |
| strings::split(stdoutContents.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stdoutLines, token)); |
| EXPECT_FALSE(containsLine(stdoutLines, "inky")); |
| |
| // Neither string may show up on stderr. |
| Try<string> stderrContents = os::read(path::join(sandbox, "stderr")); |
| ASSERT_SOME(stderrContents); |
| |
| const vector<string> stderrLines = |
| strings::split(stderrContents.get(), "\n"); |
| |
| EXPECT_FALSE(containsLine(stderrLines, "inky")); |
| EXPECT_FALSE(containsLine(stderrLines, token)); |
| |
| schedulerDriver.stop(); |
| schedulerDriver.join(); |
| } |
| |
| |
| // Verifies slave recovery for a task that runs in a Docker container. |
| // |
| // The slave is stopped before the first update for a task is received |
| // from the executor. When it comes back up we make sure the executor |
| // reregisters and the slave properly sends the update. |
| // |
| // Concretely: the first two status update messages are dropped, the |
| // slave is terminated and restarted with a fresh containerizer, and |
| // the test then checks that the executor reregisters carrying both |
| // unacknowledged updates, that the scheduler receives TASK_STARTING, |
| // and that the Docker container still exists. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| // This is owned by the containerizer, so we'll need one per containerizer. |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| // Heap-allocated so that it can be swapped for a new instance when |
| // the slave is restarted further down. |
| Owned<MockDockerContainerizer> dockerContainerizer( |
| new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // A long sleep keeps the task alive across the slave restart. |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| // Capture the container ID and forward to the real launch. |
| Future<ContainerID> containerId; |
| EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(dockerContainerizer.get(), |
| &MockDockerContainerizer::_launch))); |
| |
| // Drop the status updates from the executor. We actually wait until we can |
| // drop the `TASK_RUNNING` update here because the window between the two is |
| // small enough that we could still successfully receive `TASK_RUNNING` after |
| // we have dropped `TASK_STARTING`. |
| // |
| // NOTE(review): `runningUpdate` is declared before `startingUpdate`; |
| // presumably the expectation declared last matches first, so the |
| // names line up with the order of the messages. Confirm before |
| // reordering these two statements. |
| Future<StatusUpdateMessage> runningUpdate = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| Future<StatusUpdateMessage> startingUpdate = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(containerId); |
| |
| // Stop the slave before the status updates are received. |
| AWAIT_READY(startingUpdate); |
| AWAIT_READY(runningUpdate); |
| |
| slave.get()->terminate(); |
| |
| // Set up the expectations for the restarted slave before starting it. |
| Future<Message> reregisterExecutorMessage = |
| FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // This is owned by the containerizer, so we'll need one per containerizer. |
| logger = ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(logger); |
| |
| dockerContainerizer.reset(new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| slave = StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure the executor reregisters. |
| AWAIT_READY(reregisterExecutorMessage); |
| |
| // Fail right here if the message body cannot be decoded, instead of |
| // letting the assertions below report a misleading mismatch on an |
| // empty or partially parsed message. |
| ReregisterExecutorMessage reregister; |
| ASSERT_TRUE(reregister.ParseFromString(reregisterExecutorMessage->body)); |
| |
| // Executor should inform about the unacknowledged updates. |
| ASSERT_EQ(2, reregister.updates_size()); |
| |
| const StatusUpdate& updateStarting = reregister.updates(0); |
| ASSERT_EQ(task.task_id(), updateStarting.status().task_id()); |
| ASSERT_EQ(TASK_STARTING, updateStarting.status().state()); |
| |
| const StatusUpdate& updateRunning = reregister.updates(1); |
| ASSERT_EQ(task.task_id(), updateRunning.status().task_id()); |
| ASSERT_EQ(TASK_RUNNING, updateRunning.status().state()); |
| |
| // Scheduler should receive the recovered update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_STARTING, status->state()); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| // Obtain the termination future before stopping the driver and wait |
| // on it afterwards. |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer->wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| |
| #ifndef __WINDOWS__ |
| // Verifies slave recovery when the executor itself runs in a Docker |
| // container (image `mesosphere/test-executor`). |
| // |
| // The slave is stopped before the first update for a task is received |
| // from the executor. When it comes back up we make sure the executor |
| // reregisters and the slave properly sends the update. |
| // |
| // The test is removed on Windows, because the `mesosphere/test-executor` |
| // image doesn't work on Windows and probably won't ever be ported. |
| // |
| // TODO(benh): This test is currently disabled because the executor |
| // inside the image mesosphere/test-executor does not properly set the |
| // executor PID that it uses during registration, so when the new |
| // slave recovers it can't reconnect and instead destroys that |
| // container. In particular, it uses '0' for its IP which we properly |
| // parse and can even properly use for sending other messages, but the |
| // current implementation of 'UPID::operator bool()' fails if the IP |
| // component of a PID is '0'. |
| // |
| // TODO(alexr): Enable after MESOS-8708 is resolved. |
| TEST_F(DockerContainerizerTest, |
| DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| // This is owned by the containerizer, so we'll need one per containerizer. |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| // Heap-allocated so that it can be swapped for a new instance when |
| // the slave is restarted further down. |
| Owned<MockDockerContainerizer> dockerContainerizer( |
| new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| ExecutorID executorId; |
| executorId.set_value("e1"); |
| |
| CommandInfo command; |
| command.set_value("test-executor"); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| command, |
| executorId); |
| |
| // The container info is attached to the executor, not to the task. |
| // |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_executor()->mutable_container()->CopyFrom( |
| createDockerInfo("mesosphere/test-executor")); |
| |
| // Capture the container ID and forward to the real launch. |
| Future<ContainerID> containerId; |
| EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(dockerContainerizer.get(), |
| &MockDockerContainerizer::_launch))); |
| |
| // We need to wait until the container's pid has been |
| // checkpointed so that when the next slave recovers it won't treat |
| // the executor as having gone lost! We know this has completed |
| // after Containerizer::launch returns and the |
| // Slave::executorLaunched gets dispatched. |
| Future<Nothing> executorLaunched = |
| FUTURE_DISPATCH(_, &Slave::executorLaunched); |
| |
| // The test-executor in the image immediately sends a TASK_RUNNING |
| // followed by TASK_FINISHED (no sleep/delay in between) so we need |
| // to drop the first TWO updates that come from the executor rather |
| // than only the first update like above where we can control the |
| // length of the task. |
| // |
| // NOTE(review): the assertions at the end of this test expect exactly |
| // one unacknowledged update in state TASK_STARTING, which does not |
| // match the TASK_RUNNING/TASK_FINISHED description above. Reconcile |
| // the two when re-enabling this test. |
| Future<StatusUpdateMessage> statusUpdateMessage1 = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| // Drop the first update from the executor. |
| // |
| // NOTE(review): presumably the expectation declared last matches |
| // first, which would make this one catch the first update. Confirm. |
| Future<StatusUpdateMessage> statusUpdateMessage2 = |
| DROP_PROTOBUF(StatusUpdateMessage(), _, _); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(containerId); |
| |
| AWAIT_READY(executorLaunched); |
| AWAIT_READY(statusUpdateMessage1); |
| AWAIT_READY(statusUpdateMessage2); |
| |
| slave.get()->terminate(); |
| |
| // Set up the expectations for the restarted slave before starting it. |
| Future<Message> reregisterExecutorMessage = |
| FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // This is owned by the containerizer, so we'll need one per containerizer. |
| logger = ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(logger); |
| |
| dockerContainerizer.reset(new MockDockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker)); |
| |
| slave = StartSlave(detector.get(), dockerContainerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure the executor reregisters. |
| AWAIT_READY(reregisterExecutorMessage); |
| |
| // Fail right here if the message body cannot be decoded, instead of |
| // letting the assertions below report a misleading mismatch on an |
| // empty or partially parsed message. |
| ReregisterExecutorMessage reregister; |
| ASSERT_TRUE(reregister.ParseFromString(reregisterExecutorMessage->body)); |
| |
| // Executor should inform about the unacknowledged update. |
| ASSERT_EQ(1, reregister.updates_size()); |
| const StatusUpdate& update = reregister.updates(0); |
| ASSERT_EQ(task.task_id(), update.status().task_id()); |
| ASSERT_EQ(TASK_STARTING, update.status().state()); |
| |
| // Scheduler should receive the recovered update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_STARTING, status->state()); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| #endif // __WINDOWS__ |
| |
| |
| // Checks that a port mapping on the bridge network exposes the host |
| // port to the container port: data is sent to the host port and must |
| // be received inside the container, which listens on the mapped |
| // container port. |
| // |
| // TODO(akagup): This test requires netcat on the Windows host before |
| // it can be ported. We could provide a build of netcat or just replace |
| // it with powershell for this test. |
| TEST_F_TEMP_DISABLED_ON_WINDOWS( |
| DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* dockerMock = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> sharedDocker(dockerMock); |
| |
| // The agent offers a single port, 10000, which is used as the host |
| // side of the port mapping below. |
| slave::Flags agentFlags = CreateSlaveFlags(); |
| agentFlags.resources = "cpus:1;mem:1024;ports:[10000-10000]"; |
| |
| Fetcher fetcher(agentFlags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(agentFlags.container_logger); |
| |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer mockContainerizer( |
| agentFlags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| sharedDocker); |
| |
| // Turn stopping the docker container into a no-op: stopping a |
| // container even when it terminated might not flush the logs, and |
| // then the stdout check below would have nothing to read. |
| EXPECT_CALL(*dockerMock, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = |
| StartSlave(masterDetector.get(), &mockContainerizer, agentFlags); |
| ASSERT_SOME(agent); |
| |
| MockScheduler schedulerMock; |
| MesosSchedulerDriver schedulerDriver( |
| &schedulerMock, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(schedulerMock, registered(&schedulerDriver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is captured; later ones are ignored. |
| Future<vector<Offer>> receivedOffers; |
| EXPECT_CALL(schedulerMock, resourceOffers(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&receivedOffers)) |
| .WillRepeatedly(Return()); |
| |
| schedulerDriver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(receivedOffers); |
| ASSERT_FALSE(receivedOffers->empty()); |
| |
| const Offer& offer = receivedOffers->front(); |
| |
| // The task runs netcat listening on container port 1000. |
| CommandInfo commandInfo; |
| commandInfo.set_shell(false); |
| commandInfo.set_value("nc"); |
| commandInfo.add_arguments("-l"); |
| commandInfo.add_arguments("-p"); |
| commandInfo.add_arguments("1000"); |
| |
| TaskInfo taskInfo = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| commandInfo); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| ContainerInfo containerInfo = createDockerInfo("alpine"); |
| |
| // Bridge networking with host port 10000 mapped to container port 1000. |
| ContainerInfo::DockerInfo* dockerInfo = containerInfo.mutable_docker(); |
| dockerInfo->set_network(ContainerInfo::DockerInfo::BRIDGE); |
| |
| ContainerInfo::DockerInfo::PortMapping* mapping = |
| dockerInfo->add_port_mappings(); |
| |
| mapping->set_host_port(10000); |
| mapping->set_container_port(1000); |
| |
| taskInfo.mutable_container()->CopyFrom(containerInfo); |
| |
| // Capture the container ID and config handed to the containerizer |
| // and then forward the call to the real launch implementation. |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll( |
| FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&mockContainerizer, &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> startingUpdate; |
| Future<TaskStatus> runningUpdate; |
| Future<TaskStatus> finishedUpdate; |
| EXPECT_CALL(schedulerMock, statusUpdate(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&startingUpdate)) |
| .WillOnce(FutureArg<1>(&runningUpdate)) |
| .WillOnce(FutureArg<1>(&finishedUpdate)) |
| .WillRepeatedly(DoDefault()); |
| |
| schedulerDriver.launchTasks(offer.id(), {taskInfo}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, startingUpdate->state()); |
| |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, runningUpdate->state()); |
| |
| ASSERT_TRUE(exists( |
| sharedDocker, |
| containerId.get(), |
| ContainerState::RUNNING)); |
| |
| const string token = id::UUID::random().toString(); |
| |
| // Send the UUID to the host port that docker mapped to the container. |
| Try<process::Subprocess> sender = process::subprocess( |
| "echo " + token + " | nc localhost 10000"); |
| |
| ASSERT_SOME(sender); |
| AWAIT_READY_FOR(sender->status(), Seconds(60)); |
| |
| AWAIT_READY_FOR(finishedUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, finishedUpdate->state()); |
| |
| // The UUID sent to the host port should have been written to stdout |
| // by the docker container running nc -l. |
| Try<string> stdoutContents = |
| os::read(path::join(containerConfig->directory(), "stdout")); |
| |
| ASSERT_SOME(stdoutContents); |
| |
| const vector<string> stdoutLines = |
| strings::split(stdoutContents.get(), "\n"); |
| |
| EXPECT_TRUE(containsLine(stdoutLines, token)); |
| |
| // Obtain the termination future before stopping the driver and wait |
| // on it afterwards. |
| Future<Option<ContainerTermination>> termination = |
| mockContainerizer.wait(containerId.get()); |
| |
| schedulerDriver.stop(); |
| schedulerDriver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| |
| #ifndef __WINDOWS__ |
| // Checks that a task whose sandbox path contains ':' still launches |
| // successfully. This is a limitation of the Docker CLI, where the |
| // volume map parameter treats colons (:) as separators and so |
| // incorrectly splits the sandbox directory. |
| // |
| // On Windows, colons aren't a legal path character, so this test is skipped. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* dockerMock = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> sharedDocker(dockerMock); |
| |
| slave::Flags agentFlags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(agentFlags); |
| |
| Try<ContainerLogger*> containerLogger = |
| ContainerLogger::create(agentFlags.container_logger); |
| |
| ASSERT_SOME(containerLogger); |
| |
| MockDockerContainerizer mockContainerizer( |
| agentFlags, |
| &fetcher, |
| Owned<ContainerLogger>(containerLogger.get()), |
| sharedDocker); |
| |
| Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = |
| StartSlave(masterDetector.get(), &mockContainerizer, agentFlags); |
| ASSERT_SOME(agent); |
| |
| MockScheduler schedulerMock; |
| MesosSchedulerDriver schedulerDriver( |
| &schedulerMock, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(schedulerMock, registered(&schedulerDriver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| // Only the first batch of offers is captured; later ones are ignored. |
| Future<vector<Offer>> receivedOffers; |
| EXPECT_CALL(schedulerMock, resourceOffers(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&receivedOffers)) |
| .WillRepeatedly(Return()); |
| |
| schedulerDriver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(receivedOffers); |
| ASSERT_FALSE(receivedOffers->empty()); |
| |
| const Offer& offer = receivedOffers->front(); |
| |
| // A sleep task created with the name "test:colon". |
| TaskInfo taskInfo = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| SLEEP_COMMAND(1000), |
| None(), |
| "test:colon"); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| taskInfo.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| // Capture the container ID and forward to the real launch. |
| Future<ContainerID> containerId; |
| EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll( |
| FutureArg<0>(&containerId), |
| Invoke(&mockContainerizer, &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> startingUpdate; |
| Future<TaskStatus> runningUpdate; |
| EXPECT_CALL(schedulerMock, statusUpdate(&schedulerDriver, _)) |
| .WillOnce(FutureArg<1>(&startingUpdate)) |
| .WillOnce(FutureArg<1>(&runningUpdate)) |
| .WillRepeatedly(DoDefault()); |
| |
| schedulerDriver.launchTasks(offer.id(), {taskInfo}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, startingUpdate->state()); |
| |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, runningUpdate->state()); |
| |
| ASSERT_TRUE(exists(sharedDocker, containerId.get())); |
| |
| // Obtain the termination future before stopping the driver and wait |
| // on it afterwards. |
| Future<Option<ContainerTermination>> termination = |
| mockContainerizer.wait(containerId.get()); |
| |
| schedulerDriver.stop(); |
| schedulerDriver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| #endif // __WINDOWS__ |
| |
| |
| // Verifies that destroying a container while its fetch is still pending |
| // ends with a TASK_FAILED status update. The mocked fetch is parked on an |
| // unsatisfied promise, the container is destroyed, and only afterwards is |
| // the fetch released. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   Promise<Nothing> fetchGate; |
|   Future<Nothing> fetchStarted; |
| |
|   // Hold the fetch open until `fetchGate` is set, simulating a slow fetch. |
|   EXPECT_CALL(*containerizerProcess, fetch(_)) |
|     .WillOnce(DoAll(FutureSatisfy(&fetchStarted), |
|                     Return(fetchGate.future()))); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = |
|     createTask(offer.slave_id(), offer.resources(), SLEEP_COMMAND(1000)); |
| |
|   // TODO(tnachen): Use local image to test if possible. |
|   const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<TaskStatus> statusFailed; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusFailed)); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   // The fetch has been entered (and is now blocked on `fetchGate`). |
|   AWAIT_READY(fetchStarted); |
| |
|   dockerContainerizer.destroy(containerId.get()); |
| |
|   // Let the blocked fetch complete so the launch can resume. |
|   fetchGate.set(Nothing()); |
| |
|   AWAIT_READY(statusFailed); |
|   EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // Verifies that destroying a container while its image pull is still |
| // pending ends with a TASK_FAILED status update. The mocked fetch |
| // completes immediately, the mocked pull is parked on an unsatisfied |
| // promise, the container is destroyed, and only afterwards is the pull |
| // released. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   // The fetch succeeds right away; we only record that it happened. |
|   Future<Nothing> fetchDone; |
|   EXPECT_CALL(*containerizerProcess, fetch(_)) |
|     .WillOnce(DoAll(FutureSatisfy(&fetchDone), |
|                     Return(Nothing()))); |
| |
|   Promise<Nothing> pullGate; |
| |
|   // Hold the pull open until `pullGate` is set, simulating a slow pull. |
|   EXPECT_CALL(*containerizerProcess, pull(_)) |
|     .WillOnce(Return(pullGate.future())); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = |
|     createTask(offer.slave_id(), offer.resources(), SLEEP_COMMAND(1000)); |
| |
|   // TODO(tnachen): Use local image to test if possible. |
|   const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<TaskStatus> statusFailed; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusFailed)); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   // Wait until the fetch step has run before destroying. |
|   AWAIT_READY(fetchDone); |
| |
|   dockerContainerizer.destroy(containerId.get()); |
| |
|   // Let the blocked pull complete so the launch can resume. |
|   pullGate.set(Nothing()); |
| |
|   AWAIT_READY(statusFailed); |
|   EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // Ensures the containerizer responds correctly to a request to destroy |
| // an unknown container: the returned future becomes ready and holds |
| // `None`. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyUnknownContainer) |
| { |
|   slave::Flags flags = CreateSlaveFlags(); |
| |
|   Fetcher fetcher(flags); |
| |
|   Try<DockerContainerizer*> create = |
|     DockerContainerizer::create(flags, &fetcher); |
| |
|   ASSERT_SOME(create); |
| |
|   // Take ownership of the created containerizer so that it is deleted |
|   // when the test body exits instead of being leaked. |
|   Owned<DockerContainerizer> containerizer(create.get()); |
| |
|   // A freshly generated random ID that was never launched. |
|   ContainerID containerId; |
|   containerId.set_value(id::UUID::random().toString()); |
| |
|   Future<Option<ContainerTermination>> destroyed = |
|     containerizer->destroy(containerId); |
| |
|   AWAIT_READY(destroyed); |
|   EXPECT_NONE(destroyed.get()); |
| } |
| |
| |
| // Checks the case where the docker containerizer's resource update fails, |
| // so the container fails before the executor has started: the executor |
| // must be killed and cleaned up. The scheduler (which is PARTITION_AWARE) |
| // is expected to receive TASK_GONE with REASON_CONTAINER_UPDATE_FAILED. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   // Register as a partition-aware framework. |
|   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
|   FrameworkInfo::Capability* partitionAware = |
|     frameworkInfo.add_capabilities(); |
|   partitionAware->set_type(FrameworkInfo::Capability::PARTITION_AWARE); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = createTask(offer.slave_id(), offer.resources(), "exit 0"); |
| |
|   const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<TaskStatus> statusGone; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusGone)); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   // Every update fails, so the run-task message is never sent on to the |
|   // executor. |
|   EXPECT_CALL(dockerContainerizer, update(_, _, _)) |
|     .WillRepeatedly(Return(Failure("Fail resource update"))); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY(statusGone); |
|   EXPECT_EQ(TASK_GONE, statusGone.get().state()); |
|   EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, |
|             statusGone.get().reason()); |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // When the fetch fails, the scheduler should receive a status update |
| // whose message shows the actual error. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = createTask(offer.slave_id(), offer.resources(), "exit 0"); |
| |
|   const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<TaskStatus> statusFailed; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusFailed)); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   // Make the fetch step fail with a recognizable error text. |
|   EXPECT_CALL(*containerizerProcess, fetch(_)) |
|     .WillOnce(Return(Failure("some error from fetch"))); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY(statusFailed); |
| |
|   EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); |
|   EXPECT_EQ("Failed to launch container: some error from fetch", |
|             statusFailed.get().message()); |
| |
|   // TODO(jaybuff): When MESOS-2035 is addressed we should validate |
|   // that statusFailed->reason() is correctly set here. |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // When the docker pull fails, the scheduler should receive a status |
| // update whose message shows the actual error. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerPullFailure) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = createTask(offer.slave_id(), offer.resources(), "exit 0"); |
| |
|   const ContainerInfo dockerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<TaskStatus> statusFailed; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusFailed)); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   // Make the docker pull fail with a recognizable error text. |
|   EXPECT_CALL(*dockerMock, pull(_, _, _)) |
|     .WillOnce(Return(Failure("some error from docker pull"))); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY(statusFailed); |
| |
|   EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); |
|   EXPECT_EQ("Failed to launch container: some error from docker pull", |
|             statusFailed.get().message()); |
| |
|   // TODO(jaybuff): When MESOS-2035 is addressed we should validate |
|   // that statusFailed->reason() is correctly set here. |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // When the docker executor container fails to launch, the docker |
| // inspect future that is sitting in a retry loop should be discarded. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerInspectDiscard) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   // Heap-allocated on purpose: the docker containerizer frees the process. |
|   MockDockerContainerizerProcess* containerizerProcess = |
|     new MockDockerContainerizerProcess( |
|         agentFlags, |
|         &fetcher, |
|         Owned<ContainerLogger>(containerLogger.get()), |
|         sharedDocker); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       (Owned<DockerContainerizerProcess>(containerizerProcess))); |
| |
|   // Capture the future returned by the inspect call so that its final |
|   // state can be checked at the end of the test. |
|   Future<Docker::Container> inspect; |
|   EXPECT_CALL(*dockerMock, inspect(_, _)) |
|     .WillOnce(FutureResult(&inspect, |
|                            Invoke(dockerMock, |
|                                   &MockDocker::_inspect))); |
| |
|   // Force the `docker run` step to fail. |
|   EXPECT_CALL(*dockerMock, run(_, _, _)) |
|     .WillOnce(Return(Failure("Run failed"))); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   ExecutorID executorId; |
|   executorId.set_value("e1"); |
| |
|   CommandInfo executorCommand; |
| #ifdef __WINDOWS__ |
|   executorCommand.set_value(SLEEP_COMMAND(1000)); |
| #else |
|   executorCommand.set_value("/bin/test-executor"); |
| #endif // __WINDOWS__ |
| |
|   TaskInfo task = createTask( |
|       offer.slave_id(), |
|       offer.resources(), |
|       executorCommand, |
|       executorId); |
| |
|   // TODO(tnachen): Use local image to test if possible. |
| #ifdef __WINDOWS__ |
|   const ContainerInfo containerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
| #else |
|   const ContainerInfo containerInfo = createDockerInfo("tnachen/test-executor"); |
| #endif // __WINDOWS__ |
| |
|   // The container info is attached to the executor, not to the task. |
|   task.mutable_executor()->mutable_container()->CopyFrom(containerInfo); |
| |
|   Future<TaskStatus> statusFailed; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusFailed)); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<Nothing> executorLost; |
|   EXPECT_CALL(sched, executorLost(&driver, executorId, _, _)) |
|     .WillOnce(FutureSatisfy(&executorLost)); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY(statusFailed); |
|   EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); |
| |
|   AWAIT_READY(executorLost); |
| |
|   AWAIT_DISCARDED(inspect); |
| |
|   driver.stop(); |
|   driver.join(); |
| } |
| |
| |
| // Ensures the containerizer responds correctly (returns None) |
| // to a request to wait on an unknown container. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_WaitUnknownContainer) |
| { |
|   slave::Flags flags = CreateSlaveFlags(); |
| |
|   Fetcher fetcher(flags); |
| |
|   Try<DockerContainerizer*> create = |
|     DockerContainerizer::create(flags, &fetcher); |
| |
|   ASSERT_SOME(create); |
| |
|   // Take ownership of the created containerizer so that it is deleted |
|   // when the test body exits instead of being leaked. |
|   Owned<DockerContainerizer> containerizer(create.get()); |
| |
|   // A freshly generated random ID that was never launched. |
|   ContainerID containerId; |
|   containerId.set_value(id::UUID::random().toString()); |
| |
|   Future<Option<ContainerTermination>> wait = containerizer->wait(containerId); |
| |
|   AWAIT_READY(wait); |
|   EXPECT_NONE(wait.get()); |
| } |
| |
| |
| // This test ensures that a task will transition straight from `TASK_KILLING` to |
| // `TASK_KILLED`, even if the health check begins to fail during the kill policy |
| // grace period. |
| // |
| // TODO(gkleiman): this test takes about 7 seconds to run, consider using mock |
| // tasks and health checkers to speed it up. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_NoTransitionFromKillingToRunning) |
| { |
|   Shared<Docker> docker(new MockDocker( |
|       tests::flags.docker, tests::flags.docker_socket)); |
| |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
| |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> logger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(logger); |
| |
|   MockDockerContainerizer containerizer( |
|       agentFlags, |
|       &fetcher, |
|       Owned<ContainerLogger>(logger.get()), |
|       docker); |
| |
|   Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(detector.get(), &containerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   // Start the framework with the task killing capability. |
|   FrameworkInfo::Capability capability; |
|   capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE); |
| |
|   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
|   frameworkInfo.add_capabilities()->CopyFrom(capability); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(offers); |
| |
|   // This must be fatal: `offers->front()` is dereferenced below, which |
|   // is only valid when an offer was actually received. |
|   ASSERT_EQ(1u, offers->size()); |
| |
|   // Fail the test cleanly if no free port could be obtained, rather than |
|   // calling `get()` on an unchecked result. |
|   Try<uint16_t> freePort = getFreePort(); |
|   ASSERT_SOME(freePort); |
| |
|   const uint16_t testPort = freePort.get(); |
| |
| #ifdef __WINDOWS__ |
|   // On Windows, we will do a command health check instead of a TCP one, |
|   // so that this test will work on Windows 10 (Hyper-V isolation) containers. |
|   const string command = SLEEP_COMMAND(1000); |
| #else |
|   // Launch a HTTP server until SIGTERM is received, then sleep for |
|   // 15 seconds to let the health check fail. |
|   const string command = strings::format( |
|       "trap \"sleep 15\" SIGTERM && nc -lk -p %u -e echo", |
|       testPort).get(); |
| #endif // __WINDOWS__ |
| |
|   TaskInfo task = createTask(offers->front(), command); |
| |
|   // TODO(tnachen): Use local image to test if possible. |
|   ContainerInfo containerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
| |
|   // On non-Windows platforms, the docker container runs in host network |
|   // mode. |
| #ifndef __WINDOWS__ |
|   containerInfo.mutable_docker()->set_network( |
|       ContainerInfo::DockerInfo::HOST); |
| #endif // __WINDOWS__ |
| |
|   task.mutable_container()->CopyFrom(containerInfo); |
| |
|   HealthCheck healthCheck; |
| #ifdef __WINDOWS__ |
|   healthCheck.set_type(HealthCheck::COMMAND); |
| |
|   // The first `mkdir` will succeed, but the later ones will fail, so we get |
|   // the same behavior as the Linux test. |
|   healthCheck.mutable_command()->set_value("mkdir C:\\healthcheck-test"); |
| #else |
|   healthCheck.set_type(HealthCheck::TCP); |
|   healthCheck.mutable_tcp()->set_port(testPort); |
| #endif // __WINDOWS__ |
| |
|   // Set `grace_period_seconds` here because it takes some time to launch |
|   // Netcat to serve requests. |
|   healthCheck.set_delay_seconds(0); |
|   healthCheck.set_grace_period_seconds(15); |
|   healthCheck.set_interval_seconds(0); |
| |
|   task.mutable_health_check()->CopyFrom(healthCheck); |
| |
|   // Set the kill policy grace period to 5 seconds. |
|   KillPolicy killPolicy; |
|   killPolicy.mutable_grace_period()->set_nanoseconds(Seconds(5).ns()); |
| |
|   task.mutable_kill_policy()->CopyFrom(killPolicy); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(containerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&containerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<TaskStatus> statusStarting; |
|   Future<TaskStatus> statusRunning; |
|   Future<TaskStatus> statusHealthy; |
|   Future<TaskStatus> statusKilling; |
|   Future<TaskStatus> statusKilled; |
| |
|   // Exactly five updates are expected, in this order. Any extra update |
|   // between KILLING and KILLED would violate this expectation. |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusStarting)) |
|     .WillOnce(FutureArg<1>(&statusRunning)) |
|     .WillOnce(FutureArg<1>(&statusHealthy)) |
|     .WillOnce(FutureArg<1>(&statusKilling)) |
|     .WillOnce(FutureArg<1>(&statusKilled)); |
| |
|   driver.launchTasks(offers->front().id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
|   AWAIT_READY_FOR(statusStarting, Seconds(60)); |
|   EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
|   AWAIT_READY_FOR(statusRunning, Seconds(60)); |
|   EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
|   // The third update is still TASK_RUNNING but carries the health result. |
|   AWAIT_READY(statusHealthy); |
|   EXPECT_EQ(TASK_RUNNING, statusHealthy->state()); |
|   EXPECT_TRUE(statusHealthy->has_healthy()); |
|   EXPECT_TRUE(statusHealthy->healthy()); |
| |
|   driver.killTask(task.task_id()); |
| |
|   AWAIT_READY(statusKilling); |
|   EXPECT_EQ(TASK_KILLING, statusKilling->state()); |
|   EXPECT_FALSE(statusKilling->has_healthy()); |
| |
|   AWAIT_READY(statusKilled); |
|   EXPECT_EQ(TASK_KILLED, statusKilled->state()); |
|   EXPECT_FALSE(statusKilled->has_healthy()); |
| |
|   // Start waiting on the container before stopping the driver, then |
|   // check that a termination was reported. |
|   Future<Option<ContainerTermination>> termination = |
|     containerizer.wait(containerId.get()); |
| |
|   driver.stop(); |
|   driver.join(); |
| |
|   AWAIT_READY(termination); |
|   EXPECT_SOME(termination.get()); |
| } |
| |
| |
| #ifndef __WINDOWS__ |
| // This test ensures that a task will transition from `TASK_KILLING` |
| // to `TASK_KILLED` rather than `TASK_FINISHED` when it is killed, |
| // even if it returns an "EXIT_STATUS" of 0 on receiving a SIGTERM. |
| // |
| // This test is ignored on Windows, since Windows containers seem to |
| // always return `STATUS_CONTROL_C_EXIT` and `STATUS_UNSUCCESSFUL` for |
| // graceful and forceful shutdown. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_NoTransitionFromKillingToFinished) |
| { |
|   Shared<Docker> docker(new MockDocker( |
|       tests::flags.docker, tests::flags.docker_socket)); |
| |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
| |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> logger = |
|     ContainerLogger::create(agentFlags.container_logger); |
| |
|   ASSERT_SOME(logger); |
| |
|   MockDockerContainerizer containerizer( |
|       agentFlags, |
|       &fetcher, |
|       Owned<ContainerLogger>(logger.get()), |
|       docker); |
| |
|   Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(detector.get(), &containerizer, agentFlags); |
| |
|   ASSERT_SOME(agent); |
| |
|   // Start the framework with the task killing capability. |
|   FrameworkInfo::Capability capability; |
|   capability.set_type(FrameworkInfo::Capability::TASK_KILLING_STATE); |
| |
|   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
|   frameworkInfo.add_capabilities()->CopyFrom(capability); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(offers); |
| |
|   // This must be fatal: `offers->front()` is dereferenced below, which |
|   // is only valid when an offer was actually received. |
|   ASSERT_EQ(1u, offers->size()); |
| |
|   // No command value is set and the shell is disabled; only the image |
|   // configured below is specified for the task. |
|   CommandInfo command; |
|   command.set_shell(false); |
| |
|   TaskInfo task = createTask( |
|       offers->front().slave_id(), |
|       offers->front().resources(), |
|       command); |
| |
|   // The "nginx:alpine" container returns an "EXIT_STATUS" of 0 on |
|   // receiving a SIGTERM. |
|   // |
|   // TODO(tnachen): Use local image to test if possible. |
|   task.mutable_container()->CopyFrom(createDockerInfo("nginx:alpine")); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(containerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&containerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<TaskStatus> statusStarting; |
|   Future<TaskStatus> statusRunning; |
|   Future<TaskStatus> statusKilling; |
|   Future<TaskStatus> statusKilled; |
| |
|   // Exactly four updates are expected, in this order. |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusStarting)) |
|     .WillOnce(FutureArg<1>(&statusRunning)) |
|     .WillOnce(FutureArg<1>(&statusKilling)) |
|     .WillOnce(FutureArg<1>(&statusKilled)); |
| |
|   driver.launchTasks(offers->front().id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY_FOR(statusStarting, Seconds(60)); |
|   EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
|   AWAIT_READY_FOR(statusRunning, Seconds(60)); |
|   EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
|   // Docker executor will call "docker stop ..." to send SIGTERM |
|   // to kill the task. |
|   driver.killTask(task.task_id()); |
| |
|   AWAIT_READY(statusKilling); |
|   EXPECT_EQ(TASK_KILLING, statusKilling->state()); |
| |
|   AWAIT_READY(statusKilled); |
|   EXPECT_EQ(TASK_KILLED, statusKilled->state()); |
| |
|   // Start waiting on the container before stopping the driver, then |
|   // check that a termination was reported. |
|   Future<Option<ContainerTermination>> termination = |
|     containerizer.wait(containerId.get()); |
| |
|   driver.stop(); |
|   driver.join(); |
| |
|   AWAIT_READY(termination); |
|   EXPECT_SOME(termination.get()); |
| } |
| #endif // __WINDOWS__ |
| |
| |
| #ifdef __linux__ |
| // Verifies that with `cgroups_enable_cfs` set on the agent, a docker |
| // container launched through the docker containerizer gets a CFS CPU |
| // quota. The agent is given one CPU, so the quota read back from the |
| // container's cpu cgroup must equal one CFS period. |
| // Cgroups cpu quota is only available on Linux. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_CGROUPS_CFS_CgroupsEnableCFS) |
| { |
|   Try<Owned<cluster::Master>> master = StartMaster(); |
|   ASSERT_SOME(master); |
| |
|   MockDocker* dockerMock = |
|     new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
|   Shared<Docker> sharedDocker(dockerMock); |
| |
|   slave::Flags agentFlags = CreateSlaveFlags(); |
|   agentFlags.cgroups_enable_cfs = true; |
|   agentFlags.resources = "cpus:1;mem:128"; |
| |
|   Fetcher fetcher(agentFlags); |
| |
|   Try<ContainerLogger*> containerLogger = |
|     ContainerLogger::create(agentFlags.container_logger); |
|   ASSERT_SOME(containerLogger); |
| |
|   MockDockerContainerizer dockerContainerizer( |
|       agentFlags, |
|       &fetcher, |
|       Owned<ContainerLogger>(containerLogger.get()), |
|       sharedDocker); |
| |
|   Owned<MasterDetector> masterDetector = master.get()->createDetector(); |
| |
|   Try<Owned<cluster::Slave>> agent = |
|     StartSlave(masterDetector.get(), &dockerContainerizer, agentFlags); |
|   ASSERT_SOME(agent); |
| |
|   MockScheduler sched; |
|   MesosSchedulerDriver driver( |
|       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
|   Future<FrameworkID> frameworkId; |
|   EXPECT_CALL(sched, registered(&driver, _, _)) |
|     .WillOnce(FutureArg<1>(&frameworkId)); |
| |
|   Future<vector<Offer>> offers; |
|   EXPECT_CALL(sched, resourceOffers(&driver, _)) |
|     .WillOnce(FutureArg<1>(&offers)) |
|     .WillRepeatedly(Return()); // Later offers are ignored. |
| |
|   driver.start(); |
| |
|   AWAIT_READY(frameworkId); |
| |
|   AWAIT_READY(offers); |
|   ASSERT_FALSE(offers->empty()); |
| |
|   const Offer& offer = offers->front(); |
| |
|   TaskInfo task = |
|     createTask(offer.slave_id(), offer.resources(), SLEEP_COMMAND(1000)); |
| |
|   // TODO(tnachen): Use local image to test if possible. |
|   const ContainerInfo dockerInfo = createDockerInfo("alpine"); |
|   task.mutable_container()->CopyFrom(dockerInfo); |
| |
|   Future<ContainerID> containerId; |
|   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
|     .WillOnce(DoAll(FutureArg<0>(&containerId), |
|                     Invoke(&dockerContainerizer, |
|                            &MockDockerContainerizer::_launch))); |
| |
|   Future<TaskStatus> statusStarting; |
|   Future<TaskStatus> statusRunning; |
|   EXPECT_CALL(sched, statusUpdate(&driver, _)) |
|     .WillOnce(FutureArg<1>(&statusStarting)) |
|     .WillOnce(FutureArg<1>(&statusRunning)) |
|     .WillRepeatedly(DoDefault()); |
| |
|   driver.launchTasks(offer.id(), {task}); |
| |
|   AWAIT_READY_FOR(containerId, Seconds(60)); |
| |
|   AWAIT_READY_FOR(statusStarting, Seconds(60)); |
|   EXPECT_EQ(TASK_STARTING, statusStarting.get().state()); |
| |
|   AWAIT_READY_FOR(statusRunning, Seconds(60)); |
|   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); |
|   ASSERT_TRUE(statusRunning.get().has_data()); |
| |
|   // Inspect the running container to learn its pid, then use the pid to |
|   // locate the container's cpu cgroup and read the CFS quota from it. |
|   string dockerName = containerName(containerId.get()); |
|   Future<Docker::Container> inspected = sharedDocker->inspect(dockerName); |
|   AWAIT_READY(inspected); |
| |
|   Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
|   ASSERT_SOME(cpuHierarchy); |
| |
|   Option<pid_t> containerPid = inspected->pid; |
|   ASSERT_SOME(containerPid); |
| |
|   Result<string> cpuCgroup = cgroups::cpu::cgroup(containerPid.get()); |
|   ASSERT_SOME(cpuCgroup); |
| |
|   Try<Duration> actualQuota = |
|     cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cpuCgroup.get()); |
|   ASSERT_SOME(actualQuota); |
| |
|   // One CPU was configured above, hence one full CFS period of quota. |
|   const Duration expectedQuota = mesos::internal::slave::CPU_CFS_PERIOD * 1; |
|   EXPECT_EQ(expectedQuota, actualQuota.get()); |
| |
|   // Start waiting on the container before stopping the driver, then |
|   // check that a termination was reported. |
|   Future<Option<ContainerTermination>> termination = |
|     dockerContainerizer.wait(containerId.get()); |
| |
|   driver.stop(); |
|   driver.join(); |
| |
|   AWAIT_READY(termination); |
|   EXPECT_SOME(termination.get()); |
| } |
| |
| |
| // This test verifies that a task launched with resource limits specified |
| // will have its CPU and memory's soft & hard limits and OOM score adjustment |
| // set correctly. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_CGROUPS_CFS_CommandTaskLimits) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| // Start agent with 2 CPUs and total host memory. |
| Try<os::Memory> memory = os::memory(); |
| ASSERT_SOME(memory); |
| |
| uint64_t totalMemInMB = memory->total.bytes() / 1024 / 1024; |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.resources = |
| strings::format("cpus:2;mem:%d", totalMemInMB).get(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Launch a task with 1 cpu request, 2 cpu limit, half of host total |
| // memory as memory request and host total memory as memory limit. |
| string resourceRequests = strings::format( |
| "cpus:1;mem:%d;disk:1024", |
| totalMemInMB/2).get(); |
| |
| Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(2); |
| memLimit.set_value(totalMemInMB); |
| |
| google::protobuf::Map<string, Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| Resources::parse(resourceRequests).get(), |
| SLEEP_COMMAND(1000), |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| string name = containerName(containerId.get()); |
| Future<Docker::Container> inspect = docker->inspect(name); |
| AWAIT_READY(inspect); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| |
| ASSERT_SOME(cpuHierarchy); |
| ASSERT_SOME(memoryHierarchy); |
| |
| Option<pid_t> pid = inspect->pid; |
| ASSERT_SOME(pid); |
| |
| Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get()); |
| ASSERT_SOME(cpuCgroup); |
| |
| EXPECT_SOME_EQ( |
| mesos::internal::slave::CPU_SHARES_PER_CPU, |
| cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get())); |
| |
| Try<Duration> cfsQuota = |
| cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cpuCgroup.get()); |
| |
| ASSERT_SOME(cfsQuota); |
| |
| const Duration expectedCpuQuota = |
| mesos::internal::slave::CPU_CFS_PERIOD * cpuLimit.value(); |
| |
| EXPECT_EQ(expectedCpuQuota, cfsQuota.get()); |
| |
| Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get()); |
| ASSERT_SOME(memoryCgroup); |
| |
| EXPECT_SOME_EQ( |
| Megabytes(totalMemInMB/2), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), memoryCgroup.get())); |
| |
| EXPECT_SOME_EQ( |
| Megabytes(memLimit.value()), |
| cgroups::memory::limit_in_bytes( |
| memoryHierarchy.get(), memoryCgroup.get())); |
| |
| // Ensure the OOM score adjustment is correctly set for the container. |
| Try<string> read = os::read( |
| strings::format("/proc/%d/oom_score_adj", pid.get()).get()); |
| |
| ASSERT_SOME(read); |
| |
| // Since the memory request is half of host total memory so the OOM score |
| // adjustment should be about 500. |
| Try<int32_t> oomScoreAdj = numify<int32_t>(strings::trim(read.get())); |
| ASSERT_SOME(oomScoreAdj); |
| |
| EXPECT_GT(502, oomScoreAdj.get()); |
| EXPECT_LT(498, oomScoreAdj.get()); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| |
| |
| // This test verifies that a task launched with infinite resource |
| // limits specified will have its CPU and memory's hard limits set |
| // correctly to infinite values. |
| TEST_F( |
| DockerContainerizerTest, |
| ROOT_DOCKER_CGROUPS_CFS_CommandTaskInfiniteLimits) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Launch a task with infinite resource limits. |
| Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(std::numeric_limits<double>::infinity()); |
| memLimit.set_value(std::numeric_limits<double>::infinity()); |
| |
| google::protobuf::Map<string, Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| offers.get()[0].resources(), |
| SLEEP_COMMAND(1000), |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| string name = containerName(containerId.get()); |
| Future<Docker::Container> inspect = docker->inspect(name); |
| AWAIT_READY(inspect); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| |
| ASSERT_SOME(cpuHierarchy); |
| ASSERT_SOME(memoryHierarchy); |
| |
| Option<pid_t> pid = inspect->pid; |
| ASSERT_SOME(pid); |
| |
| Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get()); |
| ASSERT_SOME(cpuCgroup); |
| |
| // The CFS quota should be -1 which means infinite quota. |
| Try<string> quota = |
| cgroups::read(cpuHierarchy.get(), cpuCgroup.get(), "cpu.cfs_quota_us"); |
| |
| ASSERT_SOME(quota); |
| EXPECT_EQ("-1", strings::trim(quota.get())); |
| |
| Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get()); |
| ASSERT_SOME(memoryCgroup); |
| |
| // Root cgroup (e.g., `/sys/fs/cgroup/memory/`) cannot have any limits set, so |
| // its hard limit must be infinity. |
| Try<Bytes> rootCgrouplimit = |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), ""); |
| |
| ASSERT_SOME(rootCgrouplimit); |
| |
| // The memory hard limit should be same as root cgroup's, i.e. infinity. |
| EXPECT_SOME_EQ( |
| rootCgrouplimit.get(), |
| cgroups::memory::limit_in_bytes( |
| memoryHierarchy.get(), memoryCgroup.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| #endif // __linux__ |
| |
| |
| #ifndef __WINDOWS__ |
| // Run a task as non root while inheriting this ownership from the |
| // framework supplied default user. Tests if the sandbox "stdout" |
| // is correctly owned and writeable by the tasks user. |
| // This test isn't run on Windows, because the `switch_user` flag |
| // isn't supported. |
| TEST_F(DockerContainerizerTest, |
| ROOT_DOCKER_UNPRIVILEGED_USER_NonRootSandbox) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| Shared<Docker> docker( |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket)); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.switch_user = true; |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| SlaveID slaveId = slaveRegisteredMessage->slave_id(); |
| |
| Option<string> user = os::getenv("SUDO_USER"); |
| ASSERT_SOME(user); |
| |
| FrameworkInfo framework; |
| framework.set_name("default"); |
| framework.set_user(user.get()); |
| framework.set_principal(DEFAULT_CREDENTIAL.principal()); |
| framework.add_capabilities()->set_type( |
| FrameworkInfo::Capability::RESERVATION_REFINEMENT); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Start the task as a user without supplying an explicit command |
| // user. This should inherit the framework user for the task |
| // ownership. |
| CommandInfo command; |
| command.set_value("echo \"foo\" && sleep 1000"); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| command); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| |
| // Check that the sandbox was written to. |
| const string sandboxDirectory = slave::paths::getExecutorRunPath( |
| flags.work_dir, |
| slaveId, |
| frameworkId.get(), |
| statusRunning->executor_id(), |
| containerId.get()); |
| |
| ASSERT_TRUE(os::exists(sandboxDirectory)); |
| |
| // Check the sandbox "stdout" was written to. |
| const string stdoutPath = path::join(sandboxDirectory, "stdout"); |
| ASSERT_TRUE(os::exists(stdoutPath)); |
| |
| // Check the sandbox "stdout" is owned by the framework default user. |
| struct stat stdoutStat; |
| ASSERT_GE(::stat(stdoutPath.c_str(), &stdoutStat), 0); |
| |
| Result<uid_t> uid = os::getuid(framework.user()); |
| ASSERT_SOME(uid); |
| |
| ASSERT_EQ(stdoutStat.st_uid, uid.get()); |
| |
| // Validate that our task was able to log into the sandbox. |
| Result<string> stdout = os::read(stdoutPath); |
| ASSERT_SOME(stdout); |
| |
| EXPECT_TRUE(strings::contains(stdout.get(), "foo")); |
| } |
| #endif // __WINDOWS__ |
| |
| |
| // This test verifies the DNS configuration of the Docker container |
| // can be successfully set with the agent flag `--default_container_dns`. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_DefaultDNS) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| #ifdef __WINDOWS__ |
| // --dns-option and --dns-search are not supported on Windows. |
| // See https://docs.microsoft.com/en-us/virtualization/windowscontainers/manage-containers/container-networking // NOLINT(whitespace/line_length) |
| Try<ContainerDNSInfo> parse = flags::parse<ContainerDNSInfo>( |
| R"~( |
| { |
| "docker": [ |
| { |
| "network_mode": "BRIDGE", |
| "dns": { |
| "nameservers": [ "8.8.8.8", "8.8.4.4" ] |
| } |
| } |
| ] |
| })~"); |
| #else |
| Try<ContainerDNSInfo> parse = flags::parse<ContainerDNSInfo>( |
| R"~( |
| { |
| "docker": [ |
| { |
| "network_mode": "BRIDGE", |
| "dns": { |
| "nameservers": [ "8.8.8.8", "8.8.4.4" ], |
| "search": [ "example1.com", "example2.com" ], |
| "options": [ "timeout:3", "attempts:2" ] |
| } |
| } |
| ] |
| })~"); |
| #endif // __WINDOWS__ |
| |
| ASSERT_SOME(parse); |
| |
| flags.default_container_dns = parse.get(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| ContainerInfo containerInfo = createDockerInfo(DOCKER_TEST_IMAGE); |
| |
| containerInfo.mutable_docker()->set_network( |
| ContainerInfo::DockerInfo::BRIDGE); |
| |
| task.mutable_container()->CopyFrom(containerInfo); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| // Find the DNS configuration of the container and verify |
| // if it is consistent with `flags.default_container_dns`. |
| string name = containerName(containerId.get()); |
| Future<Docker::Container> inspect = docker->inspect(name); |
| AWAIT_READY(inspect); |
| |
| vector<string> defaultDNS; |
| std::copy( |
| flags.default_container_dns->docker(0).dns().nameservers().begin(), |
| flags.default_container_dns->docker(0).dns().nameservers().end(), |
| std::back_inserter(defaultDNS)); |
| |
| EXPECT_EQ(inspect->dns, defaultDNS); |
| |
| #ifndef __WINDOWS__ |
| vector<string> defaultDNSSearch; |
| std::copy( |
| flags.default_container_dns->docker(0).dns().search().begin(), |
| flags.default_container_dns->docker(0).dns().search().end(), |
| std::back_inserter(defaultDNSSearch)); |
| |
| EXPECT_EQ(inspect->dnsSearch, defaultDNSSearch); |
| |
| vector<string> defaultDNSOption; |
| std::copy( |
| flags.default_container_dns->docker(0).dns().options().begin(), |
| flags.default_container_dns->docker(0).dns().options().end(), |
| std::back_inserter(defaultDNSOption)); |
| |
| EXPECT_EQ(inspect->dnsOptions, defaultDNSOption); |
| #endif // __WINDOWS__ |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| } |
| |
| |
| // This test verifies that the `MESOS_ALLOCATION_ROLE` |
| // environment variable is set properly. |
| TEST_F(DockerContainerizerTest, ROOT_DOCKER_AllocationRoleEnvironmentVariable) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| // We skip stopping the docker container because stopping a container |
| // even when it terminated might not flush the logs and we end up |
| // not getting stdout/stderr in our tests. |
| EXPECT_CALL(*mockDocker, stop(_, _, _)) |
| .WillRepeatedly(Return(Nothing())); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| // Start the framework with a role specified. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.clear_roles(); |
| frameworkInfo.add_roles("role1"); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| offers->front().resources(), |
| #ifdef __WINDOWS__ |
| "if %MESOS_ALLOCATION_ROLE% == \"role1\" (exit 1)"); |
| #else |
| "if [ \"$MESOS_ALLOCATION_ROLE\" != \"role1\" ]; then exit 1; fi"); |
| #endif // __WINDOWS__ |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| Future<ContainerConfig> containerConfig; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| FutureArg<1>(&containerConfig), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY(containerConfig); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| AWAIT_READY_FOR(statusFinished, Seconds(60)); |
| EXPECT_EQ(TASK_FINISHED, statusFinished->state()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Fixture for testing IPv6 support for docker containers on host network. |
| // |
| // TODO(akagup): Windows containers do not support IPv6, but they should |
| // in the future, so enable these when IPv6 is supported. See MESOS-8566. |
| // |
| // TODO(asridharan): Currently in the `Setup` and `TearDown` methods |
| // of this class we re-initialize libprocess to take an IPv6 address. |
| // Ideally, we should be moving this into a more general test fixture |
| // in mesos.hpp to be used by any other tests for IPv6. This might |
| // need changes to `MesosTest` in to order to allow for multiple |
| // inheritance. |
| class DockerContainerizerIPv6Test : public DockerContainerizerTest |
| { |
| protected: |
| void SetUp() override |
| { |
| os::setenv("LIBPROCESS_IP6", "::1234"); |
| process::reinitialize( |
| None(), |
| READWRITE_HTTP_AUTHENTICATION_REALM, |
| READONLY_HTTP_AUTHENTICATION_REALM); |
| |
| DockerContainerizerTest::SetUp(); |
| } |
| |
| void TearDown() override |
| { |
| DockerContainerizerTest::TearDown(); |
| |
| os::unsetenv("LIBPROCESS_IP6"); |
| process::reinitialize( |
| None(), |
| READWRITE_HTTP_AUTHENTICATION_REALM, |
| READONLY_HTTP_AUTHENTICATION_REALM); |
| } |
| }; |
| |
| |
| // Launches a docker container on the host network. The host network |
| // is assumed to have an IPv4 address and an IPv6 address. The test |
| // passes if the Mesos state correctly exposes both the IPv4 and IPv6 |
| // address. |
| TEST_F_TEMP_DISABLED_ON_WINDOWS( |
| DockerContainerizerIPv6Test, |
| ROOT_DOCKER_LaunchIPv6HostNetwork) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| |
| ASSERT_SOME(slave); |
| |
| // Check if the slave has the IPv6 address stored in its PID. |
| EXPECT_SOME(slave.get()->pid.addresses.v6); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| const Offer& offer = offers.get()[0]; |
| |
| TaskInfo task = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| SLEEP_COMMAND(10000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo("alpine")); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| Try<JSON::Array> array = JSON::parse<JSON::Array>(statusRunning->data()); |
| ASSERT_SOME(array); |
| |
| // Check if container information is exposed through master's state endpoint. |
| Future<http::Response> response = http::get( |
| master.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Value> find = parse->find<JSON::Value>( |
| "frameworks[0].tasks[0].container.docker.privileged"); |
| |
| EXPECT_SOME_FALSE(find); |
| |
| // Check if container information is exposed through slave's state endpoint. |
| response = http::get( |
| slave.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); |
| |
| parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| find = parse->find<JSON::Value>( |
| "frameworks[0].executors[0].tasks[0].container.docker.privileged"); |
| |
| EXPECT_SOME_FALSE(find); |
| |
| // Now verify the ContainerStatus fields in the TaskStatus. |
| ASSERT_TRUE(statusRunning->has_container_status()); |
| EXPECT_TRUE(statusRunning->container_status().has_container_id()); |
| ASSERT_EQ(1, statusRunning->container_status().network_infos().size()); |
| EXPECT_EQ(2, statusRunning->container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length) |
| |
| Option<string> containerIPv4 = None(); |
| Option<string> containerIPv6 = None(); |
| |
| foreach(const NetworkInfo::IPAddress& ipAddress, |
| statusRunning->container_status().network_infos(0).ip_addresses()) { |
| if (ipAddress.protocol() == NetworkInfo::IPv4) { |
| containerIPv4 = ipAddress.ip_address(); |
| } |
| |
| if (ipAddress.protocol() == NetworkInfo::IPv6) { |
| containerIPv6 = ipAddress.ip_address(); |
| } |
| } |
| |
| EXPECT_SOME(containerIPv4); |
| ASSERT_SOME(containerIPv6); |
| EXPECT_EQ(containerIPv6.get(), "::1234"); |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| |
| |
| // Fixture for testing IPv6 support for docker containers on docker |
| // user network. |
| class DockerContainerizerIPv6UserNetworkTest : public DockerContainerizerTest |
| { |
| protected: |
| void SetUp() override |
| { |
| createDockerIPv6UserNetwork(); |
| DockerContainerizerTest::SetUp(); |
| } |
| |
| void TearDown() override |
| { |
| DockerContainerizerTest::TearDown(); |
| removeDockerIPv6UserNetwork(); |
| } |
| }; |
| |
| |
| // Launches a docker container on the docker user network. The docker network |
| // is assumed to have an IPv4 address and an IPv6 address. The test passes if |
| // the Mesos state correctly exposes both the IPv4 and IPv6 address. |
| TEST_F_TEMP_DISABLED_ON_WINDOWS( |
| DockerContainerizerIPv6UserNetworkTest, |
| ROOT_DOCKER_USERNETWORK_LaunchIPv6Container) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| const Offer& offer = offers.get()[0]; |
| |
| TaskInfo task = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| SLEEP_COMMAND(10000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| ContainerInfo containerInfo = createDockerInfo("alpine"); |
| |
| containerInfo.mutable_docker()->set_network( |
| ContainerInfo::DockerInfo::USER); |
| |
| // Setup the docker IPv6 network. |
| NetworkInfo networkInfo; |
| networkInfo.set_name(DOCKER_IPv6_NETWORK); |
| containerInfo.add_network_infos()->CopyFrom(networkInfo); |
| |
| task.mutable_container()->CopyFrom(containerInfo); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| ASSERT_TRUE(statusRunning->has_data()); |
| |
| // Now verify the ContainerStatus fields in the TaskStatus. |
| ASSERT_TRUE(statusRunning->has_container_status()); |
| EXPECT_TRUE(statusRunning->container_status().has_container_id()); |
| ASSERT_EQ(1, statusRunning->container_status().network_infos().size()); |
| ASSERT_EQ(2, statusRunning->container_status().network_infos(0).ip_addresses().size()); // NOLINT(whitespace/line_length) |
| |
| Option<string> containerIPv4 = None(); |
| Option<string> containerIPv6 = None(); |
| |
| foreach(const NetworkInfo::IPAddress& ipAddress, |
| statusRunning->container_status().network_infos(0).ip_addresses()) { |
| if (ipAddress.protocol() == NetworkInfo::IPv4) { |
| containerIPv4 = ipAddress.ip_address(); |
| } |
| |
| if (ipAddress.protocol() == NetworkInfo::IPv6) { |
| containerIPv6 = ipAddress.ip_address(); |
| } |
| } |
| |
| ASSERT_SOME(containerIPv4); |
| ASSERT_SOME(containerIPv6); |
| |
| // Check if container information is exposed through slave's state endpoint. |
| Future<http::Response> response = http::get( |
| slave.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| // Verify that the slave state information has the same container |
| // status as received in the status update message. |
| for (int i = 0; i < 2; i++) { |
| Result<JSON::String> protocol = parse->find<JSON::String>( |
| "frameworks[0].executors[0].tasks[0].statuses[1]" |
| ".container_status.network_infos[0].ip_addresses[" + |
| stringify(i) + "].protocol"); |
| |
| ASSERT_SOME(protocol); |
| |
| Result<JSON::String> ip = parse->find<JSON::String>( |
| "frameworks[0].executors[0].tasks[0].statuses[1]" |
| ".container_status.network_infos[0].ip_addresses[" + |
| stringify(i) + "].ip_address"); |
| |
| ASSERT_SOME(ip); |
| |
| if (protocol->value == "IPv4") { |
| EXPECT_EQ(ip->value, containerIPv4.get()); |
| } else { |
| EXPECT_EQ(ip->value, containerIPv6.get()); |
| } |
| |
| LOG(INFO) << "IP: " << ip->value; |
| } |
| |
| ASSERT_TRUE(exists(docker, containerId.get())); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| |
| |
| class HungDockerTest : public DockerContainerizerTest |
| { |
| public: |
| const string testDockerBinary = "docker"; |
| #ifdef __WINDOWS__ |
| const string testDockerScript = "test-docker.bat"; |
| const string testDockerEnvFile = "test-docker-env.bat"; |
| #else |
| const string testDockerScript = "test-docker.sh"; |
| const string testDockerEnvFile = "test-docker.env"; |
| #endif // __WINDOWS__ |
| |
| string commandsEnv; |
| string delayEnv; |
| |
| slave::Flags CreateSlaveFlags() override |
| { |
| slave::Flags flags = MesosTest::CreateSlaveFlags(); |
| |
| flags.docker = path::join(os::getcwd(), testDockerScript); |
| |
| return flags; |
| } |
| |
| void writeEnv() |
| { |
| // TODO(greggomann): This write operation is not atomic, which means an |
| // ill-timed write may cause the shell script to be invoked when this |
| // file is in an unintended state. We should make this atomic. |
| |
| Try<Nothing> write = |
| #ifdef __WINDOWS__ |
| os::write(testDockerEnvFile, commandsEnv + "\r\n" + delayEnv); |
| #else |
| os::write(testDockerEnvFile, commandsEnv + "\n" + delayEnv); |
| #endif // __WINDOWS__ |
| |
| ASSERT_SOME(write); |
| } |
| |
| void setDelayedCommands(const std::vector<string>& commands) |
| { |
| #ifdef __WINDOWS__ |
| commandsEnv = "set "; |
| #else |
| commandsEnv = ""; |
| #endif // __WINDOWS__ |
| |
| commandsEnv += "DELAYED_COMMANDS=( "; |
| foreach (const string& command, commands) { |
| commandsEnv += (command + " "); |
| } |
| commandsEnv += ")"; |
| |
| writeEnv(); |
| } |
| |
| void setDelay(const int seconds) |
| { |
| #ifdef __WINDOWS__ |
| delayEnv = "set "; |
| #else |
| delayEnv = ""; |
| #endif // __WINDOWS__ |
| |
| delayEnv += "DELAY_SECONDS=" + stringify(seconds); |
| |
| writeEnv(); |
| } |
| |
| void SetUp() override |
| { |
| DockerContainerizerTest::SetUp(); |
| |
| // Write a wrapper script which allows us to delay Docker commands. |
| #ifdef __WINDOWS__ |
| const string dockerScriptText = |
| "@echo off\r\n" |
| "setlocal enabledelayedexpansion\r\n" |
| "call \"" + path::join(os::getcwd(), testDockerEnvFile) + "\"\r\n" |
| "set ACTIVE_COMMAND=%3\r\n" |
| "if not defined DELAYED_COMMANDS set DELAYED_COMMANDS=()\r\n" |
| "for %%G in %DELAYED_COMMANDS% do (\r\n" |
| " if %ACTIVE_COMMAND% == %%G (\r\n" |
| " ping -n %DELAY_SECONDS% 127.0.0.1 > NUL\r\n" |
| " )\r\n" |
| ")\r\n" + |
| testDockerBinary + " %*\r\n"; |
| #else |
| const string dockerScriptText = |
| "#!/usr/bin/env bash\n" |
| "source " + stringify(path::join(os::getcwd(), testDockerEnvFile)) + "\n" |
| "ACTIVE_COMMAND=$3\n" |
| "for DELAYED_COMMAND in \"${DELAYED_COMMANDS[@]}\"; do\n" |
| " if [ \"$ACTIVE_COMMAND\" == \"$DELAYED_COMMAND\" ]; then\n" |
| " sleep $DELAY_SECONDS\n" |
| " fi\n" |
| "done\n" + |
| testDockerBinary + " \"$@\"\n"; |
| #endif // __WINDOWS__ |
| |
| Try<Nothing> write = os::write(testDockerScript, dockerScriptText); |
| ASSERT_SOME(write); |
| |
| #ifndef __WINDOWS__ |
| Try<Nothing> chmod = os::chmod( |
| testDockerScript, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); |
| ASSERT_SOME(chmod); |
| #endif // __WINDOWS__ |
| |
| // Set a very long delay by default to simulate an indefinitely |
| // hung Docker daemon. |
| setDelay(999999); |
| } |
| }; |
| |
| |
| TEST_F(HungDockerTest, ROOT_DOCKER_InspectHungDuringPull) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| // When the 'executor_registration_timeout' elapses, the agent will destroy |
| // the container whose 'docker pull' command is stuck. This should cause the |
| // launch to fail and the terminal task status update to be sent. |
| flags.executor_registration_timeout = Milliseconds(100); |
| |
| MockDocker* mockDocker = |
| new MockDocker(flags.docker, tests::flags.docker_socket); |
| Shared<Docker> docker(mockDocker); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Causing the 'docker inspect' call preceding the container pull to hang |
| // should result in a TASK_FAILED update. |
| setDelayedCommands({"inspect"}); |
| |
| const Offer& offer = offers.get()[0]; |
| |
| TaskInfo task = createTask( |
| offer.slave_id(), |
| offer.resources(), |
| SLEEP_COMMAND(1000)); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| task.mutable_container()->CopyFrom(createDockerInfo(DOCKER_TEST_IMAGE)); |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<TaskStatus> statusFailed; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusFailed)) |
| .WillRepeatedly(DoDefault()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusFailed); |
| EXPECT_EQ(TASK_FAILED, statusFailed->state()); |
| EXPECT_EQ( |
| TaskStatus::REASON_CONTAINER_LAUNCH_FAILED, |
| statusFailed->reason()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test is disabled on windows due to the bash-specific |
| // command used in the task below. |
| TEST_F_TEMP_DISABLED_ON_WINDOWS( |
| DockerContainerizerTest, ROOT_DOCKER_OverrideKillPolicy) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockDocker* mockDocker = |
| new MockDocker(tests::flags.docker, tests::flags.docker_socket); |
| |
| Shared<Docker> docker(mockDocker); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<ContainerLogger*> logger = |
| ContainerLogger::create(flags.container_logger); |
| |
| ASSERT_SOME(logger); |
| |
| Future<SlaveRegisteredMessage> slaveRegisteredMessage = |
| FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); |
| |
| MockDockerContainerizer dockerContainerizer( |
| flags, |
| &fetcher, |
| Owned<ContainerLogger>(logger.get()), |
| docker); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), &dockerContainerizer, flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(slaveRegisteredMessage); |
| |
| auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); |
| |
| EXPECT_CALL(*scheduler, connected(_)) |
| .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); |
| |
| Future<v1::scheduler::Event::Subscribed> subscribed; |
| EXPECT_CALL(*scheduler, subscribed(_, _)) |
| .WillOnce(FutureArg<1>(&subscribed)); |
| |
| Future<v1::scheduler::Event::Offers> offers; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(*scheduler, heartbeat(_)) |
| .WillRepeatedly(Return()); // Ignore heartbeats. |
| |
| v1::scheduler::TestMesos mesos( |
| master.get()->pid, |
| ContentType::PROTOBUF, |
| scheduler); |
| |
| AWAIT_READY(subscribed); |
| v1::FrameworkID frameworkId(subscribed->framework_id()); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->offers().empty()); |
| |
| const v1::Offer& offer = offers->offers(0); |
| const v1::AgentID& agentId = offer.agent_id(); |
| |
| Try<v1::Resources> parsed = |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32"); |
| |
| ASSERT_SOME(parsed); |
| |
| v1::Resources resources = parsed.get(); |
| |
| // Create a task which ignores SIGTERM so that we can detect |
| // when the task receives SIGKILL. |
| v1::TaskInfo taskInfo = v1::createTask( |
| agentId, |
| resources, |
| "trap \"echo 'SIGTERM received'\" SIGTERM; sleep 999999"); |
| |
| // TODO(tnachen): Use local image to test if possible. |
| taskInfo.mutable_container()->CopyFrom( |
| evolve(createDockerInfo(DOCKER_TEST_IMAGE))); |
| |
| { |
| // Set a long grace period on the task's kill policy so that we |
| // can detect if the override is effective. |
| mesos::v1::DurationInfo gracePeriod; |
| gracePeriod.set_nanoseconds(Minutes(10).ns()); |
| |
| mesos::v1::KillPolicy killPolicy; |
| killPolicy.mutable_grace_period()->CopyFrom(gracePeriod); |
| |
| taskInfo.mutable_kill_policy()->CopyFrom(killPolicy); |
| } |
| |
| Future<ContainerID> containerId; |
| EXPECT_CALL(dockerContainerizer, launch(_, _, _, _)) |
| .WillOnce(DoAll(FutureArg<0>(&containerId), |
| Invoke(&dockerContainerizer, |
| &MockDockerContainerizer::_launch))); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate; |
| Future<v1::scheduler::Event::Update> runningUpdate; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(DoAll( |
| FutureArg<1>(&startingUpdate), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))) |
| .WillOnce(DoAll( |
| FutureArg<1>(&runningUpdate), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| mesos.send( |
| v1::createCallAccept( |
| frameworkId, |
| offer, |
| {v1::LAUNCH({taskInfo})})); |
| |
| AWAIT_READY_FOR(containerId, Seconds(60)); |
| AWAIT_READY_FOR(startingUpdate, Seconds(60)); |
| EXPECT_EQ(v1::TASK_STARTING, startingUpdate->status().state()); |
| AWAIT_READY_FOR(runningUpdate, Seconds(60)); |
| EXPECT_EQ(v1::TASK_RUNNING, runningUpdate->status().state()); |
| |
| ASSERT_TRUE( |
| exists(docker, containerId.get(), ContainerState::RUNNING)); |
| |
| Future<v1::scheduler::Event::Update> killedUpdate; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(FutureArg<1>(&killedUpdate)); |
| |
| Future<Option<ContainerTermination>> termination = |
| dockerContainerizer.wait(containerId.get()); |
| |
| { |
| // Set a short grace period on the kill call so that we |
| // can detect if the override is effective. |
| mesos::v1::DurationInfo gracePeriod; |
| gracePeriod.set_nanoseconds(100); |
| |
| mesos::v1::KillPolicy killPolicy; |
| killPolicy.mutable_grace_period()->CopyFrom(gracePeriod); |
| |
| mesos.send( |
| v1::createCallKill( |
| frameworkId, |
| taskInfo.task_id(), |
| agentId, |
| killPolicy)); |
| } |
| |
| AWAIT_READY(killedUpdate); |
| EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state()); |
| |
| AWAIT_READY(termination); |
| EXPECT_SOME(termination.get()); |
| |
| // Even though the task is killed, the executor should exit gracefully. |
| ASSERT_TRUE(termination.get()->has_status()); |
| EXPECT_EQ(0, termination.get()->status()); |
| |
| ASSERT_FALSE( |
| exists(docker, containerId.get(), ContainerState::RUNNING, false)); |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |