| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <limits> |
| |
| #include <process/gmock.hpp> |
| #include <process/gtest.hpp> |
| #include <process/queue.hpp> |
| |
| #include <stout/format.hpp> |
| #include <stout/gtest.hpp> |
| |
| #include <stout/os/exec.hpp> |
| |
| #include <mesos/v1/scheduler.hpp> |
| |
| #include "slave/gc_process.hpp" |
| |
| #include "slave/containerizer/mesos/containerizer.hpp" |
| #include "slave/containerizer/mesos/paths.hpp" |
| |
| #include "linux/cgroups2.hpp" |
| |
| #include "slave/containerizer/mesos/isolators/cgroups2/constants.hpp" |
| #include "slave/containerizer/mesos/isolators/cgroups/constants.hpp" |
| #include "slave/containerizer/mesos/isolators/cgroups/subsystems/net_cls.hpp" |
| |
| #include "tests/mesos.hpp" |
| #include "tests/mock_slave.hpp" |
| #include "tests/resources_utils.hpp" |
| #include "tests/script.hpp" |
| |
| #include "tests/containerizer/docker_archive.hpp" |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_BLKIO_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_CPU_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_CPUACCT_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_CPUSET_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_DEVICES_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_HUGETLB_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_MEMORY_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_NET_CLS_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_NET_PRIO_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_PERF_EVENT_NAME; |
| using mesos::internal::slave::CGROUP_SUBSYSTEM_PIDS_NAME; |
| using mesos::internal::slave::CPU_SHARES_PER_CPU; |
| using mesos::internal::slave::CPU_SHARES_PER_CPU_REVOCABLE; |
| using mesos::internal::slave::CPU_CFS_PERIOD; |
| using mesos::internal::slave::DEFAULT_EXECUTOR_CPUS; |
| using mesos::internal::slave::DEFAULT_EXECUTOR_MEM; |
| using mesos::internal::slave::CGROUPS2_CPU_WEIGHT_PER_CPU_REVOCABLE; |
| |
| using mesos::internal::slave::Containerizer; |
| using mesos::internal::slave::Fetcher; |
| using mesos::internal::slave::MesosContainerizer; |
| using mesos::internal::slave::MesosContainerizerProcess; |
| using mesos::internal::slave::NetClsHandle; |
| using mesos::internal::slave::NetClsHandleManager; |
| using mesos::internal::slave::Slave; |
| |
| using mesos::internal::slave::containerizer::paths::getCgroupPath; |
| |
| using mesos::master::detector::MasterDetector; |
| |
| using mesos::v1::scheduler::Call; |
| using mesos::v1::scheduler::Event; |
| |
| using process::Clock; |
| using process::Future; |
| using process::Owned; |
| using process::Queue; |
| |
| using process::http::OK; |
| using process::http::Response; |
| |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| using testing::_; |
| using testing::AllOf; |
| using testing::DoAll; |
| using testing::InvokeWithoutArgs; |
| using testing::Return; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| |
| // Run the balloon framework under the Mesos containerizer. |
| TEST_SCRIPT(ContainerizerTest, |
| ROOT_CGROUPS_BalloonFramework, |
| "balloon_framework_test.sh") |
| |
| |
| class CgroupsIsolatorTest |
| : public ContainerizerTest<MesosContainerizer> {}; |
| |
| |
| // This test starts the agent with cgroups isolation and launches a |
| // task as an unprivileged user. It then verifies that the unprivileged |
| // user has write permission under the corresponding cgroups which are |
| // prepared for the container to run the task. |
| TEST_F(CgroupsIsolatorTest, |
| ROOT_CGROUPS_PERF_NET_CLS_UNPRIVILEGED_USER_UserCgroup) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| const string registry = path::join(os::getcwd(), "registry"); |
| |
| Future<Nothing> testImage = DockerArchive::create(registry, "alpine"); |
| AWAIT_READY(testImage); |
| |
| ASSERT_TRUE(os::exists(path::join(registry, "alpine.tar"))); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.docker_registry = registry; |
| flags.docker_store_dir = path::join(os::getcwd(), "store"); |
| flags.image_providers = "docker"; |
| flags.perf_events = "cpu-cycles"; // Needed for `PerfEventSubsystem`. |
| flags.isolation = |
| "cgroups/cpu," |
| "cgroups/devices," |
| "cgroups/mem," |
| "cgroups/net_cls," |
| "cgroups/perf_event," |
| "docker/runtime," |
| "filesystem/linux"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| Option<string> user = os::getenv("SUDO_USER"); |
| ASSERT_SOME(user); |
| |
| // Launch a task with the command executor. |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/sleep"); |
| command.add_arguments("sleep"); |
| command.add_arguments("120"); |
| command.set_user(user.get()); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| offers.get()[0].resources(), |
| command); |
| |
| Image image; |
| image.set_type(Image::DOCKER); |
| image.mutable_docker()->set_name("alpine"); |
| |
| ContainerInfo* container = task.mutable_container(); |
| container->set_type(ContainerInfo::MESOS); |
| container->mutable_mesos()->mutable_image()->CopyFrom(image); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(Return()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY_FOR(statusStarting, Seconds(60)); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY_FOR(statusRunning, Seconds(60)); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| vector<string> subsystems = { |
| CGROUP_SUBSYSTEM_CPU_NAME, |
| CGROUP_SUBSYSTEM_CPUACCT_NAME, |
| CGROUP_SUBSYSTEM_DEVICES_NAME, |
| CGROUP_SUBSYSTEM_MEMORY_NAME, |
| CGROUP_SUBSYSTEM_NET_CLS_NAME, |
| CGROUP_SUBSYSTEM_PERF_EVENT_NAME, |
| }; |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| foreach (const string& subsystem, subsystems) { |
| Result<string> hierarchy = cgroups::hierarchy(subsystem); |
| ASSERT_SOME(hierarchy); |
| |
| string cgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
| // Verify that the user cannot manipulate the container's cgroup |
| // control files as their owner is root. |
| EXPECT_SOME_NE(0, os::system(strings::format( |
| "su - %s -s /bin/sh -c 'echo $$ > %s'", |
| user.get(), |
| path::join(hierarchy.get(), cgroup, "cgroup.procs")).get())); |
| |
| // Verify that the user can create a cgroup under the container's |
| // cgroup as the isolator changes the owner of the cgroup. |
| string userCgroup = path::join(cgroup, "user"); |
| |
| EXPECT_SOME_EQ(0, os::system(strings::format( |
| "su - %s -s /bin/sh -c 'mkdir %s'", |
| user.get(), |
| path::join(hierarchy.get(), userCgroup)).get())); |
| |
| // Verify that the user can manipulate control files in the |
| // created cgroup as it's owned by the user. |
| EXPECT_SOME_EQ(0, os::system(strings::format( |
| "su - %s -s /bin/sh -c 'echo $$ > %s'", |
| user.get(), |
| path::join(hierarchy.get(), userCgroup, "cgroup.procs")).get())); |
| |
| // Clean up the cgroup we created. |
| AWAIT_READY(cgroups::destroy(hierarchy.get(), userCgroup)); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_RevocableCpu) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| MockResourceEstimator resourceEstimator; |
| EXPECT_CALL(resourceEstimator, initialize(_)); |
| |
| Queue<Resources> estimations; |
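| // Each `oversubscribable()` call returns a future that is satisfied by |
| // the next `estimations.put()`, which lets the test inject revocable |
| // resources on demand. |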
| EXPECT_CALL(resourceEstimator, oversubscribable()) |
| .WillRepeatedly(InvokeWithoutArgs(&estimations, &Queue<Resources>::get)); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| &resourceEstimator, |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| // Start the framework which accepts revocable resources. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.add_capabilities()->set_type( |
| FrameworkInfo::Capability::REVOCABLE_RESOURCES); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| frameworkInfo, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers1; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| driver.start(); |
| |
| // Initially the framework will get all regular resources. |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->empty()); |
| EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty()); |
| |
| Future<vector<Offer>> offers2; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| // Inject an estimation of revocable cpu resources. |
| Resource cpu = Resources::parse("cpus", "1", "*").get(); |
| cpu.mutable_revocable(); |
| Resources cpus(cpu); |
| estimations.put(cpus); |
| |
| // Now the framework will get revocable resources. |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->empty()); |
| EXPECT_EQ(allocatedResources(cpus, frameworkInfo.roles(0)), |
| Resources(offers2.get()[0].resources())); |
| |
| TaskInfo task = createTask( |
| offers2.get()[0].slave_id(), |
| cpus, |
| "sleep 120"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers2.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| string cpuCgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
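| // The task was launched with 1 revocable cpu and the command executor |
| // is given 0.1 cpu (`DEFAULT_EXECUTOR_CPUS`) by default, so `totalCpus` |
| // is 1.1. |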
| double totalCpus = cpus.cpus().get() + DEFAULT_EXECUTOR_CPUS; |
| #ifdef ENABLE_CGROUPS_V2 |
| EXPECT_SOME_EQ( |
| static_cast<uint64_t>(CGROUPS2_CPU_WEIGHT_PER_CPU_REVOCABLE * totalCpus), |
| cgroups2::cpu::weight(cpuCgroup)); |
| #else |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| ASSERT_SOME(cpuHierarchy); |
| |
| EXPECT_SOME_EQ( |
| CPU_SHARES_PER_CPU_REVOCABLE * totalCpus, |
| cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup)); |
| #endif // ENABLE_CGROUPS_V2 |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that a task launched with 0.5 cpu and 32MB memory as |
| // its resource requests (but no resource limits specified) will have its |
| // CPU and memory soft & hard limits and its OOM score adjustment set |
| // correctly. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskNoLimits) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| |
| // Enable CFS to cap CPU utilization. |
| flags.cgroups_enable_cfs = true; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // We will launch a task with 0.5 cpu and 32MB memory, and the command |
| // executor will be given 0.1 cpu (`DEFAULT_EXECUTOR_CPUS`) and 32MB |
| // memory (`DEFAULT_EXECUTOR_MEM`) by default, so we need 0.6 cpu and 64MB |
| // in total. |
| ASSERT_GE( |
| Resources(offers.get()[0].resources()).cpus().get(), |
| 0.5 + DEFAULT_EXECUTOR_CPUS); |
| |
| ASSERT_GE( |
| Resources(offers.get()[0].resources()).mem().get(), |
| Megabytes(32) + DEFAULT_EXECUTOR_MEM); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| Resources::parse("cpus:0.5;mem:32").get(), |
| SLEEP_COMMAND(1000)); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| ASSERT_SOME(cpuHierarchy); |
| |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(memoryHierarchy); |
| |
| string cgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
| // Ensure the CPU shares and CFS quota are correctly set for the container. |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * (0.5 + DEFAULT_EXECUTOR_CPUS)), |
| cgroups::cpu::shares(cpuHierarchy.get(), cgroup)); |
| |
| Try<Duration> cfsQuota = |
| cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cgroup); |
| |
| ASSERT_SOME(cfsQuota); |
| |
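| // For illustration, assuming `CPU_CFS_PERIOD` is the common 100ms CFS |
| // period: 0.6 cpus * 100ms = 60ms of quota per period. |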
| double expectedCFSQuota = (0.5 + DEFAULT_EXECUTOR_CPUS) * CPU_CFS_PERIOD.ms(); |
| EXPECT_EQ(expectedCFSQuota, cfsQuota->ms()); |
| |
| // Ensure the memory soft and hard limits are correctly set for the container. |
| EXPECT_SOME_EQ( |
| Megabytes(32) + DEFAULT_EXECUTOR_MEM, |
| cgroups::memory::soft_limit_in_bytes(memoryHierarchy.get(), cgroup)); |
| |
| EXPECT_SOME_EQ( |
| Megabytes(32) + DEFAULT_EXECUTOR_MEM, |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), cgroup)); |
| |
| Future<ContainerStatus> status = containerizer->status(containerId); |
| AWAIT_READY(status); |
| ASSERT_TRUE(status->has_executor_pid()); |
| |
| // Ensure the OOM score adjustment is set to the default value (i.e. 0). |
| Try<string> read = os::read( |
| strings::format("/proc/%d/oom_score_adj", status->executor_pid()).get()); |
| |
| ASSERT_SOME(read); |
| |
| Try<int32_t> oomScoreAdj = numify<int32_t>(strings::trim(read.get())); |
| ASSERT_SOME_EQ(0, oomScoreAdj); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that a task launched with resource limits specified |
| // will have its CPU and memory soft & hard limits and its OOM score |
| // adjustment set correctly. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskLimits) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Start the agent with 2 CPUs, the total host memory and 1024MB of disk. |
| Try<os::Memory> memory = os::memory(); |
| ASSERT_SOME(memory); |
| |
| uint64_t totalMemInMB = memory->total.bytes() / 1024 / 1024; |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| flags.resources = |
| strings::format("cpus:2;mem:%d;disk:1024", totalMemInMB).get(); |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Launch a task with a 0.2 cpu request, a 0.5 cpu limit, half of the |
| // host total memory minus `DEFAULT_EXECUTOR_MEM` as its memory request, |
| // and half of the host total memory as its memory limit. |
| string resourceRequests = strings::format( |
| "cpus:0.2;mem:%d;disk:1024", |
| totalMemInMB / 2 - DEFAULT_EXECUTOR_MEM.bytes() / 1024 / 1024).get(); |
| |
| Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(0.5); |
| memLimit.set_value(totalMemInMB / 2); |
| |
| google::protobuf::Map<string, Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| Resources::parse(resourceRequests).get(), |
| SLEEP_COMMAND(1000), |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| ASSERT_SOME(cpuHierarchy); |
| |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(memoryHierarchy); |
| |
| string cgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
| // The command executor will be given 0.1 cpu (`DEFAULT_EXECUTOR_CPUS`) |
| // by default, so in total the CPU shares of the executor container |
| // should be 0.3. |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.3), |
| cgroups::cpu::shares(cpuHierarchy.get(), cgroup)); |
| |
| // The 0.1 cpu given to the command executor is also included in the cpu |
| // limit, so in total the CFS quota should be 0.6. |
| Try<Duration> cfsQuota = |
| cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cgroup); |
| |
| ASSERT_SOME(cfsQuota); |
| |
| double expectedCFSQuota = |
| (cpuLimit.value() + DEFAULT_EXECUTOR_CPUS) * CPU_CFS_PERIOD.ms(); |
| |
| EXPECT_EQ(expectedCFSQuota, cfsQuota->ms()); |
| |
| // The command executor will be given 32MB (`DEFAULT_EXECUTOR_MEM`) by |
| // default, so in total the memory soft limit should be half of the host |
| // total memory. |
| EXPECT_SOME_EQ( |
| Megabytes(totalMemInMB / 2), |
| cgroups::memory::soft_limit_in_bytes(memoryHierarchy.get(), cgroup)); |
| |
| // The 32MB memory given to the command executor is also included in the |
| // memory limit, so in total the memory limit should be half of the host |
| // total memory + 32MB. |
| EXPECT_SOME_EQ( |
| Megabytes(memLimit.value()) + DEFAULT_EXECUTOR_MEM, |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), cgroup)); |
| |
| Future<ContainerStatus> status = containerizer->status(containerId); |
| AWAIT_READY(status); |
| ASSERT_TRUE(status->has_executor_pid()); |
| |
| // Ensure the OOM score adjustment is correctly set for the container. |
| Try<string> read = os::read( |
| strings::format("/proc/%d/oom_score_adj", status->executor_pid()).get()); |
| |
| ASSERT_SOME(read); |
| |
| // Since the memory request is half of the host total memory (note that |
| // `DEFAULT_EXECUTOR_MEM` is also included in the memory request), the |
| // OOM score adjustment should be about 500; see |
| // `MemorySubsystemProcess::isolate` for the detailed algorithm. |
| Try<int32_t> oomScoreAdj = numify<int32_t>(strings::trim(read.get())); |
| ASSERT_SOME(oomScoreAdj); |
| |
| EXPECT_GT(502, oomScoreAdj.get()); |
| EXPECT_LT(498, oomScoreAdj.get()); |
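| |
| // A sketch of the arithmetic, assuming the adjustment scales linearly |
| // with the fraction of host memory requested: 1000 * (1/2) = 500, hence |
| // the (498, 502) bounds above. |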
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that a task launched with infinite resource |
| // limits will have its CPU and memory hard limits correctly set to |
| // infinite values. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskInfiniteLimits) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Launch a task with infinite resource limits. |
| Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(std::numeric_limits<double>::infinity()); |
| memLimit.set_value(std::numeric_limits<double>::infinity()); |
| |
| google::protobuf::Map<string, Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| offers.get()[0].resources(), |
| "sleep 1000", |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| ASSERT_SOME(cpuHierarchy); |
| |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(memoryHierarchy); |
| |
| string cgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
| // The CFS quota should be -1 which means infinite quota. |
| Try<string> quota = |
| cgroups::read(cpuHierarchy.get(), cgroup, "cpu.cfs_quota_us"); |
| |
| ASSERT_SOME(quota); |
| EXPECT_EQ("-1", strings::trim(quota.get())); |
| |
| // The root cgroup (e.g., `/sys/fs/cgroup/memory/`) cannot have any limit |
| // set, so its hard limit must be infinity. |
| Try<Bytes> rootCgroupLimit = |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), ""); |
| |
| ASSERT_SOME(rootCgroupLimit); |
| |
| // The memory hard limit should be the same as the root cgroup's, i.e. |
| // infinity. |
| EXPECT_SOME_EQ( |
| rootCgroupLimit.get(), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), cgroup)); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that the default executor container's CPU and memory |
| // soft & hard limits can be updated correctly when launching task groups |
| // and killing tasks. It also verifies that a task's CPU and memory |
| // soft & hard limits can be set correctly. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_TaskGroupLimits) |
| { |
| Clock::pause(); |
| |
| master::Flags masterFlags = CreateMasterFlags(); |
| |
| Try<Owned<cluster::Master>> master = StartMaster(masterFlags); |
| ASSERT_SOME(master); |
| |
| // Disable AuthN on the agent. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| flags.cgroups_enable_cfs = true; |
| flags.authenticate_http_readwrite = false; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = |
| StartSlave(detector.get(), containerizer.get(), flags); |
| |
| ASSERT_SOME(slave); |
| |
| auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); |
| |
| EXPECT_CALL(*scheduler, connected(_)) |
| .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); |
| |
| Future<Event::Subscribed> subscribed; |
| EXPECT_CALL(*scheduler, subscribed(_, _)) |
| .WillOnce(FutureArg<1>(&subscribed)); |
| |
| Future<v1::scheduler::Event::Offers> offers1; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)); |
| |
| EXPECT_CALL(*scheduler, heartbeat(_)) |
| .WillRepeatedly(Return()); // Ignore heartbeats. |
| |
| v1::scheduler::TestMesos mesos( |
| master.get()->pid, ContentType::PROTOBUF, scheduler); |
| |
| AWAIT_READY(subscribed); |
| v1::FrameworkID frameworkId(subscribed->framework_id()); |
| |
| v1::ExecutorInfo executorInfo = v1::createExecutorInfo( |
| v1::DEFAULT_EXECUTOR_ID, |
| None(), |
| "cpus:0.1;mem:32;disk:32", |
| v1::ExecutorInfo::DEFAULT, |
| frameworkId); |
| |
| Clock::advance(masterFlags.allocation_interval); |
| Clock::settle(); |
| |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->offers().empty()); |
| |
| const v1::Offer& offer1 = offers1->offers(0); |
| const v1::AgentID& agentId = offer1.agent_id(); |
| |
| // Launch the first task group which has two tasks: task1 has no resource |
| // limits specified but task2 does. |
| v1::TaskInfo taskInfo1 = v1::createTask( |
| agentId, |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| SLEEP_COMMAND(1000)); |
| |
| mesos::v1::Value::Scalar cpuLimit, memLimit; |
| cpuLimit.set_value(0.5); |
| memLimit.set_value(64); |
| |
| google::protobuf::Map<string, mesos::v1::Value::Scalar> resourceLimits; |
| resourceLimits.insert({"cpus", cpuLimit}); |
| resourceLimits.insert({"mem", memLimit}); |
| |
| v1::TaskInfo taskInfo2 = v1::createTask( |
| agentId, |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| SLEEP_COMMAND(1000), |
| None(), |
| "test-task", |
| id::UUID::random().toString(), |
| resourceLimits); |
| |
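| // Setting `share_cgroups` to false gives each task container its own |
| // nested cgroups instead of sharing the executor's, so that per-task |
| // limits can be verified below. |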
| taskInfo1.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS); |
| taskInfo1.mutable_container()->mutable_linux_info()->set_share_cgroups(false); |
| taskInfo2.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS); |
| taskInfo2.mutable_container()->mutable_linux_info()->set_share_cgroups(false); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate1; |
| Future<v1::scheduler::Event::Update> runningUpdate1; |
| Future<v1::scheduler::Event::Update> killedUpdate1; |
| |
| testing::Sequence task1; |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_STARTING)))) |
| .InSequence(task1) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&startingUpdate1), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) |
| .InSequence(task1) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&runningUpdate1), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_KILLED)))) |
| .InSequence(task1) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&killedUpdate1), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate2; |
| Future<v1::scheduler::Event::Update> runningUpdate2; |
| Future<v1::scheduler::Event::Update> killedUpdate2; |
| |
| testing::Sequence task2; |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_STARTING)))) |
| .InSequence(task2) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&startingUpdate2), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) |
| .InSequence(task2) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&runningUpdate2), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_KILLED)))) |
| .InSequence(task2) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&killedUpdate2), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| Future<v1::scheduler::Event::Offers> offers2; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); |
| |
| { |
| v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( |
| executorInfo, |
| v1::createTaskGroupInfo({taskInfo1, taskInfo2})); |
| |
| Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup}); |
| |
| // Set a 0s filter to immediately get another offer to launch |
| // the second task group. |
| call.mutable_accept()->mutable_filters()->set_refuse_seconds(0); |
| |
| mesos.send(call); |
| } |
| |
| AWAIT_READY(startingUpdate1); |
| AWAIT_READY(runningUpdate1); |
| |
| AWAIT_READY(startingUpdate2); |
| AWAIT_READY(runningUpdate2); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(3u, containers->size()); |
| |
| // Get task container IDs. |
| const v1::ContainerStatus& containerStatus1 = |
| runningUpdate1->status().container_status(); |
| |
| ASSERT_TRUE(containerStatus1.has_container_id()); |
| ASSERT_TRUE(containerStatus1.container_id().has_parent()); |
| |
| const v1::ContainerID& taskContainerId1 = containerStatus1.container_id(); |
| |
| const v1::ContainerStatus& containerStatus2 = |
| runningUpdate2->status().container_status(); |
| |
| ASSERT_TRUE(containerStatus2.has_container_id()); |
| ASSERT_TRUE(containerStatus2.container_id().has_parent()); |
| |
| const v1::ContainerID& taskContainerId2 = containerStatus2.container_id(); |
| |
| EXPECT_EQ(taskContainerId1.parent(), taskContainerId2.parent()); |
| |
| // Get the executor container ID. |
| const v1::ContainerID& executorContainerId = taskContainerId1.parent(); |
| |
| Result<string> cpuHierarchy = cgroups::hierarchy("cpu"); |
| ASSERT_SOME(cpuHierarchy); |
| |
| Result<string> memoryHierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(memoryHierarchy); |
| |
| const string& executorCgroup = |
| path::join(flags.cgroups_root, executorContainerId.value()); |
| |
| const string& taskCgroup1 = |
| getCgroupPath(flags.cgroups_root, devolve(taskContainerId1)); |
| |
| const string& taskCgroup2 = |
| getCgroupPath(flags.cgroups_root, devolve(taskContainerId2)); |
| |
| // The CPU shares of the executor container are the sum of its own CPU |
| // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1), |
| // i.e. 0.3. |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.3), |
| cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup)); |
| |
| // The CPU shares of task1 are its CPU request (0.1). |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.1), |
| cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup1)); |
| |
| // The CPU shares of task2 are its CPU request (0.1). |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.1), |
| cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup2)); |
| |
| // The CFS quota of the executor container is the sum of its own CPU |
| // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5), |
| // i.e. 0.7. |
| Try<Duration> cfsQuota = |
| cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup); |
| |
| ASSERT_SOME(cfsQuota); |
| EXPECT_EQ(0.7 * CPU_CFS_PERIOD.ms(), cfsQuota->ms()); |
| |
| // The CFS quota of task1 is its CPU request (0.1). |
| cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup1); |
| ASSERT_SOME(cfsQuota); |
| EXPECT_EQ(0.1 * CPU_CFS_PERIOD.ms(), cfsQuota->ms()); |
| |
| // The CFS quota of task2 is its CPU limit (0.5). |
| cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup2); |
| ASSERT_SOME(cfsQuota); |
| EXPECT_EQ(0.5 * CPU_CFS_PERIOD.ms(), cfsQuota->ms()); |
| |
| // The memory soft limit of the executor container is the sum of its |
| // own memory request (32MB) + task1's memory request (32MB) + task2's |
| // memory request (32MB), i.e. 96MB. |
| EXPECT_SOME_EQ( |
| Megabytes(96), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), executorCgroup)); |
| |
| // The memory soft limit of task1 is its memory request (32MB). |
| EXPECT_SOME_EQ( |
| Megabytes(32), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), taskCgroup1)); |
| |
| // The memory soft limit of task2 is its memory request (32MB). |
| EXPECT_SOME_EQ( |
| Megabytes(32), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), taskCgroup2)); |
| |
| // The memory hard limit of the executor container is the sum of its |
| // own memory request (32MB) + task1's memory request (32MB) + task2's |
| // memory limit (64MB), i.e. 128MB. |
| EXPECT_SOME_EQ( |
| Megabytes(128), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup)); |
| |
| // The memory hard limit of task1 is its memory request (32MB). |
| EXPECT_SOME_EQ( |
| Megabytes(32), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup1)); |
| |
| // The memory hard limit of task2 is its memory limit (64MB). |
| EXPECT_SOME_EQ( |
| Megabytes(64), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup2)); |
| |
| Clock::advance(masterFlags.allocation_interval); |
| Clock::settle(); |
| Clock::resume(); |
| |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->offers().empty()); |
| |
| const v1::Offer& offer2 = offers2->offers(0); |
| |
| // Launch the second task group which has only one task: task3, and this |
| // task has no resource limits specified. |
| v1::TaskInfo taskInfo3 = v1::createTask( |
| agentId, |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| SLEEP_COMMAND(1000)); |
| |
| taskInfo3.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS); |
| taskInfo3.mutable_container()->mutable_linux_info()->set_share_cgroups(false); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate3; |
| Future<v1::scheduler::Event::Update> runningUpdate3; |
| |
| testing::Sequence task3; |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo3.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_STARTING)))) |
| .InSequence(task3) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&startingUpdate3), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| EXPECT_CALL( |
| *scheduler, |
| update(_, AllOf( |
| TaskStatusUpdateTaskIdEq(taskInfo3.task_id()), |
| TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) |
| .InSequence(task3) |
| .WillOnce( |
| DoAll( |
| FutureArg<1>(&runningUpdate3), |
| v1::scheduler::SendAcknowledge(frameworkId, agentId))); |
| |
| mesos.send( |
| v1::createCallAccept( |
| frameworkId, |
| offer2, |
| {v1::LAUNCH_GROUP( |
| executorInfo, v1::createTaskGroupInfo({taskInfo3}))})); |
| |
| AWAIT_READY(startingUpdate3); |
| AWAIT_READY(runningUpdate3); |
| |
| // The CPU shares of the executor container are the sum of its own CPU |
| // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1) |
| // + task3's CPU request (0.1), i.e. 0.4. |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.4), |
| cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup)); |
| |
| // The CFS quota of the executor container is the sum of its own CPU |
| // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5) |
| // + task3's CPU request (0.1), i.e. 0.8. |
| cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup); |
| ASSERT_SOME(cfsQuota); |
| EXPECT_EQ(0.8 * CPU_CFS_PERIOD.ms(), cfsQuota->ms()); |
| |
| // The memory soft limit of the executor container is the sum of its |
| // own memory request (32MB) + task1's memory request (32MB) + task2's |
| // memory request (32MB) + task3's memory request (32MB), i.e. 128MB. |
| EXPECT_SOME_EQ( |
| Megabytes(128), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), executorCgroup)); |
| |
| // The memory hard limit of the executor container is the sum of its |
| // own memory request (32MB) + task1's memory request (32MB) + task2's |
| // memory limit (64MB) + task3's memory request (32MB), i.e. 160MB. |
| EXPECT_SOME_EQ( |
| Megabytes(160), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup)); |
| |
| // Now kill a task in the first task group. |
| mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id())); |
| |
| // Both of the two tasks in the first group will be killed. |
| AWAIT_READY(killedUpdate1); |
| AWAIT_READY(killedUpdate2); |
| |
| // The CPU shares of the executor container are the sum of its own CPU |
| // request (0.1) + task3's CPU request (0.1), i.e. 0.2. |
| EXPECT_SOME_EQ( |
| (uint64_t)(CPU_SHARES_PER_CPU * 0.2), |
| cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup)); |
| |
| // The CFS quota of the executor container is also the sum of its own CPU |
| // request (0.1) + task3's CPU request (0.1), i.e. 0.2. |
| cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup); |
| ASSERT_SOME(cfsQuota); |
| EXPECT_EQ(0.2 * CPU_CFS_PERIOD.ms(), cfsQuota->ms()); |
| |
| // The memory soft limit of the executor container is the sum of its |
| // own memory request (32MB) + task3's memory request (32MB), i.e. 64MB. |
| EXPECT_SOME_EQ( |
| Megabytes(64), |
| cgroups::memory::soft_limit_in_bytes( |
| memoryHierarchy.get(), executorCgroup)); |
| |
| // We only update the memory hard limit if it is the first time or when |
| // we're raising the existing limit (see `MemorySubsystemProcess::update` |
| // for details). So now the memory hard limit of the executor container |
| // should still be 160MB. |
| EXPECT_SOME_EQ( |
| Megabytes(160), |
| cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup)); |
| } |
| |
| |
| // This test verifies the limit swap functionality. Note that we use |
| // the default executor here in order to exercise both the increasing |
| // and decreasing of the memory limit. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_LimitSwap) |
| { |
| // Disable AuthN on the agent. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "filesystem/linux,cgroups/mem"; |
| flags.cgroups_limit_swap = true; |
| flags.authenticate_http_readwrite = false; |
| |
| // TODO(jieyu): Add a test filter for memsw support. |
| Result<Bytes> check = cgroups::memory::memsw_limit_in_bytes( |
| path::join(flags.cgroups_hierarchy, "memory"), "/"); |
| |
| ASSERT_FALSE(check.isError()); |
| |
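| // Skip the test when memsw accounting is unavailable (e.g., when the |
| // kernel was booted with `swapaccount=0`). |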
| if (check.isNone()) { |
| return; |
| } |
| |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); |
| |
| EXPECT_CALL(*scheduler, connected(_)) |
| .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); |
| |
| Future<Event::Subscribed> subscribed; |
| EXPECT_CALL(*scheduler, subscribed(_, _)) |
| .WillOnce(FutureArg<1>(&subscribed)); |
| |
| Future<Event::Offers> offers; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(*scheduler, heartbeat(_)) |
| .WillRepeatedly(Return()); // Ignore heartbeats. |
| |
| v1::scheduler::TestMesos mesos( |
| master.get()->pid, ContentType::PROTOBUF, scheduler); |
| |
| AWAIT_READY(subscribed); |
| v1::FrameworkID frameworkId(subscribed->framework_id()); |
| |
| v1::ExecutorInfo executorInfo = v1::createExecutorInfo( |
| "test_default_executor", |
| None(), |
| "cpus:0.1;mem:32;disk:32", |
| v1::ExecutorInfo::DEFAULT); |
| |
| // Update `executorInfo` with the subscribed `frameworkId`. |
| executorInfo.mutable_framework_id()->CopyFrom(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->offers().empty()); |
| |
| const v1::Offer& offer = offers->offers(0); |
| |
| // NOTE: We use a non-shell command here because 'sh' might not be in the |
| // PATH. The 'alpine' image does not specify a PATH environment variable, |
| // and on some Linux distributions '/bin' is not in the PATH by default. |
| v1::TaskInfo taskInfo = v1::createTask( |
| offer.agent_id(), |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| v1::createCommandInfo("ls", {"ls", "-al", "/"})); |
| |
| Future<Event::Update> updateStarting; |
| Future<Event::Update> updateRunning; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(DoAll(FutureArg<1>(&updateStarting), |
| v1::scheduler::SendAcknowledge( |
| frameworkId, |
| offer.agent_id()))) |
| .WillOnce(DoAll(FutureArg<1>(&updateRunning), |
| v1::scheduler::SendAcknowledge( |
| frameworkId, |
| offer.agent_id()))); |
| |
| v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( |
| executorInfo, |
| v1::createTaskGroupInfo({taskInfo})); |
| |
| mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup})); |
| |
| AWAIT_READY(updateStarting); |
| ASSERT_EQ(v1::TASK_STARTING, updateStarting->status().state()); |
| EXPECT_EQ(taskInfo.task_id(), updateStarting->status().task_id()); |
| |
| AWAIT_READY(updateRunning); |
| ASSERT_EQ(v1::TASK_RUNNING, updateRunning->status().state()); |
| EXPECT_EQ(taskInfo.task_id(), updateRunning->status().task_id()); |
| EXPECT_TRUE(updateRunning->status().has_timestamp()); |
| |
| Future<Event::Update> updateFinished; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(FutureArg<1>(&updateFinished)); |
| |
| AWAIT_READY(updateFinished); |
| ASSERT_EQ(v1::TASK_FINISHED, updateFinished->status().state()); |
| EXPECT_EQ(taskInfo.task_id(), updateFinished->status().task_id()); |
| EXPECT_TRUE(updateFinished->status().has_timestamp()); |
| } |
| |
| |
| // This test verifies that the number of processes and threads in a |
| // container is correctly reported. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PidsAndTids) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu"; |
| flags.cgroups_cpu_enable_pids_and_tids_count = true; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| CommandInfo command; |
| command.set_shell(false); |
| command.set_value("/bin/cat"); |
| command.add_arguments("/bin/cat"); |
| |
| TaskInfo task = createTask( |
| offers.get()[0].slave_id(), |
| offers.get()[0].resources(), |
| command); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| Future<ResourceStatistics> usage = containerizer->usage(containerId); |
| AWAIT_READY(usage); |
| |
| // The following processes may be running when the process count is |
| // captured: |
| // - src/.libs/mesos-executor |
| // - src/mesos-executor |
| // - src/.libs/mesos-containerizer |
| // - src/mesos-containerizer |
| // - cat |
| // `cat` and `mesos-executor` keep idling for the duration of the test; |
| // the other processes may only appear temporarily. |
| EXPECT_GE(usage->processes(), 2U); |
| EXPECT_GE(usage->threads(), 2U); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This tests cgroup creation when the cgroups_root directory is gone. |
| // All tasks will fail if this happens after the slave has started or |
| // recovered. We should create cgroups recursively to handle this case. |
| // See MESOS-9305. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CreateRecursively) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/mem"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until agent recovery is complete. |
| AWAIT_READY(__recover); |
| |
| Result<string> hierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(hierarchy); |
| |
| // We should remove cgroups_root after the slave has started because |
| // the slave will create the cgroups_root dir during startup if it's |
| // not present. |
| ASSERT_SOME(os::rmdir( |
| path::join(hierarchy.get(), flags.cgroups_root), false)); |
| ASSERT_FALSE(os::exists(path::join(hierarchy.get(), flags.cgroups_root))); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> schedRegistered; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureSatisfy(&schedRegistered)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(schedRegistered); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers->size()); |
| |
| // Create a task to be launched in the mesos-container. We will be |
| // explicitly killing this task to perform the cleanup test. |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Capture the update to verify that the task has been launched. |
| AWAIT_READY(statusStarting); |
| ASSERT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| ASSERT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Task is ready. Make sure there is exactly 1 container in the hashset. |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| const ContainerID& containerID = *(containers->begin()); |
| |
| // Check if the memory cgroup for this container exists, by |
| // checking for the processes associated with this cgroup. |
| string cgroup = path::join( |
| flags.cgroups_root, |
| containerID.value()); |
| |
| Try<set<pid_t>> pids = cgroups::processes(hierarchy.get(), cgroup); |
| ASSERT_SOME(pids); |
| |
| // There should be at least one TGID associated with this cgroup. |
| EXPECT_LE(1u, pids->size()); |
| |
| // Isolator cleanup test: Killing the task should cleanup the cgroup |
| // associated with the container. |
| Future<TaskStatus> killStatus; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&killStatus)); |
| |
| // Wait for the executor to exit. We are using 'gc.schedule' as a proxy event |
| // to monitor the exit of the executor. |
| Future<Nothing> gcSchedule = FUTURE_DISPATCH( |
| _, &slave::GarbageCollectorProcess::schedule); |
| |
| driver.killTask(statusRunning->task_id()); |
| |
| AWAIT_READY(gcSchedule); |
| |
| // If the cleanup is successful the memory cgroup for this container should |
| // not exist. |
| ASSERT_FALSE(os::exists(path::join(hierarchy.get(), cgroup))); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| class NetClsHandleManagerTest : public testing::Test {}; |
| |
| |
| // Tests the ability of the `NetClsHandleManager` class to allocate |
| // and free secondary handles from a range of primary handles. |
| TEST_F(NetClsHandleManagerTest, AllocateFreeHandles) |
| { |
| NetClsHandleManager manager(IntervalSet<uint32_t>( |
| (Bound<uint32_t>::closed(0x0002), |
| Bound<uint32_t>::closed(0x0003)))); |
| |
| Try<NetClsHandle> handle = manager.alloc(0x0003); |
| ASSERT_SOME(handle); |
| |
| EXPECT_SOME_TRUE(manager.isUsed(handle.get())); |
| |
| ASSERT_SOME(manager.free(handle.get())); |
| |
| EXPECT_SOME_FALSE(manager.isUsed(handle.get())); |
| } |
| |
| |
| // Make sure allocation of secondary handles for invalid primary |
| // handles results in an error. |
| TEST_F(NetClsHandleManagerTest, AllocateInvalidPrimary) |
| { |
| NetClsHandleManager manager(IntervalSet<uint32_t>( |
| (Bound<uint32_t>::closed(0x0002), |
| Bound<uint32_t>::closed(0x0003)))); |
| |
| ASSERT_ERROR(manager.alloc(0x0001)); |
| } |
| |
| |
| // Tests that we can reserve secondary handles for a given primary |
| // handle so that they won't be allocated later. |
| TEST_F(NetClsHandleManagerTest, ReserveHandles) |
| { |
| NetClsHandleManager manager(IntervalSet<uint32_t>( |
| (Bound<uint32_t>::closed(0x0002), |
| Bound<uint32_t>::closed(0x0003)))); |
| |
| NetClsHandle handle(0x0003, 0xffff); |
| |
| ASSERT_SOME(manager.reserve(handle)); |
| |
| EXPECT_SOME_TRUE(manager.isUsed(handle)); |
| } |
| |
| |
| // Tests that secondary handles are allocated only from a given range, |
| // when the range is specified. |
| TEST_F(NetClsHandleManagerTest, SecondaryHandleRange) |
| { |
| NetClsHandleManager manager( |
| IntervalSet<uint32_t>( |
| (Bound<uint32_t>::closed(0x0002), |
| Bound<uint32_t>::closed(0x0003))), |
| IntervalSet<uint32_t>( |
| (Bound<uint32_t>::closed(0xffff), |
| Bound<uint32_t>::closed(0xffff)))); |
| |
| Try<NetClsHandle> handle = manager.alloc(0x0003); |
| ASSERT_SOME(handle); |
| |
| EXPECT_SOME_TRUE(manager.isUsed(handle.get())); |
| |
| // Try allocating another handle. This should fail, since we don't |
| // have any more secondary handles left. |
| EXPECT_ERROR(manager.alloc(0x0003)); |
| |
| ASSERT_SOME(manager.free(handle.get())); |
| |
| ASSERT_SOME(manager.reserve(handle.get())); |
| |
| EXPECT_SOME_TRUE(manager.isUsed(handle.get())); |
| |
| // Make sure you cannot reserve a secondary handle that is out of |
| // range. |
| EXPECT_ERROR(manager.reserve(NetClsHandle(0x0003, 0x0001))); |
| } |
| |
| |
| // This tests the create, prepare, isolate and cleanup methods of the |
| // 'CgroupNetClsIsolatorProcess'. The test first creates a |
| // 'MesosContainerizer' with net_cls cgroup isolator enabled. The |
| // net_cls cgroup isolator is implemented in the |
| // 'CgroupNetClsIsolatorProcess' class. The test then launches a task |
| // in a mesos container and checks to see if the container has been |
| // added to the right net_cls cgroup. Finally, the test kills the task |
| // and makes sure that the 'CgroupNetClsIsolatorProcess' cleans up the |
| // net_cls cgroup created for the container. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_NET_CLS_Isolate) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| uint16_t primary = 0x0012; |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/net_cls"; |
| flags.cgroups_net_cls_primary_handle = stringify(primary); |
| flags.cgroups_net_cls_secondary_handles = "0xffff,0xffff"; |
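| |
| // Note that the secondary handle range "0xffff,0xffff" contains exactly |
| // one handle (0xffff), which is what we expect to find in |
| // `net_cls.classid` below. |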
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> schedRegistered; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureSatisfy(&schedRegistered)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(schedRegistered); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers->size()); |
| |
| // Create a task to be launched in the Mesos container. We will |
| // explicitly kill this task to exercise the cleanup test. |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Capture the update to verify that the task has been launched. |
| AWAIT_READY(statusStarting); |
| ASSERT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| ASSERT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Task is ready. Make sure there is exactly 1 container in the hashset. |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| const ContainerID& containerID = *(containers->begin()); |
| |
| Result<string> hierarchy = cgroups::hierarchy("net_cls"); |
| ASSERT_SOME(hierarchy); |
| |
| // Check if the net_cls cgroup for this container exists, by |
| // checking for the processes associated with this cgroup. |
| string cgroup = path::join( |
| flags.cgroups_root, |
| containerID.value()); |
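| // With the default `--cgroups_root=mesos` this resolves to |
| // `mesos/<containerId>` within the net_cls hierarchy (e.g. a path |
| // like `/sys/fs/cgroup/net_cls/mesos/<containerId>`), since |
| // top-level containers sit directly under the root cgroup. |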
| |
| Try<set<pid_t>> pids = cgroups::processes(hierarchy.get(), cgroup); |
| ASSERT_SOME(pids); |
| |
| // There should be at least one TGID associated with this cgroup. |
| EXPECT_LE(1u, pids->size()); |
| |
| // Read the `net_cls.classid` to verify that the handle has been set. |
| Try<uint32_t> classid = cgroups::net_cls::classid(hierarchy.get(), cgroup); |
| EXPECT_SOME(classid); |
| |
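| // The 32-bit classid packs the primary handle into the upper 16 |
| // bits and the secondary handle into the lower 16 bits, i.e. |
| // classid == (primary << 16) | secondary; the masks below unpack |
| // each half for verification. |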
| if (classid.isSome()) { |
| // Make sure the primary handle is the same as the one set in |
| // `--cgroups_net_cls_primary_handle`. |
| EXPECT_EQ(primary, (classid.get() & 0xffff0000) >> 16); |
| |
| // Make sure the secondary handle is 0xffff. |
| EXPECT_EQ(0xffffu, classid.get() & 0xffff); |
| } |
| |
| // Isolator cleanup test: Killing the task should clean up the |
| // cgroup associated with the container. |
| Future<TaskStatus> killStatus; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&killStatus)); |
| |
| // Wait for the executor to exit. We are using 'gc.schedule' as a proxy event |
| // to monitor the exit of the executor. |
| Future<Nothing> gcSchedule = FUTURE_DISPATCH( |
| _, &slave::GarbageCollectorProcess::schedule); |
| |
| driver.killTask(statusRunning->task_id()); |
| |
| AWAIT_READY(gcSchedule); |
| |
| // If the cleanup was successful, the net_cls cgroup for this |
| // container should no longer exist. |
| ASSERT_FALSE(os::exists(cgroup)); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that we are able to retrieve the `net_cls` handle |
| // from `/state`. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_NET_CLS_ContainerStatus) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/net_cls"; |
| flags.cgroups_net_cls_primary_handle = stringify(0x0012); |
| flags.cgroups_net_cls_secondary_handles = "0x0011,0x0012"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| Future<Nothing> schedRegistered; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureSatisfy(&schedRegistered)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(schedRegistered); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers->size()); |
| |
| // Create a task to be launched in the Mesos container. |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| ASSERT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| ASSERT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Task is ready. Verify `ContainerStatus` is present in slave state. |
| Future<Response> response = process::http::get( |
| slave.get()->pid, |
| "state", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); |
| ASSERT_SOME(parse); |
| |
| Result<JSON::Object> netCls = parse->find<JSON::Object>( |
| "frameworks[0].executors[0].tasks[0].statuses[0]." |
| "container_status.cgroup_info.net_cls"); |
| |
| ASSERT_SOME(netCls); |
| |
| uint32_t classid = |
| netCls->values["classid"].as<JSON::Number>().as<uint32_t>(); |
| |
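| // A sketch of the expected value, assuming the manager hands out |
| // the first free secondary handle from the configured range |
| // "0x0011,0x0012": |
| // |
| //   classid = (0x0012 << 16) | 0x0011 = 0x00120011 |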
| // Check the primary and the secondary handle. |
| EXPECT_EQ(0x0012u, classid >> 16); |
| EXPECT_EQ(0x0011u, classid & 0xffff); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_Sample) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.perf_events = "cycles,task-clock"; |
| flags.perf_duration = Milliseconds(250); |
| flags.perf_interval = Milliseconds(500); |
| flags.isolation = "cgroups/perf_event"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 120"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| // This first sample is likely to be empty because perf hasn't |
| // completed yet, but we should still have the required fields. |
| Future<ResourceStatistics> statistics1 = containerizer->usage(containerId); |
| AWAIT_READY(statistics1); |
| ASSERT_TRUE(statistics1->has_perf()); |
| EXPECT_TRUE(statistics1->perf().has_timestamp()); |
| EXPECT_TRUE(statistics1->perf().has_duration()); |
| |
| // Wait until we get the next sample. We use a generous timeout of |
| // two seconds because we currently have a one-second reap interval; |
| // when running perf with a perf_duration of 250ms we won't notice |
| // the exit for up to one second. |
| ResourceStatistics statistics2; |
| Duration waited = Duration::zero(); |
| do { |
| Future<ResourceStatistics> statistics = containerizer->usage(containerId); |
| AWAIT_READY(statistics); |
| |
| statistics2 = statistics.get(); |
| |
| ASSERT_TRUE(statistics2.has_perf()); |
| |
| if (statistics1->perf().timestamp() != |
| statistics2.perf().timestamp()) { |
| break; |
| } |
| |
| os::sleep(Milliseconds(250)); |
| waited += Milliseconds(250); |
| } while (waited < Seconds(2)); |
| |
| EXPECT_NE(statistics1->perf().timestamp(), |
| statistics2.perf().timestamp()); |
| |
| EXPECT_TRUE(statistics2.perf().has_cycles()); |
| EXPECT_LE(0u, statistics2.perf().cycles()); |
| |
| EXPECT_TRUE(statistics2.perf().has_task_clock()); |
| EXPECT_LE(0.0, statistics2.perf().task_clock()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Test that the perf_event subsystem can be enabled after an agent |
| // restart. Previously created containers will not report perf |
| // statistics, but newly created containers will. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_PERF_PerfForward) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Start an agent using a containerizer without the perf_event isolation. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> create = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(create); |
| |
| Owned<slave::Containerizer> containerizer(create.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| frameworkInfo, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->empty()); |
| |
| Future<TaskStatus> statusStarting1; |
| Future<TaskStatus> statusRunning1; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting1)) |
| .WillOnce(FutureArg<1>(&statusRunning1)) |
| .WillRepeatedly(Return()); |
| |
| TaskInfo task1 = createTask( |
| offers1.get()[0].slave_id(), |
| Resources::parse("cpus:0.5;mem:128").get(), |
| "sleep 1000"); |
| |
| // We want to be notified immediately of the new offer. |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| driver.launchTasks(offers1.get()[0].id(), {task1}, filters); |
| |
| AWAIT_READY(statusStarting1); |
| EXPECT_EQ(TASK_STARTING, statusStarting1->state()); |
| |
| AWAIT_READY(statusRunning1); |
| EXPECT_EQ(TASK_RUNNING, statusRunning1->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| EXPECT_EQ(1u, containers->size()); |
| |
| ContainerID containerId1 = *(containers->begin()); |
| |
| Future<ResourceStatistics> usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| // There should not be any perf statistics. |
| EXPECT_FALSE(usage->has_perf()); |
| |
| slave.get()->terminate(); |
| |
| Future<vector<Offer>> offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| // Start an agent using a containerizer with the perf_event isolation. |
| flags.isolation = "cgroups/cpu,cgroups/mem,cgroups/perf_event"; |
| flags.perf_events = "cycles,task-clock"; |
| flags.perf_duration = Milliseconds(250); |
| flags.perf_interval = Milliseconds(500); |
| |
| containerizer.reset(); |
| |
| create = MesosContainerizer::create(flags, true, &fetcher); |
| ASSERT_SOME(create); |
| |
| containerizer.reset(create.get()); |
| |
| slave = StartSlave(detector.get(), containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until agent recovery is complete. |
| AWAIT_READY(__recover); |
| |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->empty()); |
| |
| // The first container should not report any perf statistics. |
| usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| EXPECT_FALSE(usage->has_perf()); |
| |
| // Start a new container which will start reporting perf statistics. |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting2; |
| Future<TaskStatus> statusRunning2; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting2)) |
| .WillOnce(FutureArg<1>(&statusRunning2)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers2.get()[0].id(), {task2}); |
| |
| AWAIT_READY(statusStarting2); |
| EXPECT_EQ(TASK_STARTING, statusStarting2->state()); |
| |
| AWAIT_READY(statusRunning2); |
| EXPECT_EQ(TASK_RUNNING, statusRunning2->state()); |
| |
| containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| ASSERT_EQ(2u, containers->size()); |
| EXPECT_TRUE(containers->contains(containerId1)); |
| |
| ContainerID containerId2; |
| foreach (const ContainerID& containerId, containers.get()) { |
| if (containerId != containerId1) { |
| containerId2 = containerId; |
| } |
| } |
| |
| usage = containerizer->usage(containerId2); |
| AWAIT_READY(usage); |
| |
| EXPECT_TRUE(usage->has_perf()); |
| |
| // TODO(jieyu): Consider killing the perf process. |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Test that the memory subsystem can be enabled after an agent |
| // restart. Previously created containers will not perform memory |
| // isolation, but newly created containers will. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryForward) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Start an agent using a containerizer without the memory isolation. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> create = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(create); |
| |
| Owned<slave::Containerizer> containerizer(create.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| frameworkInfo, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->empty()); |
| |
| Future<TaskStatus> statusStarting1; |
| Future<TaskStatus> statusRunning1; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting1)) |
| .WillOnce(FutureArg<1>(&statusRunning1)) |
| .WillRepeatedly(Return()); |
| |
| TaskInfo task1 = createTask( |
| offers1.get()[0].slave_id(), |
| Resources::parse("cpus:0.5;mem:128").get(), |
| "sleep 1000"); |
| |
| // We want to be notified immediately of the new offer. |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| driver.launchTasks(offers1.get()[0].id(), {task1}, filters); |
| |
| AWAIT_READY(statusStarting1); |
| EXPECT_EQ(TASK_STARTING, statusStarting1->state()); |
| |
| AWAIT_READY(statusRunning1); |
| EXPECT_EQ(TASK_RUNNING, statusRunning1->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| EXPECT_EQ(1u, containers->size()); |
| |
| ContainerID containerId1 = *(containers->begin()); |
| |
| Future<ResourceStatistics> usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| // There should not be any memory statistics. |
| EXPECT_FALSE(usage->has_mem_total_bytes()); |
| |
| slave.get()->terminate(); |
| |
| Future<vector<Offer>> offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| // Start an agent using a containerizer with the memory isolation. |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| |
| containerizer.reset(); |
| |
| create = MesosContainerizer::create(flags, true, &fetcher); |
| ASSERT_SOME(create); |
| |
| containerizer.reset(create.get()); |
| |
| slave = StartSlave(detector.get(), containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until agent recovery is complete. |
| AWAIT_READY(__recover); |
| |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->empty()); |
| |
| // The first container should not report memory statistics. |
| usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| EXPECT_FALSE(usage->has_mem_total_bytes()); |
| EXPECT_FALSE(usage->has_mem_kmem_usage_bytes()); |
| |
| // Start a new container which will start reporting memory statistics. |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting2; |
| Future<TaskStatus> statusRunning2; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting2)) |
| .WillOnce(FutureArg<1>(&statusRunning2)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers2.get()[0].id(), {task2}); |
| |
| AWAIT_READY(statusStarting2); |
| EXPECT_EQ(TASK_STARTING, statusStarting2->state()); |
| |
| AWAIT_READY(statusRunning2); |
| EXPECT_EQ(TASK_RUNNING, statusRunning2->state()); |
| |
| containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| ASSERT_EQ(2u, containers->size()); |
| EXPECT_TRUE(containers->contains(containerId1)); |
| |
| ContainerID containerId2; |
| foreach (const ContainerID& containerId, containers.get()) { |
| if (containerId != containerId1) { |
| containerId2 = containerId; |
| } |
| } |
| |
| usage = containerizer->usage(containerId2); |
| AWAIT_READY(usage); |
| |
| EXPECT_TRUE(usage->has_mem_total_bytes()); |
| |
| Result<string> hierarchy = cgroups::hierarchy("memory"); |
| ASSERT_SOME(hierarchy); |
| |
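| // Kernel memory accounting is only available when the kernel is |
| // built with kmem accounting for cgroups (CONFIG_MEMCG_KMEM), so |
| // only expect the kmem statistic when the control file exists. |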
| Try<bool> exists = cgroups::exists( |
| hierarchy.get(), "memory.kmem.usage_in_bytes"); |
| ASSERT_SOME(exists); |
| |
| if (exists.get()) { |
| EXPECT_TRUE(usage->has_mem_kmem_usage_bytes()); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // Test that the memory subsystem can be disabled after an agent |
| // restart. Previously created containers will perform memory |
| // isolation, but newly created containers will not. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_MemoryBackward) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| // Start an agent using a containerizer with the memory isolation. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/cpu,cgroups/mem"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> create = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(create); |
| |
| Owned<slave::Containerizer> containerizer(create.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| frameworkInfo, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->empty()); |
| |
| Future<TaskStatus> statusStarting1; |
| Future<TaskStatus> statusRunning1; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting1)) |
| .WillOnce(FutureArg<1>(&statusRunning1)) |
| .WillRepeatedly(Return()); |
| |
| TaskInfo task1 = createTask( |
| offers1.get()[0].slave_id(), |
| Resources::parse("cpus:0.5;mem:128").get(), |
| "sleep 1000"); |
| |
| // We want to be notified immediately of the new offer. |
| Filters filters; |
| filters.set_refuse_seconds(0); |
| |
| driver.launchTasks(offers1.get()[0].id(), {task1}, filters); |
| |
| AWAIT_READY(statusStarting1); |
| EXPECT_EQ(TASK_STARTING, statusStarting1->state()); |
| |
| AWAIT_READY(statusRunning1); |
| EXPECT_EQ(TASK_RUNNING, statusRunning1->state()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| EXPECT_EQ(1u, containers->size()); |
| |
| ContainerID containerId1 = *(containers->begin()); |
| |
| Future<ResourceStatistics> usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| EXPECT_TRUE(usage->has_mem_total_bytes()); |
| |
| slave.get()->terminate(); |
| |
| Future<vector<Offer>> offers2; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| // Start an agent using a containerizer without the memory isolation. |
| flags.isolation = "cgroups/cpu"; |
| |
| containerizer.reset(); |
| |
| create = MesosContainerizer::create(flags, true, &fetcher); |
| ASSERT_SOME(create); |
| |
| containerizer.reset(create.get()); |
| |
| slave = StartSlave(detector.get(), containerizer.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait until agent recovery is complete. |
| AWAIT_READY(__recover); |
| |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->empty()); |
| |
| // After restarting the agent without memory isolation, the first |
| // container should no longer report memory statistics. |
| usage = containerizer->usage(containerId1); |
| AWAIT_READY(usage); |
| |
| EXPECT_FALSE(usage->has_mem_total_bytes()); |
| |
| TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting2; |
| Future<TaskStatus> statusRunning2; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting2)) |
| .WillOnce(FutureArg<1>(&statusRunning2)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| driver.launchTasks(offers2.get()[0].id(), {task2}); |
| |
| AWAIT_READY(statusStarting2); |
| EXPECT_EQ(TASK_STARTING, statusStarting2->state()); |
| |
| AWAIT_READY(statusRunning2); |
| EXPECT_EQ(TASK_RUNNING, statusRunning2->state()); |
| |
| containers = containerizer->containers(); |
| |
| AWAIT_READY(containers); |
| ASSERT_EQ(2u, containers->size()); |
| EXPECT_TRUE(containers->contains(containerId1)); |
| |
| ContainerID containerId2; |
| foreach (const ContainerID& containerId, containers.get()) { |
| if (containerId != containerId1) { |
| containerId2 = containerId; |
| } |
| } |
| |
| usage = containerizer->usage(containerId2); |
| AWAIT_READY(usage); |
| |
| // After restarting the agent without memory isolation, the new |
| // container should not report memory statistics either. |
| EXPECT_FALSE(usage->has_mem_total_bytes()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that the cgroups blkio statistics |
| // of the container can be successfully retrieved. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_BlkioUsage) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/blkio"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get(), |
| flags); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| // Create a task to generate a 10k file with 10 disk writes. |
| // |
| // TODO(qianzhang): On some old platforms (CentOS 6 and Ubuntu 14), |
| // the first disk write of a blkio cgroup is always missed in the |
| // blkio throttling statistics, so we run two `dd` commands: the |
| // first performs the initial disk write that will be missed on |
| // those platforms, and the second will be recorded in the blkio |
| // throttling statistics. When we drop CentOS 6 and Ubuntu 14 |
| // support, we should remove the first `dd` command. |
| TaskInfo task = createTask( |
| offers.get()[0], |
| "dd if=/dev/zero of=file bs=1024 count=1 oflag=dsync && " |
| "dd if=/dev/zero of=file bs=1024 count=10 oflag=dsync && " |
| "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(&driver, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillRepeatedly(Return()); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // NOTE: The command executor's id is the same as the task id. |
| ExecutorID executorId; |
| executorId.set_value(task.task_id().value()); |
| |
| const string directory = slave::paths::getExecutorLatestRunPath( |
| flags.work_dir, |
| offers.get()[0].slave_id(), |
| offers.get()[0].framework_id(), |
| executorId); |
| |
| ASSERT_TRUE(os::exists(directory)); |
| |
| // Make sure the file is completely generated: the second `dd` |
| // truncates and rewrites `file`, so its final size should be |
| // 10 * 1024 = 10240 bytes. |
| const string filePath = path::join(directory, "file"); |
| Option<Bytes> fileSize; |
| Duration waited = Duration::zero(); |
| |
| do { |
| if (os::exists(filePath)) { |
| Try<Bytes> size = os::stat::size(filePath); |
| ASSERT_SOME(size); |
| |
| if (size->bytes() == 10240) { |
| fileSize = size.get(); |
| break; |
| } |
| } |
| |
| os::sleep(Seconds(1)); |
| waited += Seconds(1); |
| } while (waited < process::TEST_AWAIT_TIMEOUT); |
| |
| ASSERT_SOME(fileSize); |
| ASSERT_EQ(10240u, fileSize->bytes()); |
| |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| ContainerID containerId = *(containers->begin()); |
| |
| Future<ResourceStatistics> usage = containerizer->usage(containerId); |
| AWAIT_READY(usage); |
| |
| // We only check the throttling statistics, not the cfq statistics, |
| // because in environments where the disk I/O scheduler is not cfq, |
| // all the cfq statistics may be 0. There must be at least two |
| // entries in the throttling statistics: one holds the totals and |
| // the others are device-specific. |
| ASSERT_TRUE(usage->has_blkio_statistics()); |
| EXPECT_LE(2, usage->blkio_statistics().throttling_size()); |
| |
| // We only check the total throttling statistics. |
| Option<CgroupInfo::Blkio::Throttling::Statistics> totalThrottling; |
| foreach (const CgroupInfo::Blkio::Throttling::Statistics& statistics, |
| usage->blkio_statistics().throttling()) { |
| if (!statistics.has_device()) { |
| totalThrottling = statistics; |
| } |
| } |
| |
| EXPECT_SOME(totalThrottling); |
| EXPECT_EQ(1, totalThrottling->io_serviced_size()); |
| EXPECT_EQ(1, totalThrottling->io_service_bytes_size()); |
| |
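| // The second `dd` performs 10 synchronous 1 KiB writes, so the |
| // total entry should record at least 10 I/O operations and at |
| // least 10 * 1024 = 10240 bytes serviced. |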
| const CgroupInfo::Blkio::Value& totalIOServiced = |
| totalThrottling->io_serviced(0); |
| |
| EXPECT_TRUE(totalIOServiced.has_op()); |
| EXPECT_EQ(CgroupInfo::Blkio::TOTAL, totalIOServiced.op()); |
| EXPECT_TRUE(totalIOServiced.has_value()); |
| EXPECT_LE(10u, totalIOServiced.value()); |
| |
| const CgroupInfo::Blkio::Value& totalIOServiceBytes = |
| totalThrottling->io_service_bytes(0); |
| |
| EXPECT_TRUE(totalIOServiceBytes.has_op()); |
| EXPECT_EQ(CgroupInfo::Blkio::TOTAL, totalIOServiceBytes.op()); |
| EXPECT_TRUE(totalIOServiceBytes.has_value()); |
| EXPECT_LE(10240u, totalIOServiceBytes.value()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that all the locally enabled cgroups |
| // subsystems can be automatically loaded by the cgroup isolator. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_AutoLoadSubsystems) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "cgroups/all"; |
| |
| Fetcher fetcher(flags); |
| |
| Try<MesosContainerizer*> _containerizer = |
| MesosContainerizer::create(flags, true, &fetcher); |
| |
| ASSERT_SOME(_containerizer); |
| |
| Owned<MesosContainerizer> containerizer(_containerizer.get()); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| containerizer.get()); |
| |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| MesosSchedulerDriver driver( |
| &sched, |
| DEFAULT_FRAMEWORK_INFO, |
| master.get()->pid, |
| DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->empty()); |
| |
| TaskInfo task = createTask(offers.get()[0], "sleep 1000"); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)); |
| |
| driver.launchTasks(offers.get()[0].id(), {task}); |
| |
| // Capture the update to verify that the task has been launched. |
| AWAIT_READY(statusStarting); |
| ASSERT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| ASSERT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| // Task is ready. Make sure there is exactly 1 container in the hashset. |
| Future<hashset<ContainerID>> containers = containerizer->containers(); |
| AWAIT_READY(containers); |
| ASSERT_EQ(1u, containers->size()); |
| |
| const ContainerID& containerId = *(containers->begin()); |
| |
| Try<set<string>> enabledSubsystems = cgroups::subsystems(); |
| ASSERT_SOME(enabledSubsystems); |
| |
| set<string> supportedSubsystems = { |
| CGROUP_SUBSYSTEM_BLKIO_NAME, |
| CGROUP_SUBSYSTEM_CPU_NAME, |
| CGROUP_SUBSYSTEM_CPUACCT_NAME, |
| CGROUP_SUBSYSTEM_CPUSET_NAME, |
| CGROUP_SUBSYSTEM_DEVICES_NAME, |
| CGROUP_SUBSYSTEM_HUGETLB_NAME, |
| CGROUP_SUBSYSTEM_MEMORY_NAME, |
| CGROUP_SUBSYSTEM_NET_CLS_NAME, |
| CGROUP_SUBSYSTEM_NET_PRIO_NAME, |
| CGROUP_SUBSYSTEM_PERF_EVENT_NAME, |
| CGROUP_SUBSYSTEM_PIDS_NAME, |
| }; |
| |
| // Check that cgroups for all the locally enabled subsystems |
| // have been created for the container. |
| foreach (const string& subsystem, enabledSubsystems.get()) { |
| if (supportedSubsystems.count(subsystem) == 0) { |
| continue; |
| } |
| |
| Result<string> hierarchy = cgroups::hierarchy(subsystem); |
| ASSERT_SOME(hierarchy); |
| |
| string cgroup = path::join(flags.cgroups_root, containerId.value()); |
| |
| ASSERT_TRUE(os::exists(path::join(hierarchy.get(), cgroup))); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test verifies that after the agent recovery/upgrade, nested |
| // containers can still be launched under old containers which were |
| // launched before the agent restarted, even when new cgroup |
| // subsystems are added to the agent's cgroup isolation. |
| TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_AgentRecoveryWithNewCgroupSubsystems) |
| { |
| // Disable AuthN on the agent. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "filesystem/linux,docker/runtime,cgroups/mem"; |
| flags.image_providers = "docker"; |
| flags.authenticate_http_readwrite = false; |
| |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| // Start the slave with a static process ID. This allows the executor to |
| // reconnect with the slave upon a process restart. |
| const string id("agent"); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), id, flags); |
| ASSERT_SOME(slave); |
| |
| auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); |
| |
| v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo.set_checkpoint(true); |
| |
| EXPECT_CALL(*scheduler, connected(_)) |
| .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); |
| |
| Future<Event::Subscribed> subscribed; |
| EXPECT_CALL(*scheduler, subscribed(_, _)) |
| .WillOnce(FutureArg<1>(&subscribed)); |
| |
| Future<Event::Offers> offers1; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(*scheduler, heartbeat(_)) |
| .WillRepeatedly(Return()); // Ignore heartbeats. |
| |
| v1::scheduler::TestMesos mesos( |
| master.get()->pid, ContentType::PROTOBUF, scheduler); |
| |
| AWAIT_READY(subscribed); |
| v1::FrameworkID frameworkId(subscribed->framework_id()); |
| |
| v1::ExecutorInfo executorInfo = v1::createExecutorInfo( |
| "test_default_executor", |
| None(), |
| "cpus:0.1;mem:32;disk:32", |
| v1::ExecutorInfo::DEFAULT); |
| |
| // Update `executorInfo` with the subscribed `frameworkId`. |
| executorInfo.mutable_framework_id()->CopyFrom(frameworkId); |
| |
| AWAIT_READY(offers1); |
| ASSERT_FALSE(offers1->offers().empty()); |
| |
| const v1::Offer& offer1 = offers1->offers(0); |
| |
| v1::TaskInfo taskInfo1 = v1::createTask( |
| offer1.agent_id(), |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| "sleep 1000"); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate1; |
| Future<v1::scheduler::Event::Update> runningUpdate1; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(DoAll( |
| FutureArg<1>(&startingUpdate1), |
| v1::scheduler::SendAcknowledge(frameworkId, offer1.agent_id()))) |
| .WillOnce(DoAll( |
| FutureArg<1>(&runningUpdate1), |
| v1::scheduler::SendAcknowledge(frameworkId, offer1.agent_id()))) |
| .WillRepeatedly(Return()); |
| |
| mesos.send( |
| v1::createCallAccept( |
| frameworkId, |
| offer1, |
| {v1::LAUNCH_GROUP( |
| executorInfo, v1::createTaskGroupInfo({taskInfo1}))})); |
| |
| AWAIT_READY(startingUpdate1); |
| ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state()); |
| ASSERT_EQ(taskInfo1.task_id(), startingUpdate1->status().task_id()); |
| |
| AWAIT_READY(runningUpdate1); |
| ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state()); |
| ASSERT_EQ(taskInfo1.task_id(), runningUpdate1->status().task_id()); |
| |
| slave.get()->terminate(); |
| slave->reset(); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| // Update the cgroup isolation to introduce new subsystems. |
| flags.isolation = "filesystem/linux,docker/runtime,cgroups/all"; |
| slave = this->StartSlave(detector.get(), id, flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| Future<Event::Offers> offers2; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(Return()); |
| |
| AWAIT_READY(offers2); |
| ASSERT_FALSE(offers2->offers().empty()); |
| |
| const v1::Offer& offer2 = offers2->offers(0); |
| |
| v1::TaskInfo taskInfo2 = v1::createTask( |
| offer2.agent_id(), |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| "sleep 1000"); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate2; |
| Future<v1::scheduler::Event::Update> runningUpdate2; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(DoAll( |
| FutureArg<1>(&startingUpdate2), |
| v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id()))) |
| .WillOnce(FutureArg<1>(&runningUpdate2)); |
| |
| mesos.send( |
| v1::createCallAccept( |
| frameworkId, |
| offer2, |
| {v1::LAUNCH_GROUP( |
| executorInfo, v1::createTaskGroupInfo({taskInfo2}))})); |
| |
| AWAIT_READY(startingUpdate2); |
| ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state()); |
| ASSERT_EQ(taskInfo2.task_id(), startingUpdate2->status().task_id()); |
| |
| AWAIT_READY(runningUpdate2); |
| ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state()); |
| ASSERT_EQ(taskInfo2.task_id(), runningUpdate2->status().task_id()); |
| } |
| |
| |
| // This test verifies that the container-specific cgroups are |
| // correctly mounted inside the nested container. |
| TEST_F(CgroupsIsolatorTest, |
| ROOT_CGROUPS_INTERNET_NestedContainerSpecificCgroupsMount) |
| { |
| // Disable AuthN on the agent. |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "filesystem/linux,docker/runtime,cgroups/mem,cgroups/cpu"; |
| flags.image_providers = "docker"; |
| flags.authenticate_http_readwrite = false; |
| |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); |
| |
| EXPECT_CALL(*scheduler, connected(_)) |
| .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); |
| |
| Future<Event::Subscribed> subscribed; |
| EXPECT_CALL(*scheduler, subscribed(_, _)) |
| .WillOnce(FutureArg<1>(&subscribed)); |
| |
| Future<Event::Offers> offers; |
| EXPECT_CALL(*scheduler, offers(_, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); |
| |
| EXPECT_CALL(*scheduler, heartbeat(_)) |
| .WillRepeatedly(Return()); // Ignore heartbeats. |
| |
| v1::scheduler::TestMesos mesos( |
| master.get()->pid, ContentType::PROTOBUF, scheduler); |
| |
| AWAIT_READY(subscribed); |
| v1::FrameworkID frameworkId(subscribed->framework_id()); |
| |
| v1::ExecutorInfo executorInfo = v1::createExecutorInfo( |
| "test_default_executor", |
| None(), |
| "cpus:0.1;mem:32;disk:32", |
| v1::ExecutorInfo::DEFAULT); |
| |
| // Update `executorInfo` with the subscribed `frameworkId`. |
| executorInfo.mutable_framework_id()->CopyFrom(frameworkId); |
| |
| AWAIT_READY(offers); |
| ASSERT_FALSE(offers->offers().empty()); |
| |
| const v1::Offer& offer = offers->offers(0); |
| |
| // Create a task to check that its memory and CPU shares (including |
| // both the executor's and the task's) are correctly set in its |
| // specific cgroup. |
| // |
| // We also verify that the freezer cgroup is correctly mounted for |
| // this task by checking that the current shell PID is included in |
| // the freezer cgroup. |
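| // |
| // A sketch of the expected numbers below, assuming the cgroup is |
| // sized from the sum of the executor's and the task's resources: |
| // |
| //   memory.soft_limit_in_bytes = (32 + 32) MB = 67108864 bytes |
| //   cpu.shares = (0.1 + 0.1) CPUs * 1024 shares/CPU = 204 (truncated) |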
| v1::TaskInfo taskInfo = v1::createTask( |
| offer.agent_id(), |
| v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| "test `cat /sys/fs/cgroup/memory/memory.soft_limit_in_bytes` = 67108864 " |
| "&& test `cat /sys/fs/cgroup/cpu/cpu.shares` = 204" |
| "&& grep $$ /sys/fs/cgroup/freezer/cgroup.procs"); |
| |
| mesos::v1::Image image; |
| image.set_type(mesos::v1::Image::DOCKER); |
| image.mutable_docker()->set_name("alpine"); |
| |
| mesos::v1::ContainerInfo* container = taskInfo.mutable_container(); |
| container->set_type(mesos::v1::ContainerInfo::MESOS); |
| container->mutable_mesos()->mutable_image()->CopyFrom(image); |
| |
| Future<v1::scheduler::Event::Update> startingUpdate; |
| Future<v1::scheduler::Event::Update> runningUpdate; |
| Future<v1::scheduler::Event::Update> finishedUpdate; |
| EXPECT_CALL(*scheduler, update(_, _)) |
| .WillOnce(DoAll( |
| FutureArg<1>(&startingUpdate), |
| v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id()))) |
| .WillOnce(DoAll( |
| FutureArg<1>(&runningUpdate), |
| v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id()))) |
| .WillOnce(DoAll( |
| FutureArg<1>(&finishedUpdate), |
| v1::scheduler::SendAcknowledge(frameworkId, offer.agent_id()))); |
| |
| mesos.send( |
| v1::createCallAccept( |
| frameworkId, |
| offer, |
| {v1::LAUNCH_GROUP( |
| executorInfo, v1::createTaskGroupInfo({taskInfo}))})); |
| |
| AWAIT_READY(startingUpdate); |
| ASSERT_EQ(v1::TASK_STARTING, startingUpdate->status().state()); |
| ASSERT_EQ(taskInfo.task_id(), startingUpdate->status().task_id()); |
| |
| AWAIT_READY(runningUpdate); |
| ASSERT_EQ(v1::TASK_RUNNING, runningUpdate->status().state()); |
| ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id()); |
| |
| AWAIT_READY(finishedUpdate); |
| ASSERT_EQ(v1::TASK_FINISHED, finishedUpdate->status().state()); |
| ASSERT_EQ(taskInfo.task_id(), finishedUpdate->status().task_id()); |
| } |
| |
| |
| // This test verifies that the container-specific cgroups are |
| // correctly mounted for the command task. |
| TEST_F(CgroupsIsolatorTest, |
| ROOT_CGROUPS_INTERNET_CommandTaskSpecificCgroupsMount) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| flags.isolation = "filesystem/linux,docker/runtime,cgroups/mem,cgroups/cpu"; |
| flags.image_providers = "docker"; |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)) |
| .WillRepeatedly(Return()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers); |
| EXPECT_EQ(1u, offers->size()); |
| |
| // Create a task to check that its memory and CPU shares (including |
| // both the executor's and the task's) are correctly set in its |
| // specific cgroup. |
| // |
| // We also verify that the freezer cgroup is correctly mounted for |
| // this task by checking that the current shell PID is included in |
| // the freezer cgroup. |
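| // |
| // As above, a sketch assuming the command executor's default |
| // resources (DEFAULT_EXECUTOR_CPUS = 0.1, DEFAULT_EXECUTOR_MEM = |
| // 32MB) are added to the task's: (32 + 32) MB = 67108864 bytes and |
| // (0.1 + 0.1) * 1024 = 204 shares. |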
| TaskInfo task = createTask( |
| offers->front().slave_id(), |
| Resources::parse("cpus:0.1;mem:32;disk:32").get(), |
| "test `cat /sys/fs/cgroup/memory/memory.soft_limit_in_bytes` = 67108864 " |
| "&& test `cat /sys/fs/cgroup/cpu/cpu.shares` = 204" |
| "&& grep $$ /sys/fs/cgroup/freezer/cgroup.procs"); |
| |
| Image image; |
| image.set_type(Image::DOCKER); |
| image.mutable_docker()->set_name("alpine"); |
| |
| ContainerInfo* container = task.mutable_container(); |
| container->set_type(ContainerInfo::MESOS); |
| container->mutable_mesos()->mutable_image()->CopyFrom(image); |
| |
| Future<TaskStatus> statusStarting; |
| Future<TaskStatus> statusRunning; |
| Future<TaskStatus> statusFinished; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&statusStarting)) |
| .WillOnce(FutureArg<1>(&statusRunning)) |
| .WillOnce(FutureArg<1>(&statusFinished)); |
| |
| driver.launchTasks(offers->front().id(), {task}); |
| |
| AWAIT_READY(statusStarting); |
| EXPECT_EQ(TASK_STARTING, statusStarting->state()); |
| |
| AWAIT_READY(statusRunning); |
| EXPECT_EQ(TASK_RUNNING, statusRunning->state()); |
| |
| AWAIT_READY(statusFinished); |
| EXPECT_EQ(TASK_FINISHED, statusFinished->state()); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |