| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <gmock/gmock.h> |
| |
| #include <iostream> |
| #include <string> |
| #include <vector> |
| |
| #include <process/future.hpp> |
| #include <process/reap.hpp> |
| |
| #include <stout/gtest.hpp> |
| #include <stout/json.hpp> |
| #include <stout/net.hpp> |
| |
| #include "linux/fs.hpp" |
| |
| #include "linux/routing/utils.hpp" |
| |
| #include "linux/routing/filter/ip.hpp" |
| |
| #include "linux/routing/link/link.hpp" |
| |
| #include "linux/routing/queueing/ingress.hpp" |
| |
| #include "master/master.hpp" |
| |
| #include "slave/flags.hpp" |
| #include "slave/slave.hpp" |
| |
| #include "slave/containerizer/isolators/network/port_mapping.hpp" |
| |
| #include "slave/containerizer/launcher.hpp" |
| #include "slave/containerizer/linux_launcher.hpp" |
| |
| #include "slave/containerizer/mesos/containerizer.hpp" |
| #include "slave/containerizer/mesos/launch.hpp" |
| |
| #include "tests/flags.hpp" |
| #include "tests/mesos.hpp" |
| #include "tests/utils.hpp" |
| |
| using namespace mesos; |
| |
| using namespace mesos::internal; |
| using namespace mesos::internal::slave; |
| using namespace mesos::internal::tests; |
| |
| using namespace process; |
| |
| using namespace routing; |
| using namespace routing::filter; |
| using namespace routing::queueing; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::slave::Launcher; |
| using mesos::internal::slave::LinuxLauncher; |
| using mesos::internal::slave::MesosContainerizer; |
| using mesos::internal::slave::MesosContainerizerLaunch; |
| using mesos::internal::slave::PortMappingIsolatorProcess; |
| |
| using std::list; |
| using std::ostringstream; |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| using testing::_; |
| using testing::Eq; |
| using testing::Return; |
| |
| |
| // An old glibc might not have this symbol. |
| #ifndef MNT_DETACH |
| #define MNT_DETACH 2 |
| #endif |
| |
| |
| static void cleanup(const string& eth0, const string& lo) |
| { |
| // Clean up the ingress qdisc on eth0 and lo if exists. |
| Try<bool> hostEth0ExistsQdisc = ingress::exists(eth0); |
| ASSERT_SOME(hostEth0ExistsQdisc); |
| if (hostEth0ExistsQdisc.get()) { |
| ASSERT_SOME_TRUE(ingress::remove(eth0)); |
| } |
| |
| Try<bool> hostLoExistsQdisc = ingress::exists(lo); |
| ASSERT_SOME(hostLoExistsQdisc); |
| if (hostLoExistsQdisc.get()) { |
| ASSERT_SOME_TRUE(ingress::remove(lo)); |
| } |
| |
| // Clean up all 'veth' devices if exist. |
| Try<set<string> > links = net::links(); |
| ASSERT_SOME(links); |
| foreach (const string& name, links.get()) { |
| if (strings::startsWith(name, slave::VETH_PREFIX)) { |
| ASSERT_SOME_TRUE(link::remove(name)); |
| } |
| } |
| |
| Try<list<string> > entries = os::ls(slave::BIND_MOUNT_ROOT); |
| ASSERT_SOME(entries); |
| foreach (const string& file, entries.get()) { |
| string target = path::join(slave::BIND_MOUNT_ROOT, file); |
| |
| // NOTE: Here, we ignore the unmount errors because previous tests |
| // may have created the file and died before mounting. |
| mesos::internal::fs::unmount(target, MNT_DETACH); |
| |
| // Use best effort to remove the bind mount file, but it is okay |
| // the file can't be removed at this point. |
| os::rm(target); |
| } |
| } |
| |
| |
| class PortMappingIsolatorTest : public TemporaryDirectoryTest |
| { |
| public: |
| static void SetUpTestCase() |
| { |
| ASSERT_SOME(routing::check()) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run any PortMappingIsolatorTests because your\n" |
| << "libnl library is not new enough. You can either install a\n" |
| << "new libnl library, or disable this test case\n" |
| << "-------------------------------------------------------------"; |
| |
| ASSERT_SOME_EQ(0, os::shell(NULL, "which nc")) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run any PortMappingIsolatorTests because 'nc'\n" |
| << "could not be found. You can either install 'nc', or disable\n" |
| << "this test case\n" |
| << "-------------------------------------------------------------"; |
| |
| ASSERT_SOME_EQ(0, os::shell(NULL, "which arping")) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run some PortMappingIsolatorTests because 'arping'\n" |
| << "could not be found. You can either isntall 'arping', or\n" |
| << "disable this test case\n" |
| << "-------------------------------------------------------------"; |
| } |
| |
| protected: |
| virtual void SetUp() |
| { |
| TemporaryDirectoryTest::SetUp(); |
| |
| flags = CreateSlaveFlags(); |
| |
| // Guess the name of the public interface. |
| Result<string> _eth0 = link::eth0(); |
| ASSERT_SOME(_eth0) << "Failed to guess the name of the public interface"; |
| |
| eth0 = _eth0.get(); |
| |
| LOG(INFO) << "Using " << eth0 << " as the public interface"; |
| |
| // Guess the name of the loopback interface. |
| Result<string> _lo = link::lo(); |
| ASSERT_SOME(_lo) << "Failed to guess the name of the loopback interface"; |
| |
| lo = _lo.get(); |
| |
| LOG(INFO) << "Using " << lo << " as the loopback interface"; |
| |
| // Clean up qdiscs and veth devices. |
| cleanup(eth0, lo); |
| |
| // Get host IP address. |
| Result<net::IP> _hostIP = net::ip(eth0); |
| CHECK_SOME(_hostIP) |
| << "Failed to retrieve the host public IP from " << eth0 << ": " |
| << _hostIP.error(); |
| |
| hostIP = _hostIP.get(); |
| |
| // Get all the external name servers for tests that need to talk |
| // to an external host, e.g., ping, DNS. |
| Try<string> read = os::read("/etc/resolv.conf"); |
| CHECK_SOME(read); |
| |
| foreach (const string& line, strings::split(read.get(), "\n")) { |
| if (!strings::startsWith(line, "nameserver")) { |
| continue; |
| } |
| |
| vector<string> tokens = strings::split(line, " "); |
| ASSERT_EQ(2u, tokens.size()) << "Unexpected format in '/etc/resolv.conf'"; |
| if (tokens[1] != "127.0.0.1") { |
| nameServers.push_back(tokens[1]); |
| } |
| } |
| |
| container1Ports = "ports:[31000-31499]"; |
| container2Ports = "ports:[31500-32000]"; |
| |
| port = 31001; |
| |
| // 'errorPort' is not in 'ports' or 'ephemeral_ports'. |
| errorPort = 32502; |
| |
| container1Ready = path::join(os::getcwd(), "container1_ready"); |
| container2Ready = path::join(os::getcwd(), "container2_ready"); |
| trafficViaLoopback = path::join(os::getcwd(), "traffic_via_loopback"); |
| trafficViaPublic = path::join(os::getcwd(), "traffic_via_public"); |
| exitStatus = path::join(os::getcwd(), "exit_status"); |
| } |
| |
| virtual void TearDown() |
| { |
| cleanup(eth0, lo); |
| TemporaryDirectoryTest::TearDown(); |
| } |
| |
| slave::Flags CreateSlaveFlags() |
| { |
| slave::Flags flags; |
| |
| flags.launcher_dir = path::join(tests::flags.build_dir, "src"); |
| |
| // NOTE: By default, Linux sets host ip local port range to |
| // [32768, 61000]. We set 'ephemeral_ports' resource so that it |
| // does not overlap with the host ip local port range. |
| flags.resources = |
| "cpus:2;mem:1024;disk:1024;ports:[31000-32000];" |
| "ephemeral_ports:[30001-30999]"; |
| |
| // NOTE: '16' should be enough for all our tests. |
| flags.ephemeral_ports_per_container = 16; |
| |
| flags.isolation = "network/port_mapping"; |
| |
| return flags; |
| } |
| |
| Try<pid_t> launchHelper( |
| Launcher* launcher, |
| int pipes[2], |
| const ContainerID& containerId, |
| const string& command, |
| const Option<CommandInfo>& preparation) |
| { |
| CommandInfo commandInfo; |
| commandInfo.set_value(command); |
| |
| // The flags to pass to the helper process. |
| MesosContainerizerLaunch::Flags launchFlags; |
| |
| launchFlags.command = JSON::Protobuf(commandInfo); |
| launchFlags.directory = os::getcwd(); |
| |
| CHECK_SOME(os::user()); |
| launchFlags.user = os::user().get(); |
| |
| launchFlags.pipe_read = pipes[0]; |
| launchFlags.pipe_write = pipes[1]; |
| |
| JSON::Object commands; |
| JSON::Array array; |
| array.values.push_back(JSON::Protobuf(preparation.get())); |
| commands.values["commands"] = array; |
| |
| launchFlags.commands = commands; |
| |
| vector<string> argv(2); |
| argv[0] = "mesos-containerizer"; |
| argv[1] = MesosContainerizerLaunch::NAME; |
| |
| Try<pid_t> pid = launcher->fork( |
| containerId, |
| path::join(flags.launcher_dir, "mesos-containerizer"), |
| argv, |
| Subprocess::FD(STDIN_FILENO), |
| Subprocess::FD(STDOUT_FILENO), |
| Subprocess::FD(STDERR_FILENO), |
| launchFlags, |
| None(), |
| None()); |
| |
| return pid; |
| } |
| |
| slave::Flags flags; |
| |
| // Name of the host eth0 and lo. |
| string eth0; |
| string lo; |
| |
| // Host public IP. |
| Option<net::IP> hostIP; |
| |
| // 'port' is within the range of ports assigned to one container. |
| int port; |
| |
| // 'errorPort' is outside the range of ports assigned to the |
| // container. Connecting to a container using this port will fail. |
| int errorPort; |
| |
| // Ports assigned to container1. |
| string container1Ports; |
| |
| // Ports assigned to container2. |
| string container2Ports; |
| |
| // All the external name servers as read from /etc/resolv.conf. |
| vector<string> nameServers; |
| |
| // Some auxiliary files for the tests. |
| string container1Ready; |
| string container2Ready; |
| string trafficViaLoopback; |
| string trafficViaPublic; |
| string exitStatus; |
| }; |
| |
| |
| // This test uses 2 containers: one listens to 'port' and 'errorPort' |
| // and writes data received to files; the other container attemptes to |
| // connect to the previous container using 'port' and |
| // 'errorPort'. Verify that only the connection through 'port' is |
| // successful. |
| TEST_F(PortMappingIsolatorTest, ROOT_ContainerToContainerTCPTest) |
| { |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId1; |
| containerId1.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId1, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| // Listen to 'localhost' and 'port'. |
| command1 << "nc -l localhost " << port << " > " << trafficViaLoopback << "& "; |
| |
| // Listen to 'public ip' and 'port'. |
| command1 << "nc -l " << net::IP(hostIP.get().address()) << " " << port |
| << " > " << trafficViaPublic << "& "; |
| |
| // Listen to 'errorPort'. This should not get anything. |
| command1 << "nc -l " << errorPort << " | tee " << trafficViaLoopback << " " |
| << trafficViaPublic << "& "; |
| |
| // Touch the guard file. |
| command1 << "touch " << container1Ready; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId1, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status1 = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId1, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container1Ready)); |
| |
| ContainerID containerId2; |
| containerId2.set_value("container2"); |
| |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container2Ports).get()); |
| |
| Future<Option<CommandInfo> > preparation2 = |
| isolator.get()->prepare(containerId2, executorInfo); |
| AWAIT_READY(preparation2); |
| ASSERT_SOME(preparation2.get()); |
| |
| ostringstream command2; |
| // Send to 'localhost' and 'port'. |
| command2 << "echo -n hello1 | nc localhost " << port << ";"; |
| // Send to 'localhost' and 'errorPort'. This should fail. |
| command2 << "echo -n hello2 | nc localhost " << errorPort << ";"; |
| // Send to 'public IP' and 'port'. |
| command2 << "echo -n hello3 | nc " << net::IP(hostIP.get().address()) |
| << " " << port << ";"; |
| // Send to 'public IP' and 'errorPort'. This should fail. |
| command2 << "echo -n hello4 | nc " << net::IP(hostIP.get().address()) |
| << " " << errorPort << ";"; |
| // Touch the guard file. |
| command2 << "touch " << container2Ready; |
| |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId2, |
| command2.str(), |
| preparation2.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status2 = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId2, pid.get())); |
| |
| // Now signal the child to continue. |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container2Ready)); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status1); |
| AWAIT_READY(status2); |
| |
| EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback)); |
| EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId1)); |
| AWAIT_READY(launcher.get()->destroy(containerId2)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId1)); |
| AWAIT_READY(isolator.get()->cleanup(containerId2)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // The same container-to-container test but with UDP. |
| TEST_F(PortMappingIsolatorTest, ROOT_ContainerToContainerUDPTest) |
| { |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId1; |
| containerId1.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId1, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| // Listen to 'localhost' and 'port'. |
| command1 << "nc -u -l localhost " << port << " > " |
| << trafficViaLoopback << "& "; |
| |
| // Listen to 'public ip' and 'port'. |
| command1 << "nc -u -l " << net::IP(hostIP.get().address()) << " " << port |
| << " > " << trafficViaPublic << "& "; |
| |
| // Listen to 'errorPort'. This should not receive anything. |
| command1 << "nc -u -l " << errorPort << " | tee " << trafficViaLoopback << " " |
| << trafficViaPublic << "& "; |
| |
| // Touch the guard file. |
| command1 << "touch " << container1Ready; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId1, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status1 = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId1, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container1Ready)); |
| |
| ContainerID containerId2; |
| containerId2.set_value("container2"); |
| |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container2Ports).get()); |
| |
| Future<Option<CommandInfo> > preparation2 = |
| isolator.get()->prepare(containerId2, executorInfo); |
| AWAIT_READY(preparation2); |
| ASSERT_SOME(preparation2.get()); |
| |
| ostringstream command2; |
| // Send to 'localhost' and 'port'. |
| command2 << "echo -n hello1 | nc -w1 -u localhost " << port << ";"; |
| // Send to 'localhost' and 'errorPort'. No data should be sent. |
| command2 << "echo -n hello2 | nc -w1 -u localhost " << errorPort << ";"; |
| // Send to 'public IP' and 'port'. |
| command2 << "echo -n hello3 | nc -w1 -u " << net::IP(hostIP.get().address()) |
| << " " << port << ";"; |
| // Send to 'public IP' and 'errorPort'. No data should be sent. |
| command2 << "echo -n hello4 | nc -w1 -u " << net::IP(hostIP.get().address()) |
| << " " << errorPort << ";"; |
| // Touch the guard file. |
| command2 << "touch " << container2Ready; |
| |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId2, |
| command2.str(), |
| preparation2.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status2 = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId2, pid.get())); |
| |
| // Now signal the child to continue. |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container2Ready)); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status1); |
| AWAIT_READY(status2); |
| |
| EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback)); |
| EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId1)); |
| AWAIT_READY(launcher.get()->destroy(containerId2)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId1)); |
| AWAIT_READY(isolator.get()->cleanup(containerId2)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a UDP server is in a container while host |
| // tries to establish a UDP connection. |
| TEST_F(PortMappingIsolatorTest, ROOT_HostToContainerUDPTest) |
| { |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| // Listen to 'localhost' and 'Port'. |
| command1 << "nc -u -l localhost " << port << " > " |
| << trafficViaLoopback << "&"; |
| |
| // Listen to 'public IP' and 'Port'. |
| command1 << "nc -u -l " << net::IP(hostIP.get().address()) << " " << port |
| << " > " << trafficViaPublic << "&"; |
| |
| // Listen to 'public IP' and 'errorPort'. This should not receive anything. |
| command1 << "nc -u -l " << errorPort << " | tee " << trafficViaLoopback << " " |
| << trafficViaPublic << "&"; |
| |
| // Touch the guard file. |
| command1 << "touch " << container1Ready; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container1Ready)); |
| |
| // Send to 'localhost' and 'port'. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello1 | nc -w1 -u localhost %s", |
| stringify(port).c_str())); |
| |
| // Send to 'localhost' and 'errorPort'. The command should return |
| // successfully because UDP is stateless but no data could be sent. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello2 | nc -w1 -u localhost %s", |
| stringify(errorPort).c_str())); |
| |
| // Send to 'public IP' and 'port'. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello3 | nc -w1 -u %s %s", |
| stringify(net::IP(hostIP.get().address())).c_str(), |
| stringify(port).c_str())); |
| |
| // Send to 'public IP' and 'errorPort'. The command should return |
| // successfully because UDP is stateless but no data could be sent. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello4 | nc -w1 -u %s %s", |
| stringify(net::IP(hostIP.get().address())).c_str(), |
| stringify(errorPort).c_str())); |
| |
| EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback)); |
| EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a TCP server is in a container while host |
| // tries to establish a TCP connection. |
| TEST_F(PortMappingIsolatorTest, ROOT_HostToContainerTCPTest) |
| { |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| // Listen to 'localhost' and 'Port'. |
| command1 << "nc -l localhost " << port << " > " << trafficViaLoopback << "&"; |
| |
| // Listen to 'public IP' and 'Port'. |
| command1 << "nc -l " << net::IP(hostIP.get().address()) << " " << port |
| << " > " << trafficViaPublic << "&"; |
| |
| // Listen to 'public IP' and 'errorPort'. This should fail. |
| command1 << "nc -l " << errorPort << " | tee " << trafficViaLoopback << " " |
| << trafficViaPublic << "&"; |
| |
| // Touch the guard file. |
| command1 << "touch " << container1Ready; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to start. |
| while (!os::exists(container1Ready)); |
| |
| // Send to 'localhost' and 'port'. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello1 | nc localhost %s", |
| stringify(port).c_str())); |
| |
| // Send to 'localhost' and 'errorPort'. This should fail because TCP |
| // connection couldn't be established.. |
| ASSERT_SOME_EQ(256, os::shell( |
| NULL, |
| "echo -n hello2 | nc localhost %s", |
| stringify(errorPort).c_str())); |
| |
| // Send to 'public IP' and 'port'. |
| ASSERT_SOME_EQ(0, os::shell( |
| NULL, |
| "echo -n hello3 | nc %s %s", |
| stringify(net::IP(hostIP.get().address())).c_str(), |
| stringify(port).c_str())); |
| |
| // Send to 'public IP' and 'errorPort'. This should fail because TCP |
| // connection couldn't be established. |
| ASSERT_SOME_EQ(256, os::shell( |
| NULL, |
| "echo -n hello4 | nc %s %s", |
| stringify(net::IP(hostIP.get().address())).c_str(), |
| stringify(errorPort).c_str())); |
| |
| EXPECT_SOME_EQ("hello1", os::read(trafficViaLoopback)); |
| EXPECT_SOME_EQ("hello3", os::read(trafficViaPublic)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a container issues ICMP requests to |
| // external hosts. |
| TEST_F(PortMappingIsolatorTest, ROOT_ContainerICMPExternalTest) |
| { |
| // TODO(chzhcn): Even though this is unlikely, consider a better |
| // way to get external servers. |
| ASSERT_FALSE(nameServers.empty()) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run some PortMappingIsolatorTests because we could\n" |
| << "not find any external name servers in /etc/resolv.conf.\n" |
| << "-------------------------------------------------------------"; |
| |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| for (unsigned int i = 0; i < nameServers.size(); i++) { |
| const string& IP = nameServers[i]; |
| command1 << "ping -c1 " << IP; |
| if (i + 1 < nameServers.size()) { |
| command1 << " && "; |
| } |
| } |
| command1 << "; echo -n $? > " << exitStatus << "; sync"; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status); |
| |
| EXPECT_SOME_EQ("0", os::read(exitStatus)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a container issues ICMP requests to itself. |
| TEST_F(PortMappingIsolatorTest, ROOT_ContainerICMPInternalTest) |
| { |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| command1 << "ping -c1 127.0.0.1 && ping -c1 " |
| << stringify(net::IP(hostIP.get().address())); |
| command1 << "; echo -n $? > " << exitStatus << "; sync"; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status); |
| |
| EXPECT_SOME_EQ("0", os::read(exitStatus)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a container issues ARP requests to |
| // external hosts. |
| TEST_F(PortMappingIsolatorTest, ROOT_ContainerARPExternalTest) |
| { |
| // TODO(chzhcn): Even though this is unlikely, consider a better |
| // way to get external servers. |
| ASSERT_FALSE(nameServers.empty()) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run some PortMappingIsolatorTests because we could\n" |
| << "not find any external name servers in /etc/resolv.conf.\n" |
| << "-------------------------------------------------------------"; |
| |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| for (unsigned int i = 0; i < nameServers.size(); i++) { |
| const string& IP = nameServers[i]; |
| // Time out after 1s and terminate upon receiving the first reply. |
| command1 << "arping -f -w1 " << IP << " -I " << eth0; |
| if (i + 1 < nameServers.size()) { |
| command1 << " && "; |
| } |
| } |
| command1 << "; echo -n $? > " << exitStatus << "; sync"; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status); |
| |
| EXPECT_SOME_EQ("0", os::read(exitStatus)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test DNS connectivity. |
| TEST_F(PortMappingIsolatorTest, ROOT_DNSTest) |
| { |
| // TODO(chzhcn): Even though this is unlikely, consider a better |
| // way to get external servers. |
| ASSERT_FALSE(nameServers.empty()) |
| << "-------------------------------------------------------------\n" |
| << "We cannot run some PortMappingIsolatorTests because we could\n" |
| << "not find any external name servers in /etc/resolv.conf.\n" |
| << "-------------------------------------------------------------"; |
| |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId; |
| containerId.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| for (unsigned int i = 0; i < nameServers.size(); i++) { |
| const string& IP = nameServers[i]; |
| command1 << "host " << IP; |
| if (i + 1 < nameServers.size()) { |
| command1 << " && "; |
| } |
| } |
| command1 << "; echo -n $? > " << exitStatus << "; sync"; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| // Wait for the command to complete. |
| AWAIT_READY(status); |
| |
| EXPECT_SOME_EQ("0", os::read(exitStatus)); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| // Test the scenario where a container has run out of ephemeral ports |
| // to use. |
| TEST_F(PortMappingIsolatorTest, ROOT_TooManyContainersTest) |
| { |
| // Increase the ephemeral ports per container so that we dont have |
| // enough ephemeral ports to launch a second container. |
| flags.ephemeral_ports_per_container = 512; |
| |
| Try<Isolator*> isolator = PortMappingIsolatorProcess::create(flags); |
| CHECK_SOME(isolator); |
| |
| Try<Launcher*> launcher = LinuxLauncher::create(flags); |
| CHECK_SOME(launcher); |
| |
| // Set the executor's resources. |
| ExecutorInfo executorInfo; |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container1Ports).get()); |
| |
| ContainerID containerId1; |
| containerId1.set_value("container1"); |
| |
| Future<Option<CommandInfo> > preparation1 = |
| isolator.get()->prepare(containerId1, executorInfo); |
| AWAIT_READY(preparation1); |
| ASSERT_SOME(preparation1.get()); |
| |
| ostringstream command1; |
| command1 << "sleep 1000"; |
| |
| int pipes[2]; |
| ASSERT_NE(-1, ::pipe(pipes)); |
| |
| Try<pid_t> pid = launchHelper( |
| launcher.get(), |
| pipes, |
| containerId1, |
| command1.str(), |
| preparation1.get()); |
| ASSERT_SOME(pid); |
| |
| // Reap the forked child. |
| Future<Option<int> > status1 = process::reap(pid.get()); |
| |
| // Continue in the parent. |
| ::close(pipes[0]); |
| |
| // Isolate the forked child. |
| AWAIT_READY(isolator.get()->isolate(containerId1, pid.get())); |
| |
| // Now signal the child to continue. |
| char dummy; |
| ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy))); |
| ::close(pipes[1]); |
| |
| ContainerID containerId2; |
| containerId2.set_value("container2"); |
| |
| executorInfo.mutable_resources()->CopyFrom( |
| Resources::parse(container2Ports).get()); |
| |
| Future<Option<CommandInfo> > preparation2 = |
| isolator.get()->prepare(containerId2, executorInfo); |
| AWAIT_FAILED(preparation2); |
| |
| // Ensure all processes are killed. |
| AWAIT_READY(launcher.get()->destroy(containerId1)); |
| |
| // Let the isolator clean up. |
| AWAIT_READY(isolator.get()->cleanup(containerId1)); |
| |
| delete isolator.get(); |
| delete launcher.get(); |
| } |
| |
| |
| class PortMappingMesosTest : public ContainerizerTest<MesosContainerizer> |
| { |
| public: |
| slave::Flags CreateSlaveFlags() |
| { |
| slave::Flags flags = |
| ContainerizerTest<MesosContainerizer>::CreateSlaveFlags(); |
| |
| // Setup recovery slave flags. |
| flags.checkpoint = true; |
| flags.recover = "reconnect"; |
| flags.strict = true; |
| |
| return flags; |
| } |
| |
| virtual void SetUp() |
| { |
| ContainerizerTest<MesosContainerizer>::SetUp(); |
| |
| // Guess the name of the public interface. |
| Result<string> _eth0 = link::eth0(); |
| ASSERT_SOME(_eth0) << "Failed to guess the name of the public interface"; |
| |
| eth0 = _eth0.get(); |
| |
| LOG(INFO) << "Using " << eth0 << " as the public interface"; |
| |
| // Guess the name of the loopback interface. |
| Result<string> _lo = link::lo(); |
| ASSERT_SOME(_lo) << "Failed to guess the name of the loopback interface"; |
| |
| lo = _lo.get(); |
| |
| LOG(INFO) << "Using " << lo << " as the loopback interface"; |
| |
| cleanup(eth0, lo); |
| |
| flags = CreateSlaveFlags(); |
| } |
| |
| virtual void TearDown() |
| { |
| cleanup(eth0, lo); |
| ContainerizerTest<MesosContainerizer>::TearDown(); |
| } |
| |
| slave::Flags flags; |
| |
| // Name of the host eth0 and lo. |
| string eth0; |
| string lo; |
| }; |
| |
| |
| // Test the scenario where the network isolator is asked to recover |
| // both types of containers: containers that were previously managed |
| // by network isolator, and containers that weren't. |
| TEST_F(PortMappingMesosTest, ROOT_RecoverMixedContainersTest) |
| { |
| slave::Flags flagsNoNetworkIsolator = flags; |
| |
| vector<string> isolations = strings::split(flags.isolation, ","); |
| ASSERT_NE(0u, isolations.size()); |
| |
| vector<string>::iterator it = |
| find(isolations.begin(), isolations.end(), "network/port_mapping"); |
| |
| ASSERT_NE(it, isolations.end()) |
| << "PortMappingMesosTests could not run because network/port_mapping" |
| << "is not supported on this host."; |
| |
| // Clear part of flags to disable network isolator, but keep the |
| // rest of the flags the same to share the same settings for the |
| // slave. |
| isolations.erase(it); |
| flagsNoNetworkIsolator.isolation = strings::join(",", isolations); |
| |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<MesosContainerizer*> containerizer1 = |
| MesosContainerizer::create(flagsNoNetworkIsolator, true); |
| ASSERT_SOME(containerizer1); |
| |
| // Start the first slave without network isolator and start a task. |
| Try<PID<Slave> > slave = StartSlave( |
| containerizer1.get(), |
| flagsNoNetworkIsolator); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched1; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo1; |
| frameworkInfo1.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo1.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched1, frameworkInfo1, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched1, registered(_, _, _)); |
| |
| Future<vector<Offer> > offers1; |
| Future<vector<Offer> > offers2; |
| EXPECT_CALL(sched1, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillOnce(FutureArg<1>(&offers2)) |
| .WillRepeatedly(DeclineOffers()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| // The first task doesn't need network resources. |
| Offer offer1 = offers1.get()[0]; |
| offer1.mutable_resources()->CopyFrom( |
| Resources::parse("cpus:1;mem:512").get()); |
| |
| // Start a long running task without using network isolator. |
| TaskInfo task1 = createTask(offer1, "sleep 1000"); |
| vector<TaskInfo> tasks1; |
| tasks1.push_back(task1); |
| |
| EXPECT_CALL(sched1, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement1 = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks1); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement1); |
| |
| Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave with a new containerizer that uses network |
| // isolator. |
| Try<MesosContainerizer*> containerizer2 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| // Start the second slave with network isolator, recover the first |
| // task without network isolation and start a second task with |
| // network islation. |
| slave = StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| // Make sure the new containerizer recovers the task. |
| AWAIT_READY(_recover); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| AWAIT_READY(offers2); |
| EXPECT_NE(0u, offers2.get().size()); |
| |
| Offer offer2 = offers2.get()[0]; |
| |
| TaskInfo task2 = createTask(offer2, "sleep 1000"); |
| |
| vector<TaskInfo> tasks2; |
| tasks2.push_back(task2); // Long-running task |
| |
| EXPECT_CALL(sched1, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement2 = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers2.get()[0].id(), tasks2); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement2); |
| |
| Stop(slave.get()); |
| delete containerizer2.get(); |
| |
| Future<Nothing> _recover2 = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage2 = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| // Restart the slave with a new containerizer that uses network |
| // isolator. This is to verify the case where one task is running |
| // without network isolator and another task is running with network |
| // isolator. |
| Try<MesosContainerizer*> containerizer3 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer3); |
| |
| slave = StartSlave(containerizer3.get(), flags); |
| ASSERT_SOME(slave); |
| |
| Clock::pause(); |
| |
| // Make sure the new containerizer recovers the tasks from both runs |
| // previously. |
| AWAIT_READY(_recover2); |
| |
| Clock::settle(); // Wait for slave to schedule reregister timeout. |
| |
| Clock::advance(EXECUTOR_REREGISTER_TIMEOUT); |
| Clock::resume(); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage2); |
| |
| Future<hashset<ContainerID> > containers = containerizer3.get()->containers(); |
| AWAIT_READY(containers); |
| EXPECT_EQ(2u, containers.get().size()); |
| foreach (ContainerID containerId, containers.get()) { |
| // Do some basic checks to make sure the network isolator can |
| // handle mixed types of containers correctly. |
| Future<ResourceStatistics> usage = |
| containerizer3.get()->usage(containerId); |
| AWAIT_READY(usage); |
| |
| // TODO(chzhcn): write a more thorough test for update. |
| Try<Resources> resources = Containerizer::resources(flags); |
| ASSERT_SOME(resources); |
| |
| Future<Nothing> update = |
| containerizer3.get()->update(containerId, resources.get()); |
| AWAIT_READY(update); |
| } |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| delete containerizer3.get(); |
| } |
| |
| |
| // Test that all configurations (tc filters etc) is cleaned up for an |
| // orphaned container using the network isolator. |
| TEST_F(PortMappingMesosTest, ROOT_CleanUpOrphanTest) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Try<MesosContainerizer*> containerizer1 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer1); |
| |
| Try<PID<Slave> > slave = StartSlave(containerizer1.get(), flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| |
| // Enable checkpointing for the framework. |
| FrameworkInfo frameworkInfo; |
| frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); |
| frameworkInfo.set_checkpoint(true); |
| |
| MesosSchedulerDriver driver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(_, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer> > offers1; |
| EXPECT_CALL(sched, resourceOffers(_, _)) |
| .WillOnce(FutureArg<1>(&offers1)) |
| .WillRepeatedly(DeclineOffers()); // Ignore subsequent offers. |
| |
| driver.start(); |
| |
| AWAIT_READY(offers1); |
| EXPECT_NE(0u, offers1.get().size()); |
| |
| // Start a long running task using network islator. |
| TaskInfo task = createTask(offers1.get()[0], "sleep 1000"); |
| vector<TaskInfo> tasks; |
| tasks.push_back(task); |
| |
| EXPECT_CALL(sched, statusUpdate(_, _)); |
| |
| Future<Nothing> _statusUpdateAcknowledgement1 = |
| FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement); |
| |
| driver.launchTasks(offers1.get()[0].id(), tasks); |
| |
| // Wait for the ACK to be checkpointed. |
| AWAIT_READY(_statusUpdateAcknowledgement1); |
| |
| Stop(slave.get()); |
| delete containerizer1.get(); |
| |
| ExecutorID executorId; |
| executorId.set_value(task.task_id().value()); |
| |
| // Construct the framework meta directory that needs wiping. |
| string frameworkPath = paths::getFrameworkPath( |
| paths::getMetaRootDir(flags.work_dir), |
| offers1.get()[0].slave_id(), |
| frameworkId.get()); |
| |
| // Remove the framework meta directory, so that the slave will not |
| // recover the task. |
| ASSERT_SOME(os::rmdir(frameworkPath, true)); |
| |
| Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); |
| |
| Future<SlaveReregisteredMessage> slaveReregisteredMessage = |
| FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); |
| |
| Future<TaskStatus> status; |
| EXPECT_CALL(sched, statusUpdate(_, _)) |
| .WillOnce(FutureArg<1>(&status)) |
| .WillRepeatedly(Return()); // Ignore subsequent updates. |
| |
| // Restart the slave (use same flags) with a new containerizer. |
| Try<MesosContainerizer*> containerizer2 = |
| MesosContainerizer::create(flags, true); |
| ASSERT_SOME(containerizer2); |
| |
| slave = StartSlave(containerizer2.get(), flags); |
| ASSERT_SOME(slave); |
| |
| // Wait for the slave to recover. |
| AWAIT_READY(_recover); |
| |
| // Wait for the slave to re-register. |
| AWAIT_READY(slaveReregisteredMessage); |
| |
| // Wait for TASK_LOST update. |
| AWAIT_READY(status); |
| ASSERT_EQ(TASK_LOST, status.get().state()); |
| |
| // Expect that qdiscs still exist on eth0 and lo but with no filters. |
| Try<bool> hostEth0ExistsQdisc = ingress::exists(eth0); |
| EXPECT_SOME_TRUE(hostEth0ExistsQdisc); |
| |
| Try<bool> hostLoExistsQdisc = ingress::exists(lo); |
| EXPECT_SOME_TRUE(hostLoExistsQdisc); |
| |
| Result<vector<ip::Classifier> > classifiers = |
| ip::classifiers(eth0, ingress::HANDLE); |
| |
| EXPECT_SOME(classifiers); |
| EXPECT_EQ(0u, classifiers.get().size()); |
| |
| classifiers = ip::classifiers(lo, ingress::HANDLE); |
| EXPECT_SOME(classifiers); |
| EXPECT_EQ(0u, classifiers.get().size()); |
| |
| // Expect no 'veth' devices. |
| Try<set<string> > links = net::links(); |
| ASSERT_SOME(links); |
| foreach (const string& name, links.get()) { |
| EXPECT_FALSE(strings::startsWith(name, slave::VETH_PREFIX)); |
| } |
| |
| // Expect no files in bind mount directory. |
| Try<list<string> > files = os::ls(slave::BIND_MOUNT_ROOT); |
| ASSERT_SOME(files); |
| EXPECT_EQ(0u, files.get().size()); |
| |
| driver.stop(); |
| driver.join(); |
| |
| Shutdown(); |
| delete containerizer2.get(); |
| } |