| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <limits.h> |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include <iostream> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <process/collect.hpp> |
| #include <process/defer.hpp> |
| #include <process/io.hpp> |
| #include <process/pid.hpp> |
| #include <process/subprocess.hpp> |
| |
| #include <stout/error.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/fs.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/json.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/mac.hpp> |
| #include <stout/multihashmap.hpp> |
| #include <stout/numify.hpp> |
| #include <stout/os.hpp> |
| #include <stout/option.hpp> |
| #include <stout/path.hpp> |
| #include <stout/protobuf.hpp> |
| #include <stout/result.hpp> |
| #include <stout/stringify.hpp> |
| #include <stout/strings.hpp> |
| #include <stout/utils.hpp> |
| |
| #include <stout/os/constants.hpp> |
| #include <stout/os/exists.hpp> |
| #include <stout/os/realpath.hpp> |
| #include <stout/os/stat.hpp> |
| |
| #include "common/status_utils.hpp" |
| #include "common/values.hpp" |
| |
| #include "linux/fs.hpp" |
| #include "linux/ns.hpp" |
| |
| #include "linux/routing/route.hpp" |
| #include "linux/routing/utils.hpp" |
| |
| #include "linux/routing/diagnosis/diagnosis.hpp" |
| |
| #include "linux/routing/filter/basic.hpp" |
| #include "linux/routing/filter/icmp.hpp" |
| #include "linux/routing/filter/ip.hpp" |
| |
| #include "linux/routing/handle.hpp" |
| |
| #include "linux/routing/link/link.hpp" |
| #include "linux/routing/link/veth.hpp" |
| |
| #include "linux/routing/queueing/fq_codel.hpp" |
| #include "linux/routing/queueing/htb.hpp" |
| #include "linux/routing/queueing/ingress.hpp" |
| #include "linux/routing/queueing/statistics.hpp" |
| |
| #include "mesos/resources.hpp" |
| |
| #include "slave/constants.hpp" |
| |
| #include "slave/containerizer/mesos/isolators/network/port_mapping.hpp" |
| |
| using namespace mesos::internal; |
| |
| using namespace process; |
| |
| using namespace routing; |
| using namespace routing::filter; |
| using namespace routing::queueing; |
| using namespace routing::queueing::statistics; |
| |
| using std::cerr; |
| using std::cout; |
| using std::dec; |
| using std::endl; |
| using std::hex; |
| using std::list; |
| using std::ostringstream; |
| using std::set; |
| using std::sort; |
| using std::string; |
| using std::vector; |
| |
| using filter::ip::PortRange; |
| |
| using mesos::internal::values::rangesToIntervalSet; |
| |
| using mesos::slave::ContainerConfig; |
| using mesos::slave::ContainerLaunchInfo; |
| using mesos::slave::ContainerLimitation; |
| using mesos::slave::ContainerState; |
| using mesos::slave::Isolator; |
| |
// Older glibc headers may not provide this symbol, so supply the
// value ourselves when it is missing.
#if !defined(MNT_DETACH)
#define MNT_DETACH 2
#endif
| |
| namespace mesos { |
| namespace internal { |
| namespace slave { |
| |
// Lower bound on the number of ephemeral ports given to a container.
static constexpr uint16_t MIN_EPHEMERAL_PORTS_SIZE = 16;
| |
| // Linux traffic control is a combination of queueing disciplines, |
| // filters and classes organized as a tree for the ingress (rx) and |
| // egress (tx) flows for each interface. Each container provides two |
| // networking interfaces, a virtual eth0 and a loopback interface. The |
| // flow of packets from the external network to container is shown |
| // below: |
| // |
| // +----------------------+----------------------+ |
| // | Container | |
| // |----------------------|----------------------| |
| // | eth0 | lo | |
| // +----------------------+----------------------+ |
| // ^ | ^ | |
| // [3] | | [4] | | |
| // | | [7] +-----------+ [10] |
| // | | |
| // | | [8] +-----------+ [9] |
| // [2] | | [5] | | |
| // | v v v |
| // +----------------------+----------------------+ |
| // | veth0 | lo | |
| // +----------------------|----------------------+ |
| // | Host | |
| // |----------------------|----------------------| |
| // | eth0 | |
| // +----------------------+----------------------| |
| // ^ | |
| // [1] | | [6] |
| // | v |
| // |
| // Traffic flowing from outside the network into a container enters |
| // the system via the host ingress interface [1] and is routed based |
| // on destination port to the outbound interface for the matching |
| // container [2], which forwards the packet to the container's inbound |
| // virtual interface. Outbound traffic destined for the external |
| // network flows along the reverse path [4,5,6]. Loopback traffic is |
| // directed to the corresponding Ethernet interface, either [7,10] or |
| // [8,9] where the same destination port routing can be applied as to |
| // external traffic. We use traffic control filters on several of the |
| // interfaces to create these packet paths. |
| // |
| // Linux provides only a very simple topology for ingress interfaces. |
| // A root is provided on a fixed handle (handle::INGRESS_ROOT) under |
| // which a single qdisc can be installed, with handle ingress::HANDLE. |
| // Traffic control filters can then be attached to the ingress qdisc. |
| // We install one or more ingress filters on the host eth0 [1] to |
| // direct traffic to the correct container, and on the container |
| // virtual eth0 [5] to direct traffic to other containers or out of |
| // the box. Since we know the ip port assignments for each container, |
| // we can direct traffic directly to the appropriate container. |
| // However, for ICMP and ARP traffic where no equivalent to a port |
| // exists, we send a copy of the packet to every container and rely on |
| // the network stack to drop unexpected packets. |
| // |
| // We install a Hierarchical Token Bucket (HTB) qdisc and class to |
| // limit the outbound traffic bandwidth as the egress qdisc inside the |
| // container [4] and then add a fq_codel qdisc to limit head of line |
| // blocking on the egress filter. The egress traffic control chain is |
| // thus: |
| // |
| // root device: handle::EGRESS_ROOT -> |
| // htb egress qdisc: CONTAINER_TX_HTB_HANDLE -> |
| // htb rate limiting class: CONTAINER_TX_HTB_CLASS_ID -> |
| // buffer-bloat reduction: FQ_CODEL |
| constexpr Handle CONTAINER_TX_HTB_HANDLE = Handle(1, 0); |
| constexpr Handle CONTAINER_TX_HTB_CLASS_ID = |
| Handle(CONTAINER_TX_HTB_HANDLE, 1); |
| |
| |
| // Finally we create a second fq_codel qdisc on the public interface |
| // of the host [6] to reduce performance interference between |
| // containers. We create independent flows for each container, and |
| // one for the host, which ensures packets from each container are |
| // guaranteed fair access to the host interface. This egress traffic |
| // control chain for the host interface is thus: |
| // |
| // root device: handle::EGRESS_ROOT -> |
| // buffer-bloat reduction: FQ_CODEL |
| constexpr Handle HOST_TX_FQ_CODEL_HANDLE = Handle(1, 0); |
| |
| |
// Primary priorities, one per type of filter.
static constexpr uint8_t ARP_FILTER_PRIORITY = 1;
static constexpr uint8_t ICMP_FILTER_PRIORITY = 2;
static constexpr uint8_t IP_FILTER_PRIORITY = 3;
static constexpr uint8_t DEFAULT_FILTER_PRIORITY = 4;
| |
| |
// Secondary priorities, used together with a primary priority above.
static constexpr uint8_t HIGH = 1;
static constexpr uint8_t NORMAL = 2;
static constexpr uint8_t LOW = 3;
| |
| |
// Every container is given its own flow on the egress side of the
// host eth0 (see MESOS-2422 for details). Egress traffic of the host
// itself is assigned to a reserved flow (HOST_FLOWID). ARP and ICMP
// traffic from containers is light, so both share a single flow.
static constexpr uint16_t HOST_FLOWID = 1;
static constexpr uint16_t ARP_FLOWID = 2;
static constexpr uint16_t ICMP_FLOWID = 2;
static constexpr uint16_t CONTAINER_MIN_FLOWID = 3;
| |
| |
| // The well known ports. Used for sanity check. |
| static Interval<uint16_t> WELL_KNOWN_PORTS() |
| { |
| return (Bound<uint16_t>::closed(0), Bound<uint16_t>::open(1024)); |
| } |
| |
| |
| ///////////////////////////////////////////////// |
| // Helper functions for the isolator. |
| ///////////////////////////////////////////////// |
| |
// Returns the largest power of two that does not exceed 'x'. Returns
// 0 when 'x' is 0.
static uint32_t roundDownToPowerOfTwo(uint32_t x)
{
  // Smear the most significant set bit into every lower position,
  // e.g., 00001XXX becomes 00001111. Doubling the shift distance on
  // each round (1, 2, 4, 8, 16) covers all 32 bits. The zeros to the
  // left of the most significant set bit are never turned into ones.
  for (unsigned int shift = 1; shift < 32; shift *= 2) {
    x |= x >> shift;
  }

  // Keep only the top bit: 00001111 - 00000111 = 00001000.
  return x - (x >> 1);
}
| |
| |
| // Returns the name of the host end of the virtual ethernet pair for a |
| // given container. The kernel restricts link name to 16 characters or |
| // less, so we cannot put container ID into the device name. Instead, |
| // we use the pid of the executor process forked by the slave to |
| // uniquely name the device for each container. It's safe because we |
| // cannot have two active containers having the same pid for the |
| // executor process. |
| static string veth(pid_t pid) |
| { |
| return PORT_MAPPING_VETH_PREFIX() + stringify(pid); |
| } |
| |
| |
| // Extracts the pid from the given veth name. |
| static Option<pid_t> getPidFromVeth(const string& veth) |
| { |
| if (strings::startsWith(veth, PORT_MAPPING_VETH_PREFIX())) { |
| Try<pid_t> pid = numify<pid_t>( |
| strings::remove(veth, PORT_MAPPING_VETH_PREFIX(), strings::PREFIX)); |
| |
| if (pid.isSome()) { |
| return pid.get(); |
| } |
| } |
| |
| return None(); |
| } |
| |
| |
| // Extracts the container ID from the symlink that points to the |
| // network namespace handle. The following is the layout of the bind |
| // mount root and bind mount symlink root: |
| // <PORT_MAPPING_BIND_MOUNT_ROOT()> |
| // |--- 3945 (pid) <-| |
| // | |
| // <PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()> | |
| // |--- ecf293e7-e6e8-4cbc-aaee-4d6c958aa276 --| |
| // (symlink: container ID -> pid) |
| static Try<ContainerID> getContainerIdFromSymlink(const string& symlink) |
| { |
| if (!os::stat::islink(symlink)) { |
| return Error("Not a symlink"); |
| } |
| |
| string _containerId = Path(symlink).basename(); |
| |
| ContainerID containerId; |
| containerId.set_value(_containerId); |
| |
| return containerId; |
| } |
| |
| |
| // Extracts the pid from the network namespace handle. Returns None if |
| // the handle is clearly not created by us. |
| static Result<pid_t> getPidFromNamespaceHandle(const string& handle) |
| { |
| if (os::stat::islink(handle)) { |
| return Error("Not expecting a symlink"); |
| } |
| |
| string _pid = Path(handle).basename(); |
| |
| Try<pid_t> pid = numify<pid_t>(_pid); |
| if (pid.isError()) { |
| return None(); |
| } |
| |
| return pid.get(); |
| } |
| |
| |
| // Extracts the pid from the symlink that points to the network |
| // namespace handle. Returns None if it's a dangling symlink. |
| static Result<pid_t> getPidFromSymlink(const string& symlink) |
| { |
| if (!os::stat::islink(symlink)) { |
| return Error("Not a symlink"); |
| } |
| |
| Result<string> target = os::realpath(symlink); |
| if (target.isError()) { |
| return Error("Failed to follow the symlink: " + target.error()); |
| } else if (target.isNone()) { |
| // This is a dangling symlink. |
| return None(); |
| } |
| |
| return getPidFromNamespaceHandle(target.get()); |
| } |
| |
| |
| static string getSymlinkPath(const ContainerID& containerId) |
| { |
| return path::join( |
| PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), |
| stringify(containerId)); |
| } |
| |
| |
| static string getNamespaceHandlePath(const string& bindMountRoot, pid_t pid) |
| { |
| return path::join(bindMountRoot, stringify(pid)); |
| } |
| |
| ///////////////////////////////////////////////// |
| // Implementation for PortMappingUpdate. |
| ///////////////////////////////////////////////// |
| |
| const char* PortMappingUpdate::NAME = "update"; |
| |
| |
| PortMappingUpdate::Flags::Flags() |
| { |
| add(&Flags::eth0_name, |
| "eth0_name", |
| "The name of the public network interface (e.g., eth0)"); |
| |
| add(&Flags::lo_name, |
| "lo_name", |
| "The name of the loopback network interface (e.g., lo)"); |
| |
| add(&Flags::pid, |
| "pid", |
| "The pid of the process whose namespaces we will enter"); |
| |
| add(&Flags::ports_to_add, |
| "ports_to_add", |
| "A collection of port ranges (formatted as a JSON object)\n" |
| "for which to add IP filters. E.g.,\n" |
| "--ports_to_add={\"range\":[{\"begin\":4,\"end\":8}]}"); |
| |
| add(&Flags::ports_to_remove, |
| "ports_to_remove", |
| "A collection of port ranges (formatted as a JSON object)\n" |
| "for which to remove IP filters. E.g.,\n" |
| "--ports_to_remove={\"range\":[{\"begin\":4,\"end\":8}]}"); |
| } |
| |
| |
| // The following two helper functions allow us to convert from a |
| // collection of port ranges to a JSON object and vice versa. They |
| // will be used for the port mapping update operation. |
| template <typename Iterable> |
| JSON::Object json(const Iterable& ranges) |
| { |
| Value::Ranges values; |
| foreach (const PortRange& range, ranges) { |
| Value::Range value; |
| value.set_begin(range.begin()); |
| value.set_end(range.end()); |
| |
| values.add_range()->CopyFrom(value); |
| } |
| return JSON::protobuf(values); |
| } |
| |
| |
| static Try<vector<PortRange>> parse(const JSON::Object& object) |
| { |
| Try<Value::Ranges> parsing = protobuf::parse<Value::Ranges>(object); |
| if (parsing.isError()) { |
| return Error("Failed to parse JSON: " + parsing.error()); |
| } |
| |
| vector<PortRange> ranges; |
| Value::Ranges values = parsing.get(); |
| for (int i = 0; i < values.range_size(); i++) { |
| const Value::Range& value = values.range(i); |
| Try<PortRange> range = PortRange::fromBeginEnd(value.begin(), value.end()); |
| if (range.isError()) { |
| return Error("Invalid port range: " + range.error()); |
| } |
| |
| ranges.push_back(range.get()); |
| } |
| return ranges; |
| } |
| |
| |
| // Helper function to set up IP filters inside the container for a |
| // given port range. |
| static Try<Nothing> addContainerIPFilters( |
| const PortRange& range, |
| const string& eth0, |
| const string& lo) |
| { |
| // Add an IP packet filter on lo such that local traffic inside a |
| // container will not be redirected to eth0. |
| Try<bool> loTerminal = filter::ip::create( |
| lo, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), None(), range), |
| Priority(IP_FILTER_PRIORITY, HIGH), |
| action::Terminal()); |
| |
| if (loTerminal.isError()) { |
| return Error( |
| "Failed to create an IP packet filter on " + lo + |
| " which stops packets from being sent to " + eth0 + |
| ": " + loTerminal.error()); |
| } else if (!loTerminal.get()) { |
| return Error( |
| "The IP packet filter on " + lo + |
| " which stops packets from being sent to " + |
| eth0 + " already exists"); |
| } |
| |
| // Add an IP packet filter (for loopback IP) from eth0 to lo to |
| // redirect all loopback IP traffic to lo. |
| Try<bool> eth0ToLoLoopback = filter::ip::create( |
| eth0, |
| ingress::HANDLE, |
| ip::Classifier( |
| None(), |
| net::IP::Network::LOOPBACK_V4().address(), |
| None(), |
| range), |
| Priority(IP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(lo)); |
| |
| if (eth0ToLoLoopback.isError()) { |
| return Error( |
| "Failed to create an IP packet filter (for loopback IP) from " + |
| eth0 + " to " + lo + ": " + eth0ToLoLoopback.error()); |
| } else if (!eth0ToLoLoopback.get()) { |
| return Error( |
| "The IP packet filter (for loopback IP) from " + |
| eth0 + " to " + lo + " already exists"); |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| // Helper function to remove IP filters inside the container for a |
| // given port range. |
| static Try<Nothing> removeContainerIPFilters( |
| const PortRange& range, |
| const string& eth0, |
| const string& lo) |
| { |
| // Remove the 'terminal' IP packet filter on lo. |
| Try<bool> loTerminal = filter::ip::remove( |
| lo, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), None(), range)); |
| |
| if (loTerminal.isError()) { |
| return Error( |
| "Failed to remove the IP packet filter on " + lo + |
| " which stops packets from being sent to " + eth0 + |
| ": " + loTerminal.error()); |
| } else if (!loTerminal.get()) { |
| return Error( |
| "The IP packet filter on " + lo + |
| " which stops packets from being sent to " + eth0 + |
| " does not exist"); |
| } |
| |
| // Remove the IP packet filter (for loopback IP) from eth0 to lo. |
| Try<bool> eth0ToLoLoopback = filter::ip::remove( |
| eth0, |
| ingress::HANDLE, |
| ip::Classifier( |
| None(), |
| net::IP::Network::LOOPBACK_V4().address(), |
| None(), |
| range)); |
| |
| if (eth0ToLoLoopback.isError()) { |
| return Error( |
| "Failed to remove the IP packet filter (for loopback IP) from " + |
| eth0 + " to " + lo + ": " + eth0ToLoLoopback.error()); |
| } else if (!eth0ToLoLoopback.get()) { |
| return Error( |
| "The IP packet filter (for loopback IP) from " + |
| eth0 + " to " + lo + " does not exist"); |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| int PortMappingUpdate::execute() |
| { |
| if (flags.help) { |
| cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl |
| << "Supported options:" << endl |
| << flags.usage(); |
| return 0; |
| } |
| |
| if (flags.eth0_name.isNone()) { |
| cerr << "The public interface name (e.g., eth0) is not specified" << endl; |
| return 1; |
| } |
| |
| if (flags.lo_name.isNone()) { |
| cerr << "The loopback interface name (e.g., lo) is not specified" << endl; |
| return 1; |
| } |
| |
| if (flags.pid.isNone()) { |
| cerr << "The pid is not specified" << endl; |
| return 1; |
| } |
| |
| if (flags.ports_to_add.isNone() && flags.ports_to_remove.isNone()) { |
| cerr << "Nothing to update" << endl; |
| return 1; |
| } |
| |
| Option<vector<PortRange>> portsToAdd; |
| Option<vector<PortRange>> portsToRemove; |
| |
| if (flags.ports_to_add.isSome()) { |
| Try<vector<PortRange>> parsing = parse(flags.ports_to_add.get()); |
| if (parsing.isError()) { |
| cerr << "Parsing 'ports_to_add' failed: " << parsing.error() << endl; |
| return 1; |
| } |
| portsToAdd = parsing.get(); |
| } |
| |
| if (flags.ports_to_remove.isSome()) { |
| Try<vector<PortRange>> parsing = parse(flags.ports_to_remove.get()); |
| if (parsing.isError()) { |
| cerr << "Parsing 'ports_to_remove' failed: " << parsing.error() << endl; |
| return 1; |
| } |
| portsToRemove = parsing.get(); |
| } |
| |
| // Enter the network namespace. |
| Try<Nothing> setns = ns::setns(flags.pid.get(), "net"); |
| if (setns.isError()) { |
| cerr << "Failed to enter the network namespace of pid " << flags.pid.get() |
| << ": " << setns.error() << endl; |
| return 1; |
| } |
| |
| // Update IP packet filters. |
| const string eth0 = flags.eth0_name.get(); |
| const string lo = flags.lo_name.get(); |
| |
| if (portsToAdd.isSome()) { |
| foreach (const PortRange& range, portsToAdd.get()) { |
| Try<Nothing> add = addContainerIPFilters(range, eth0, lo); |
| if (add.isError()) { |
| cerr << "Failed to add IP filters: " << add.error() << endl; |
| return 1; |
| } |
| } |
| } |
| |
| if (portsToRemove.isSome()) { |
| foreach (const PortRange& range, portsToRemove.get()) { |
| Try<Nothing> remove = removeContainerIPFilters(range, eth0, lo); |
| if (remove.isError()) { |
| cerr << "Failed to remove IP filters: " << remove.error() << endl; |
| return 1; |
| } |
| } |
| } |
| |
| return 0; |
| } |
| |
| ///////////////////////////////////////////////// |
| // Implementation for PortMappingStatistics. |
| ///////////////////////////////////////////////// |
| |
| const char* PortMappingStatistics::NAME = "statistics"; |
| |
| |
| PortMappingStatistics::Flags::Flags() |
| { |
| add(&Flags::eth0_name, |
| "eth0_name", |
| "The name of the public network interface (e.g., eth0)"); |
| |
| add(&Flags::pid, |
| "pid", |
| "The pid of the process whose namespaces we will enter"); |
| |
| add(&Flags::enable_socket_statistics_summary, |
| "enable_socket_statistics_summary", |
| "Whether to collect socket statistics summary for this container\n", |
| false); |
| |
| add(&Flags::enable_socket_statistics_details, |
| "enable_socket_statistics_details", |
| "Whether to collect socket statistics details (e.g., TCP RTT)\n" |
| "for this container.", |
| false); |
| |
| add(&Flags::enable_snmp_statistics, |
| "enable_snmp_statistics", |
| "Whether to collect SNMP statistics details (e.g., TCPRetransSegs)\n" |
| "for this container.", |
| false); |
| } |
| |
| |
| // A helper that copies the traffic control statistics from the |
| // statistics hashmap into the ResourceStatistics protocol buffer. |
| static void addTrafficControlStatistics( |
| const string& id, |
| const hashmap<string, uint64_t>& statistics, |
| ResourceStatistics* result) |
| { |
| TrafficControlStatistics *tc = result->add_net_traffic_control_statistics(); |
| |
| tc->set_id(id); |
| |
| // TODO(pbrett) Use protobuf reflection here. |
| if (statistics.contains(BACKLOG)) { |
| tc->set_backlog(statistics.at(BACKLOG)); |
| } |
| if (statistics.contains(BYTES)) { |
| tc->set_bytes(statistics.at(BYTES)); |
| } |
| if (statistics.contains(DROPS)) { |
| tc->set_drops(statistics.at(DROPS)); |
| } |
| if (statistics.contains(OVERLIMITS)) { |
| tc->set_overlimits(statistics.at(OVERLIMITS)); |
| } |
| if (statistics.contains(PACKETS)) { |
| tc->set_packets(statistics.at(PACKETS)); |
| } |
| if (statistics.contains(QLEN)) { |
| tc->set_qlen(statistics.at(QLEN)); |
| } |
| if (statistics.contains(RATE_BPS)) { |
| tc->set_ratebps(statistics.at(RATE_BPS)); |
| } |
| if (statistics.contains(RATE_PPS)) { |
| tc->set_ratepps(statistics.at(RATE_PPS)); |
| } |
| if (statistics.contains(REQUEUES)) { |
| tc->set_requeues(statistics.at(REQUEUES)); |
| } |
| } |
| |
| |
| static void addIPStatistics( |
| const hashmap<string, int64_t>& statistics, |
| ResourceStatistics* result) |
| { |
| SNMPStatistics *snmp = result->mutable_net_snmp_statistics(); |
| IpStatistics *ip = snmp->mutable_ip_stats(); |
| |
| // TODO(cwang): Use protobuf reflection here. |
| if (statistics.contains("Forwarding")) { |
| ip->set_forwarding(statistics.at("Forwarding")); |
| } |
| if (statistics.contains("DefaultTTL")) { |
| ip->set_defaultttl(statistics.at("DefaultTTL")); |
| } |
| if (statistics.contains("InReceives")) { |
| ip->set_inreceives(statistics.at("InReceives")); |
| } |
| if (statistics.contains("InHdrErrors")) { |
| ip->set_inhdrerrors(statistics.at("InHdrErrors")); |
| } |
| if (statistics.contains("InAddrErrors")) { |
| ip->set_inaddrerrors(statistics.at("InAddrErrors")); |
| } |
| if (statistics.contains("ForwDatagrams")) { |
| ip->set_forwdatagrams(statistics.at("ForwDatagrams")); |
| } |
| if (statistics.contains("InUnknownProtos")) { |
| ip->set_inunknownprotos(statistics.at("InUnknownProtos")); |
| } |
| if (statistics.contains("InDiscards")) { |
| ip->set_indiscards(statistics.at("InDiscards")); |
| } |
| if (statistics.contains("InDelivers")) { |
| ip->set_indelivers(statistics.at("InDelivers")); |
| } |
| if (statistics.contains("OutRequests")) { |
| ip->set_outrequests(statistics.at("OutRequests")); |
| } |
| if (statistics.contains("OutDiscards")) { |
| ip->set_outdiscards(statistics.at("OutDiscards")); |
| } |
| if (statistics.contains("OutNoRoutes")) { |
| ip->set_outnoroutes(statistics.at("OutNoRoutes")); |
| } |
| if (statistics.contains("ReasmTimeout")) { |
| ip->set_reasmtimeout(statistics.at("ReasmTimeout")); |
| } |
| if (statistics.contains("ReasmReqds")) { |
| ip->set_reasmreqds(statistics.at("ReasmReqds")); |
| } |
| if (statistics.contains("ReasmOKs")) { |
| ip->set_reasmoks(statistics.at("ReasmOKs")); |
| } |
| if (statistics.contains("ReasmFails")) { |
| ip->set_reasmfails(statistics.at("ReasmFails")); |
| } |
| if (statistics.contains("FragOKs")) { |
| ip->set_fragoks(statistics.at("FragOKs")); |
| } |
| if (statistics.contains("FragFails")) { |
| ip->set_fragfails(statistics.at("FragFails")); |
| } |
| if (statistics.contains("FragCreates")) { |
| ip->set_fragcreates(statistics.at("FragCreates")); |
| } |
| } |
| |
| |
| static void addICMPStatistics( |
| const hashmap<string, int64_t>& statistics, |
| ResourceStatistics* result) |
| { |
| SNMPStatistics *snmp = result->mutable_net_snmp_statistics(); |
| IcmpStatistics *icmp = snmp->mutable_icmp_stats(); |
| |
| // TODO(cwang): Use protobuf reflection here. |
| if (statistics.contains("InMsgs")) { |
| icmp->set_inmsgs(statistics.at("InMsgs")); |
| } |
| if (statistics.contains("InErrors")) { |
| icmp->set_inerrors(statistics.at("InErrors")); |
| } |
| if (statistics.contains("InCsumErrors")) { |
| icmp->set_incsumerrors(statistics.at("InCsumErrors")); |
| } |
| if (statistics.contains("InDestUnreachs")) { |
| icmp->set_indestunreachs(statistics.at("InDestUnreachs")); |
| } |
| if (statistics.contains("InTimeExcds")) { |
| icmp->set_intimeexcds(statistics.at("InTimeExcds")); |
| } |
| if (statistics.contains("InParmProbs")) { |
| icmp->set_inparmprobs(statistics.at("InParmProbs")); |
| } |
| if (statistics.contains("InSrcQuenchs")) { |
| icmp->set_insrcquenchs(statistics.at("InSrcQuenchs")); |
| } |
| if (statistics.contains("InRedirects")) { |
| icmp->set_inredirects(statistics.at("InRedirects")); |
| } |
| if (statistics.contains("InEchos")) { |
| icmp->set_inechos(statistics.at("InEchos")); |
| } |
| if (statistics.contains("InEchoReps")) { |
| icmp->set_inechoreps(statistics.at("InEchoReps")); |
| } |
| if (statistics.contains("InTimestamps")) { |
| icmp->set_intimestamps(statistics.at("InTimestamps")); |
| } |
| if (statistics.contains("InTimestampReps")) { |
| icmp->set_intimestampreps(statistics.at("InTimestampReps")); |
| } |
| if (statistics.contains("InAddrMasks")) { |
| icmp->set_inaddrmasks(statistics.at("InAddrMasks")); |
| } |
| if (statistics.contains("InAddrMaskReps")) { |
| icmp->set_inaddrmaskreps(statistics.at("InAddrMaskReps")); |
| } |
| if (statistics.contains("OutMsgs")) { |
| icmp->set_outmsgs(statistics.at("OutMsgs")); |
| } |
| if (statistics.contains("OutErrors")) { |
| icmp->set_outerrors(statistics.at("OutErrors")); |
| } |
| if (statistics.contains("OutDestUnreachs")) { |
| icmp->set_outdestunreachs(statistics.at("OutDestUnreachs")); |
| } |
| if (statistics.contains("OutTimeExcds")) { |
| icmp->set_outtimeexcds(statistics.at("OutTimeExcds")); |
| } |
| if (statistics.contains("OutParmProbs")) { |
| icmp->set_outparmprobs(statistics.at("OutParmProbs")); |
| } |
| if (statistics.contains("OutSrcQuenchs")) { |
| icmp->set_outsrcquenchs(statistics.at("OutSrcQuenchs")); |
| } |
| if (statistics.contains("OutRedirects")) { |
| icmp->set_outredirects(statistics.at("OutRedirects")); |
| } |
| if (statistics.contains("OutEchos")) { |
| icmp->set_outechos(statistics.at("OutEchos")); |
| } |
| if (statistics.contains("OutEchoReps")) { |
| icmp->set_outechoreps(statistics.at("OutEchoReps")); |
| } |
| if (statistics.contains("OutTimestamps")) { |
| icmp->set_outtimestamps(statistics.at("OutTimestamps")); |
| } |
| if (statistics.contains("OutTimestampReps")) { |
| icmp->set_outtimestampreps(statistics.at("OutTimestampReps")); |
| } |
| if (statistics.contains("OutAddrMasks")) { |
| icmp->set_outaddrmasks(statistics.at("OutAddrMasks")); |
| } |
| if (statistics.contains("OutAddrMaskReps")) { |
| icmp->set_outaddrmaskreps(statistics.at("OutAddrMaskReps")); |
| } |
| } |
| |
| |
| static void addTCPStatistics( |
| const hashmap<string, int64_t>& statistics, |
| ResourceStatistics* result) |
| { |
| SNMPStatistics *snmp = result->mutable_net_snmp_statistics(); |
| TcpStatistics *tcp = snmp->mutable_tcp_stats(); |
| |
| // TODO(cwang): Use protobuf reflection here. |
| if (statistics.contains("RtoAlgorithm")) { |
| tcp->set_rtoalgorithm(statistics.at("RtoAlgorithm")); |
| } |
| if (statistics.contains("RtoMin")) { |
| tcp->set_rtomin(statistics.at("RtoMin")); |
| } |
| if (statistics.contains("RtoMax")) { |
| tcp->set_rtomax(statistics.at("RtoMax")); |
| } |
| if (statistics.contains("MaxConn")) { |
| tcp->set_maxconn(statistics.at("MaxConn")); |
| } |
| if (statistics.contains("ActiveOpens")) { |
| tcp->set_activeopens(statistics.at("ActiveOpens")); |
| } |
| if (statistics.contains("PassiveOpens")) { |
| tcp->set_passiveopens(statistics.at("PassiveOpens")); |
| } |
| if (statistics.contains("AttemptFails")) { |
| tcp->set_attemptfails(statistics.at("AttemptFails")); |
| } |
| if (statistics.contains("EstabResets")) { |
| tcp->set_estabresets(statistics.at("EstabResets")); |
| } |
| if (statistics.contains("CurrEstab")) { |
| tcp->set_currestab(statistics.at("CurrEstab")); |
| } |
| if (statistics.contains("InSegs")) { |
| tcp->set_insegs(statistics.at("InSegs")); |
| } |
| if (statistics.contains("OutSegs")) { |
| tcp->set_outsegs(statistics.at("OutSegs")); |
| } |
| if (statistics.contains("RetransSegs")) { |
| tcp->set_retranssegs(statistics.at("RetransSegs")); |
| } |
| if (statistics.contains("InErrs")) { |
| tcp->set_inerrs(statistics.at("InErrs")); |
| } |
| if (statistics.contains("OutRsts")) { |
| tcp->set_outrsts(statistics.at("OutRsts")); |
| } |
| if (statistics.contains("InCsumErrors")) { |
| tcp->set_incsumerrors(statistics.at("InCsumErrors")); |
| } |
| } |
| |
| |
| static void addUDPStatistics( |
| const hashmap<string, int64_t>& statistics, |
| ResourceStatistics* result) |
| { |
| SNMPStatistics *snmp = result->mutable_net_snmp_statistics(); |
| UdpStatistics *udp = snmp->mutable_udp_stats(); |
| |
| // TODO(cwang): Use protobuf reflection here. |
| if (statistics.contains("InDatagrams")) { |
| udp->set_indatagrams(statistics.at("InDatagrams")); |
| } |
| if (statistics.contains("NoPorts")) { |
| udp->set_noports(statistics.at("NoPorts")); |
| } |
| if (statistics.contains("InErrors")) { |
| udp->set_inerrors(statistics.at("InErrors")); |
| } |
| if (statistics.contains("OutDatagrams")) { |
| udp->set_outdatagrams(statistics.at("OutDatagrams")); |
| } |
| if (statistics.contains("RcvbufErrors")) { |
| udp->set_rcvbuferrors(statistics.at("RcvbufErrors")); |
| } |
| if (statistics.contains("SndbufErrors")) { |
| udp->set_sndbuferrors(statistics.at("SndbufErrors")); |
| } |
| if (statistics.contains("InCsumErrors")) { |
| udp->set_incsumerrors(statistics.at("InCsumErrors")); |
| } |
| if (statistics.contains("IgnoredMulti")) { |
| udp->set_ignoredmulti(statistics.at("IgnoredMulti")); |
| } |
| } |
| |
| |
| int PortMappingStatistics::execute() |
| { |
| if (flags.help) { |
| cerr << "Usage: " << name() << " [OPTIONS]" << endl << endl |
| << "Supported options:" << endl |
| << flags.usage(); |
| return 0; |
| } |
| |
| if (flags.pid.isNone()) { |
| cerr << "The pid is not specified" << endl; |
| return 1; |
| } |
| |
| if (flags.eth0_name.isNone()) { |
| cerr << "The public interface name (e.g., eth0) is not specified" << endl; |
| return 1; |
| } |
| |
| // Enter the network namespace. |
| Try<Nothing> setns = ns::setns(flags.pid.get(), "net"); |
| if (setns.isError()) { |
| // This could happen if the executor exits before this function is |
| // invoked. We do not log here to avoid spurious logging. |
| return 1; |
| } |
| |
| ResourceStatistics result; |
| |
| // NOTE: We use a dummy value here since this field will be cleared |
| // before the result is sent to the containerizer. |
| result.set_timestamp(0); |
| |
| if (flags.enable_socket_statistics_summary) { |
| // Collections for socket statistics summary are below. |
| |
| // For TCP, get the number of ACTIVE and TIME_WAIT connections, |
| // from reading /proc/net/sockstat (/proc/net/sockstat6 for IPV6). |
| // This is not as expensive in the kernel because only counter |
| // values are accessed instead of a dump of all the sockets. |
| // Example output: |
| |
| // $ cat /proc/net/sockstat |
| // sockets: used 1391 |
| // TCP: inuse 33 orphan 0 tw 0 alloc 37 mem 6 |
| // UDP: inuse 15 mem 7 |
| // UDPLITE: inuse 0 |
| // RAW: inuse 0 |
| // FRAG: inuse 0 memory 0 |
| |
| Try<string> value = os::read("/proc/net/sockstat"); |
| if (value.isError()) { |
| cerr << "Failed to read /proc/net/sockstat: " << value.error() << endl; |
| return 1; |
| } |
| |
| foreach (const string& line, strings::tokenize(value.get(), "\n")) { |
| if (!strings::startsWith(line, "TCP")) { |
| continue; |
| } |
| |
| vector<string> tokens = strings::tokenize(line, " "); |
| for (size_t i = 0; i < tokens.size(); i++) { |
| if (tokens[i] == "inuse") { |
| if (i + 1 >= tokens.size()) { |
| cerr << "Unexpected output from /proc/net/sockstat" << endl; |
| // Be a bit forgiving here here since the /proc file |
| // output format can change, though not very likely. |
| continue; |
| } |
| |
| // Set number of active TCP connections. |
| Try<size_t> inuse = numify<size_t>(tokens[i+1]); |
| if (inuse.isError()) { |
| cerr << "Failed to parse the number of tcp connections in use: " |
| << inuse.error() << endl; |
| continue; |
| } |
| |
| result.set_net_tcp_active_connections(inuse.get()); |
| } else if (tokens[i] == "tw") { |
| if (i + 1 >= tokens.size()) { |
| cerr << "Unexpected output from /proc/net/sockstat" << endl; |
| // Be a bit forgiving here here since the /proc file |
| // output format can change, though not very likely. |
| continue; |
| } |
| |
| // Set number of TIME_WAIT TCP connections. |
| Try<size_t> tw = numify<size_t>(tokens[i+1]); |
| if (tw.isError()) { |
| cerr << "Failed to parse the number of tcp connections in" |
| << " TIME_WAIT: " << tw.error() << endl; |
| continue; |
| } |
| |
| result.set_net_tcp_time_wait_connections(tw.get()); |
| } |
| } |
| } |
| } |
| |
| if (flags.enable_socket_statistics_details) { |
| // Collections for socket statistics details are below. |
| |
| // NOTE: If the underlying library uses the older version of |
| // kernel API, the family argument passed in may not be honored. |
| Try<vector<diagnosis::socket::Info>> infos = |
| diagnosis::socket::infos(AF_INET, diagnosis::socket::state::ALL); |
| |
| if (infos.isError()) { |
| cerr << "Failed to retrieve the socket information" << endl; |
| return 1; |
| } |
| |
| vector<uint32_t> RTTs; |
| foreach (const diagnosis::socket::Info& info, infos.get()) { |
| // We double check on family regardless. |
| if (info.family != AF_INET) { |
| continue; |
| } |
| |
| // We consider all sockets that have non-zero rtt value. |
| if (info.tcpInfo.isSome() && info.tcpInfo->tcpi_rtt != 0) { |
| RTTs.push_back(info.tcpInfo->tcpi_rtt); |
| } |
| } |
| |
| // Only print to stdout when we have results. |
| if (RTTs.size() > 0) { |
| std::sort(RTTs.begin(), RTTs.end()); |
| |
| // NOTE: The size of RTTs is usually within 1 million so we |
| // don't need to worry about overflow here. |
| // TODO(jieyu): Right now, we choose to use "Nearest rank" for |
| // simplicity. Consider directly using the Statistics abstraction |
| // which computes "Linear interpolation between closest ranks". |
| // http://en.wikipedia.org/wiki/Percentile |
| size_t p50 = RTTs.size() * 50 / 100; |
| size_t p90 = RTTs.size() * 90 / 100; |
| size_t p95 = RTTs.size() * 95 / 100; |
| size_t p99 = RTTs.size() * 99 / 100; |
| |
| result.set_net_tcp_rtt_microsecs_p50(RTTs[p50]); |
| result.set_net_tcp_rtt_microsecs_p90(RTTs[p90]); |
| result.set_net_tcp_rtt_microsecs_p95(RTTs[p95]); |
| result.set_net_tcp_rtt_microsecs_p99(RTTs[p99]); |
| } |
| } |
| |
| if (flags.enable_snmp_statistics) { |
| Try<string> value = os::read("/proc/net/snmp"); |
| if (value.isError()) { |
| cerr << "Failed to read /proc/net/snmp: " << value.error() << endl; |
| return 1; |
| } |
| |
| hashmap<string, hashmap<string, int64_t>> SNMPStats; |
| vector<string> keys; |
| bool isKeyLine = true; |
| foreach (const string& line, strings::tokenize(value.get(), "\n")) { |
| vector<string> fields = strings::tokenize(line, ":"); |
| if (fields.size() != 2) { |
| cerr << "Failed to tokenize line '" << line << "' " |
| << " in /proc/net/snmp" << endl; |
| return 1; |
| } |
| vector<string> tokens = strings::tokenize(fields[1], " "); |
| if (isKeyLine) { |
| for (size_t i = 0; i < tokens.size(); i++) { |
| keys.push_back(tokens[i]); |
| } |
| } else { |
| hashmap<string, int64_t> stats; |
| for (size_t i = 0; i < tokens.size(); i++) { |
| Try<int64_t> val = numify<int64_t>(tokens[i]); |
| |
| if (val.isError()) { |
| cerr << "Failed to parse the statistics in " << fields[0] |
| << val.error() << endl; |
| return 1; |
| } |
| stats[keys[i]] = val.get(); |
| } |
| SNMPStats[fields[0]] = stats; |
| keys.clear(); |
| } |
| isKeyLine = !isKeyLine; |
| } |
| |
| addIPStatistics(SNMPStats["Ip"], &result); |
| addICMPStatistics(SNMPStats["Icmp"], &result); |
| addTCPStatistics(SNMPStats["Tcp"], &result); |
| addUDPStatistics(SNMPStats["Udp"], &result); |
| } |
| |
| // Collect traffic statistics for the container from the container |
| // virtual interface and export them in JSON. |
| const string& eth0 = flags.eth0_name.get(); |
| |
| // Overlimits are reported on the HTB qdisc at the egress root. |
| Result<hashmap<string, uint64_t>> statistics = |
| htb::statistics(eth0, EGRESS_ROOT); |
| |
| if (statistics.isSome()) { |
| addTrafficControlStatistics( |
| NET_ISOLATOR_BW_LIMIT, |
| statistics.get(), |
| &result); |
| } else if (statistics.isNone()) { |
| // Traffic control statistics are only available when the |
| // container is created on a slave when the egress rate limit is |
| // on (i.e., egress_rate_limit_per_container flag is set). We |
| // can't just test for that flag here however, since the slave may |
| // have been restarted with different flags since the container |
| // was created. It is also possible that isolator statistics are |
| // unavailable because we the container is in the process of being |
| // created or destroy. Hence we do not report a lack of network |
| // statistics as an error. |
| } else if (statistics.isError()) { |
| cerr << "Failed to get htb qdisc statistics on " << eth0 |
| << " in namespace " << flags.pid.get() << endl; |
| } |
| |
| // Drops due to the bandwidth limit should be reported at the leaf. |
| statistics = fq_codel::statistics(eth0, CONTAINER_TX_HTB_CLASS_ID); |
| if (statistics.isSome()) { |
| addTrafficControlStatistics( |
| NET_ISOLATOR_BLOAT_REDUCTION, |
| statistics.get(), |
| &result); |
| } else if (statistics.isNone()) { |
| // See discussion on network isolator statistics above. |
| } else if (statistics.isError()) { |
| cerr << "Failed to get fq_codel qdisc statistics on " << eth0 |
| << " in namespace " << flags.pid.get() << endl; |
| } |
| |
| cout << stringify(JSON::protobuf(result)); |
| return 0; |
| } |
| |
| |
| ///////////////////////////////////////////////// |
| // Implementation for the isolator. |
| ///////////////////////////////////////////////// |
| |
| PortMappingIsolatorProcess::Metrics::Metrics() |
| : adding_eth0_ip_filters_errors( |
| "port_mapping/adding_eth0_ip_filters_errors"), |
| adding_eth0_ip_filters_already_exist( |
| "port_mapping/adding_eth0_ip_filters_already_exist"), |
| adding_eth0_egress_filters_errors( |
| "port_mapping/adding_eth0_egress_filters_errors"), |
| adding_eth0_egress_filters_already_exist( |
| "port_mapping/adding_eth0_egress_filters_already_exist"), |
| adding_lo_ip_filters_errors( |
| "port_mapping/adding_lo_ip_filters_errors"), |
| adding_lo_ip_filters_already_exist( |
| "port_mapping/adding_lo_ip_filters_already_exist"), |
| adding_veth_ip_filters_errors( |
| "port_mapping/adding_veth_ip_filters_errors"), |
| adding_veth_ip_filters_already_exist( |
| "port_mapping/adding_veth_ip_filters_already_exist"), |
| adding_veth_icmp_filters_errors( |
| "port_mapping/adding_veth_icmp_filters_errors"), |
| adding_veth_icmp_filters_already_exist( |
| "port_mapping/adding_veth_icmp_filters_already_exist"), |
| adding_veth_arp_filters_errors( |
| "port_mapping/adding_veth_arp_filters_errors"), |
| adding_veth_arp_filters_already_exist( |
| "port_mapping/adding_veth_arp_filters_already_exist"), |
| adding_eth0_icmp_filters_errors( |
| "port_mapping/adding_eth0_icmp_filters_errors"), |
| adding_eth0_icmp_filters_already_exist( |
| "port_mapping/adding_eth0_icmp_filters_already_exist"), |
| adding_eth0_arp_filters_errors( |
| "port_mapping/adding_eth0_arp_filters_errors"), |
| adding_eth0_arp_filters_already_exist( |
| "port_mapping/adding_eth0_arp_filters_already_exist"), |
| removing_eth0_ip_filters_errors( |
| "port_mapping/removing_eth0_ip_filters_errors"), |
| removing_eth0_ip_filters_do_not_exist( |
| "port_mapping/removing_eth0_ip_filters_do_not_exist"), |
| removing_eth0_egress_filters_errors( |
| "port_mapping/removing_eth0_egress_filters_errors"), |
| removing_eth0_egress_filters_do_not_exist( |
| "port_mapping/removinging_eth0_egress_filters_do_not_exist"), |
| removing_lo_ip_filters_errors( |
| "port_mapping/removing_lo_ip_filters_errors"), |
| removing_lo_ip_filters_do_not_exist( |
| "port_mapping/removing_lo_ip_filters_do_not_exist"), |
| removing_veth_ip_filters_errors( |
| "port_mapping/removing_veth_ip_filters_errors"), |
| removing_veth_ip_filters_do_not_exist( |
| "port_mapping/removing_veth_ip_filters_do_not_exist"), |
| removing_eth0_icmp_filters_errors( |
| "port_mapping/removing_eth0_icmp_filters_errors"), |
| removing_eth0_icmp_filters_do_not_exist( |
| "port_mapping/removing_eth0_icmp_filters_do_not_exist"), |
| removing_eth0_arp_filters_errors( |
| "port_mapping/removing_eth0_arp_filters_errors"), |
| removing_eth0_arp_filters_do_not_exist( |
| "port_mapping/removing_eth0_arp_filters_do_not_exist"), |
| updating_eth0_icmp_filters_errors( |
| "port_mapping/updating_eth0_icmp_filters_errors"), |
| updating_eth0_icmp_filters_already_exist( |
| "port_mapping/updating_eth0_icmp_filters_already_exist"), |
| updating_eth0_icmp_filters_do_not_exist( |
| "port_mapping/updating_eth0_icmp_filters_do_not_exist"), |
| updating_eth0_arp_filters_errors( |
| "port_mapping/updating_eth0_arp_filters_errors"), |
| updating_eth0_arp_filters_already_exist( |
| "port_mapping/updating_eth0_arp_filters_already_exist"), |
| updating_eth0_arp_filters_do_not_exist( |
| "port_mapping/updating_eth0_arp_filters_do_not_exist"), |
| updating_container_ip_filters_errors( |
| "port_mapping/updating_container_ip_filters_errors") |
| { |
| process::metrics::add(adding_eth0_ip_filters_errors); |
| process::metrics::add(adding_eth0_ip_filters_already_exist); |
| process::metrics::add(adding_lo_ip_filters_errors); |
| process::metrics::add(adding_lo_ip_filters_already_exist); |
| process::metrics::add(adding_veth_ip_filters_errors); |
| process::metrics::add(adding_veth_ip_filters_already_exist); |
| process::metrics::add(adding_veth_icmp_filters_errors); |
| process::metrics::add(adding_veth_icmp_filters_already_exist); |
| process::metrics::add(adding_veth_arp_filters_errors); |
| process::metrics::add(adding_veth_arp_filters_already_exist); |
| process::metrics::add(adding_eth0_icmp_filters_errors); |
| process::metrics::add(adding_eth0_icmp_filters_already_exist); |
| process::metrics::add(adding_eth0_arp_filters_errors); |
| process::metrics::add(adding_eth0_arp_filters_already_exist); |
| process::metrics::add(removing_eth0_ip_filters_errors); |
| process::metrics::add(removing_eth0_ip_filters_do_not_exist); |
| process::metrics::add(removing_lo_ip_filters_errors); |
| process::metrics::add(removing_lo_ip_filters_do_not_exist); |
| process::metrics::add(removing_veth_ip_filters_errors); |
| process::metrics::add(removing_veth_ip_filters_do_not_exist); |
| process::metrics::add(removing_eth0_icmp_filters_errors); |
| process::metrics::add(removing_eth0_icmp_filters_do_not_exist); |
| process::metrics::add(removing_eth0_arp_filters_errors); |
| process::metrics::add(removing_eth0_arp_filters_do_not_exist); |
| process::metrics::add(updating_eth0_icmp_filters_errors); |
| process::metrics::add(updating_eth0_icmp_filters_already_exist); |
| process::metrics::add(updating_eth0_icmp_filters_do_not_exist); |
| process::metrics::add(updating_eth0_arp_filters_errors); |
| process::metrics::add(updating_eth0_arp_filters_already_exist); |
| process::metrics::add(updating_eth0_arp_filters_do_not_exist); |
| process::metrics::add(updating_container_ip_filters_errors); |
| } |
| |
| |
| PortMappingIsolatorProcess::Metrics::~Metrics() |
| { |
| process::metrics::remove(adding_eth0_ip_filters_errors); |
| process::metrics::remove(adding_eth0_ip_filters_already_exist); |
| process::metrics::remove(adding_lo_ip_filters_errors); |
| process::metrics::remove(adding_lo_ip_filters_already_exist); |
| process::metrics::remove(adding_veth_ip_filters_errors); |
| process::metrics::remove(adding_veth_ip_filters_already_exist); |
| process::metrics::remove(adding_veth_icmp_filters_errors); |
| process::metrics::remove(adding_veth_icmp_filters_already_exist); |
| process::metrics::remove(adding_veth_arp_filters_errors); |
| process::metrics::remove(adding_veth_arp_filters_already_exist); |
| process::metrics::remove(adding_eth0_icmp_filters_errors); |
| process::metrics::remove(adding_eth0_icmp_filters_already_exist); |
| process::metrics::remove(adding_eth0_arp_filters_errors); |
| process::metrics::remove(adding_eth0_arp_filters_already_exist); |
| process::metrics::remove(removing_eth0_ip_filters_errors); |
| process::metrics::remove(removing_eth0_ip_filters_do_not_exist); |
| process::metrics::remove(removing_lo_ip_filters_errors); |
| process::metrics::remove(removing_lo_ip_filters_do_not_exist); |
| process::metrics::remove(removing_veth_ip_filters_errors); |
| process::metrics::remove(removing_veth_ip_filters_do_not_exist); |
| process::metrics::remove(removing_eth0_icmp_filters_errors); |
| process::metrics::remove(removing_eth0_icmp_filters_do_not_exist); |
| process::metrics::remove(removing_eth0_arp_filters_errors); |
| process::metrics::remove(removing_eth0_arp_filters_do_not_exist); |
| process::metrics::remove(updating_eth0_icmp_filters_errors); |
| process::metrics::remove(updating_eth0_icmp_filters_already_exist); |
| process::metrics::remove(updating_eth0_icmp_filters_do_not_exist); |
| process::metrics::remove(updating_eth0_arp_filters_errors); |
| process::metrics::remove(updating_eth0_arp_filters_already_exist); |
| process::metrics::remove(updating_eth0_arp_filters_do_not_exist); |
| process::metrics::remove(updating_container_ip_filters_errors); |
| } |
| |
| |
| Try<Isolator*> PortMappingIsolatorProcess::create(const Flags& flags) |
| { |
| // Check for root permission. |
| if (geteuid() != 0) { |
| return Error("Using network isolator requires root permissions"); |
| } |
| |
| // Verify that the network namespace is available by checking the |
| // existence of the network namespace handle of the current process. |
| Try<bool> netSupported = ns::supported(CLONE_NEWNET); |
| if (netSupported.isError() || !netSupported.get()) { |
| return Error( |
| "Using network isolator requires network namespace. " |
| "Make sure your kernel is newer than 3.4"); |
| } |
| |
| // Check the routing library. |
| Try<Nothing> check = routing::check(); |
| if (check.isError()) { |
| return Error( |
| "Routing library check failed: " + |
| check.error()); |
| } |
| |
| // Check the availability of a few Linux commands that we will use. |
| // We use the blocking os::shell here because 'create' will only be |
| // invoked during initialization. |
| Try<string> checkCommandTc = os::shell("tc filter show"); |
| if (checkCommandTc.isError()) { |
| return Error("Check command 'tc' failed: " + checkCommandTc.error()); |
| } |
| |
| // NOTE: loopback device always exists. |
| Try<string> checkCommandEthtool = os::shell("ethtool -k lo"); |
| if (checkCommandEthtool.isError()) { |
| return Error("Check command 'ethtool' failed: " |
| + checkCommandEthtool.error()); |
| } |
| |
| Try<string> checkCommandIp = os::shell("ip link show"); |
| if (checkCommandIp.isError()) { |
| return Error("Check command 'ip' failed: " + checkCommandIp.error()); |
| } |
| |
| Try<Resources> resources = Resources::parse( |
| flags.resources.getOrElse(""), |
| flags.default_role); |
| |
| if (resources.isError()) { |
| return Error("Failed to parse --resources: " + resources.error()); |
| } |
| |
| // Get 'ports' resource from 'resources' flag. These ports will be |
| // treated as non-ephemeral ports. |
| IntervalSet<uint16_t> nonEphemeralPorts; |
| if (resources->ports().isSome()) { |
| Try<IntervalSet<uint16_t>> ports = rangesToIntervalSet<uint16_t>( |
| resources->ports().get()); |
| |
| if (ports.isError()) { |
| return Error( |
| "Invalid ports resource '" + |
| stringify(resources->ports().get()) + |
| "': " + ports.error()); |
| } |
| |
| nonEphemeralPorts = ports.get(); |
| } |
| |
| // Get 'ephemeral_ports' resource from 'resources' flag. These ports |
| // will be allocated to each container as ephemeral ports. |
| IntervalSet<uint16_t> ephemeralPorts; |
| if (resources->ephemeral_ports().isSome()) { |
| Try<IntervalSet<uint16_t>> ports = rangesToIntervalSet<uint16_t>( |
| resources->ephemeral_ports().get()); |
| |
| if (ports.isError()) { |
| return Error( |
| "Invalid ephemeral ports resource '" + |
| stringify(resources->ephemeral_ports().get()) + |
| "': " + ports.error()); |
| } |
| |
| ephemeralPorts = ports.get(); |
| } |
| |
| // Each container requires at least one ephemeral port for slave |
| // executor communication. If no 'ephemeral_ports' resource is |
| // found, we will return error. |
| if (ephemeralPorts.empty()) { |
| return Error("Ephemeral ports are not specified"); |
| } |
| |
| // Sanity check to make sure that the ephemeral ports specified do |
| // not intersect with the specified non-ephemeral ports. |
| if (ephemeralPorts.intersects(nonEphemeralPorts)) { |
| return Error( |
| "The specified ephemeral ports " + stringify(ephemeralPorts) + |
| " intersect with the specified non-ephemeral ports " + |
| stringify(nonEphemeralPorts)); |
| } |
| |
| // This is a sanity check to make sure that the ephemeral ports |
| // specified do not intersect with the well known ports. |
| if (ephemeralPorts.intersects(WELL_KNOWN_PORTS())) { |
| return Error( |
| "The specified ephemeral ports " + stringify(ephemeralPorts) + |
| " intersect with well known ports " + stringify(WELL_KNOWN_PORTS())); |
| } |
| |
| // Obtain the host ephemeral port range by reading the proc file |
| // system ('ip_local_port_range'). |
| Try<string> value = os::read("/proc/sys/net/ipv4/ip_local_port_range"); |
| if (value.isError()) { |
| return Error("Failed to read host ip_local_port_range: " + value.error()); |
| } |
| |
| vector<string> split = strings::split(strings::trim(value.get()), "\t"); |
| if (split.size() != 2) { |
| return Error( |
| "Unexpected format from host ip_local_port_range: " + value.get()); |
| } |
| |
| Try<uint16_t> begin = numify<uint16_t>(split[0]); |
| if (begin.isError()) { |
| return Error( |
| "Failed to parse the begin of host ip_local_port_range: " + split[0]); |
| } |
| |
| Try<uint16_t> end = numify<uint16_t>(split[1]); |
| if (end.isError()) { |
| return Error( |
| "Failed to parse the end of host ip_local_port_range: " + split[1]); |
| } |
| |
| Interval<uint16_t> hostEphemeralPorts = |
| (Bound<uint16_t>::closed(begin.get()), |
| Bound<uint16_t>::closed(end.get())); |
| |
| // Sanity check to make sure the specified ephemeral ports do not |
| // intersect with the ephemeral ports used by the host. |
| if (ephemeralPorts.intersects(hostEphemeralPorts)) { |
| return Error( |
| "The specified ephemeral ports " + stringify(ephemeralPorts) + |
| " intersect with the ephemeral ports used by the host " + |
| stringify(hostEphemeralPorts)); |
| } |
| |
| // TODO(chzhcn): Cross check ephemeral ports with used ports on the |
| // host (e.g., using port scan). |
| |
| // Initialize the ephemeral ports allocator. |
| |
| // In theory, any positive integer can be broken up into a few |
| // numbers that are power of 2 aligned. We choose to not allow this |
| // for now so that each container has a fixed (one) number of |
| // filters for ephemeral ports. This makes it easy to debug and |
| // infer performance. |
| if (roundDownToPowerOfTwo(flags.ephemeral_ports_per_container) != |
| flags.ephemeral_ports_per_container) { |
| return Error( |
| "The number of ephemeral ports for each container (" + |
| stringify(flags.ephemeral_ports_per_container) + |
| ") is not a power of 2"); |
| } |
| |
| if (ephemeralPorts.size() < flags.ephemeral_ports_per_container) { |
| return Error( |
| "Network Isolator is given ephemeral ports of size: " + |
| stringify(ephemeralPorts.size()) + ", but asked to allocate " + |
| stringify(flags.ephemeral_ports_per_container) + |
| " ephemeral ports for a container"); |
| } |
| |
| if (flags.ephemeral_ports_per_container < MIN_EPHEMERAL_PORTS_SIZE) { |
| return Error( |
| "Each container has only " + |
| stringify(flags.ephemeral_ports_per_container) + |
| " ephemeral ports. The minimum required is: " + |
| stringify(MIN_EPHEMERAL_PORTS_SIZE)); |
| } |
| |
| Owned<EphemeralPortsAllocator> ephemeralPortsAllocator( |
| new EphemeralPortsAllocator( |
| ephemeralPorts, |
| flags.ephemeral_ports_per_container)); |
| |
| // Get the name of the public interface (e.g., eth0). If it is not |
| // specified, try to derive its name from the routing library. |
| Result<string> eth0 = link::eth0(); |
| if (flags.eth0_name.isSome()) { |
| eth0 = flags.eth0_name.get(); |
| |
| // Check if the given public interface exists. |
| Try<bool> hostEth0Exists = link::exists(eth0.get()); |
| if (hostEth0Exists.isError()) { |
| return Error( |
| "Failed to check if " + eth0.get() + " exists: " + |
| hostEth0Exists.error()); |
| } else if (!hostEth0Exists.get()) { |
| return Error("The public interface " + eth0.get() + " does not exist"); |
| } |
| } else if (!eth0.isSome()){ |
| // eth0 is not specified in the flag and we did not get a valid |
| // eth0 from the library. |
| return Error( |
| "Network Isolator failed to find a public interface: " + |
| (eth0.isError() ? eth0.error() : "does not have a public interface")); |
| } |
| |
| LOG(INFO) << "Using " << eth0.get() << " as the public interface"; |
| |
| // Get the name of the loopback interface. If it is not specified, |
| // try to derive its name based on the loopback IP address. |
| Result<string> lo = link::lo(); |
| // Option<string> lo = flags.lo_name; |
| if (flags.lo_name.isSome()) { |
| lo = flags.lo_name; |
| |
| // Check if the given loopback interface exists. |
| Try<bool> hostLoExists = link::exists(lo.get()); |
| if (hostLoExists.isError()) { |
| return Error( |
| "Failed to check if " + lo.get() + " exists: " + |
| hostLoExists.error()); |
| } else if (!hostLoExists.get()) { |
| return Error("The loopback interface " + lo.get() + " does not exist"); |
| } |
| } else if (!lo.isSome()) { |
| // lo is not specified in the flag and we did not get a valid |
| // lo from the library. |
| return Error( |
| "Network Isolator failed to find a loopback interface: " + |
| (lo.isError() ? lo.error() : "does not have a loopback interface")); |
| } |
| |
| LOG(INFO) << "Using " << lo.get() << " as the loopback interface"; |
| |
| // If egress rate limit is provided, do a sanity check that it is |
| // not greater than the host physical link speed. |
| Option<Bytes> egressRateLimitPerContainer; |
| if (flags.egress_rate_limit_per_container.isSome()) { |
| // Read host physical link speed from /sys/class/net/eth0/speed. |
| // This value is in MBits/s. Some distribution does not support |
| // reading speed (depending on the driver). If that's the case, |
| // simply print warnings. |
| const string eth0SpeedPath = |
| path::join("/sys/class/net", eth0.get(), "speed"); |
| |
| if (!os::exists(eth0SpeedPath)) { |
| LOG(WARNING) << "Cannot determine link speed of " << eth0.get() |
| << ": '" << eth0SpeedPath << "' does not exist"; |
| } else { |
| Try<string> value = os::read(eth0SpeedPath); |
| if (value.isError()) { |
| // NOTE: Even if the speed file exists, the read might fail if |
| // the driver does not support reading the speed. Therefore, |
| // we print a warning here, instead of failing. |
| LOG(WARNING) << "Cannot determine link speed of " << eth0.get() |
| << ": Failed to read '" << eth0SpeedPath |
| << "': " << value.error(); |
| } else { |
| Try<uint64_t> hostLinkSpeed = |
| numify<uint64_t>(strings::trim(value.get())); |
| |
| CHECK_SOME(hostLinkSpeed); |
| |
| // It could be possible that the nic driver doesn't support |
| // reporting physical link speed. In that case, report error. |
| if (hostLinkSpeed.get() == 0xFFFFFFFF) { |
| LOG(WARNING) << "Link speed reporting is not supported for '" |
| << eth0.get() + "'"; |
| } else { |
| // Convert host link speed to Bytes/s for comparason. |
| if (hostLinkSpeed.get() * 1000000 / 8 < |
| flags.egress_rate_limit_per_container->bytes()) { |
| return Error( |
| "The given egress traffic limit for containers " + |
| stringify(flags.egress_rate_limit_per_container->bytes()) + |
| " Bytes/s is greater than the host link speed " + |
| stringify(hostLinkSpeed.get() * 1000000 / 8) + " Bytes/s"); |
| } |
| } |
| } |
| } |
| |
| if (flags.egress_rate_limit_per_container.get() != Bytes(0)) { |
| egressRateLimitPerContainer = flags.egress_rate_limit_per_container.get(); |
| } else { |
| LOG(WARNING) << "Ignoring the given zero egress rate limit"; |
| } |
| } |
| |
| // Get the host IP network, MAC and default gateway. |
| Result<net::IP::Network> hostIPNetwork = |
| net::IP::Network::fromLinkDevice(eth0.get(), AF_INET); |
| |
| if (!hostIPNetwork.isSome()) { |
| return Error( |
| "Failed to get the public IP network of " + eth0.get() + ": " + |
| (hostIPNetwork.isError() ? |
| hostIPNetwork.error() : |
| "does not have an IPv4 network")); |
| } |
| |
| Result<net::MAC> hostMAC = net::mac(eth0.get()); |
| if (!hostMAC.isSome()) { |
| return Error( |
| "Failed to get the MAC address of " + eth0.get() + ": " + |
| (hostMAC.isError() ? hostMAC.error() : "does not have a MAC address")); |
| } |
| |
| Result<net::IP> hostDefaultGateway = route::defaultGateway(); |
| if (!hostDefaultGateway.isSome()) { |
| return Error( |
| "Failed to get the default gateway of the host: " + |
| (hostDefaultGateway.isError() ? hostDefaultGateway.error() |
| : "The default gateway of the host does not exist")); |
| } |
| |
| // Set the MAC address of the host loopback interface (lo) so that |
| // it matches that of the host public interface (eth0). A fairly |
| // recent kernel patch is needed for this operation to succeed: |
| // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/: |
| // 25f929fbff0d1bcebf2e92656d33025cd330cbf8 |
| Try<bool> setHostLoMAC = link::setMAC(lo.get(), hostMAC.get()); |
| if (setHostLoMAC.isError()) { |
| return Error( |
| "Failed to set the MAC address of " + lo.get() + |
| ": " + setHostLoMAC.error()); |
| } |
| |
| // Set the MTU of the host loopback interface (lo) so that it |
| // matches that of the host public interface (eth0). |
| Result<unsigned int> hostEth0MTU = link::mtu(eth0.get()); |
| if (hostEth0MTU.isError()) { |
| return Error( |
| "Failed to get the MTU of " + eth0.get() + |
| ": " + hostEth0MTU.error()); |
| } |
| |
| // The host public interface should exist since we just checked it. |
| CHECK_SOME(hostEth0MTU); |
| |
| Try<bool> setHostLoMTU = link::setMTU(lo.get(), hostEth0MTU.get()); |
| if (setHostLoMTU.isError()) { |
| return Error( |
| "Failed to set the MTU of " + lo.get() + |
| ": " + setHostLoMTU.error()); |
| } |
| |
| // Prepare the ingress queueing disciplines on host public interface |
| // (eth0) and host loopback interface (lo). |
| Try<bool> createHostEth0IngressQdisc = ingress::create(eth0.get()); |
| if (createHostEth0IngressQdisc.isError()) { |
| return Error( |
| "Failed to create the ingress qdisc on " + eth0.get() + |
| ": " + createHostEth0IngressQdisc.error()); |
| } |
| |
| set<uint16_t> freeFlowIds; |
| Handle hostTxFqCodelHandle = HOST_TX_FQ_CODEL_HANDLE; |
| |
| if (flags.egress_unique_flow_per_container) { |
| Try<Handle> egressParentHandle = Handle::parse( |
| flags.egress_flow_classifier_parent); |
| |
| if (egressParentHandle.isError()) { |
| return Error( |
| "Failed to parse egress_flow_classifier_parent: " + |
| egressParentHandle.error()); |
| } |
| |
| if (egressParentHandle.get() != EGRESS_ROOT) { |
| // TODO(cwang): This is just a guess, we do not know if this |
| // handle is available or not. |
| hostTxFqCodelHandle = Handle(egressParentHandle->primary() + 1, 0); |
| } |
| |
| // Prepare a fq_codel queueing discipline on host public interface |
| // (eth0) for egress flow classification. |
| Try<bool> existHostEth0EgressFqCodel = |
| fq_codel::exists(eth0.get(), egressParentHandle.get()); |
| |
| if (existHostEth0EgressFqCodel.isError()) { |
| return Error( |
| "Failed to check egress qdisc existence on " + eth0.get() + |
| ": " + existHostEth0EgressFqCodel.error()); |
| } |
| |
| if (existHostEth0EgressFqCodel.get()) { |
| // TODO(cwang): We don't know if this existed fq_codel qdisc is |
| // created by ourselves or someone else. |
| LOG(INFO) << "Using an existed egress qdisc on " << eth0.get(); |
| } else { |
| // Either there is no qdisc at all, or there is some non-fq_codel |
| // qdisc at root. We try to create one to check which is true. |
| Try<bool> createHostEth0EgressQdisc = fq_codel::create( |
| eth0.get(), |
| egressParentHandle.get(), |
| hostTxFqCodelHandle); |
| |
| if (createHostEth0EgressQdisc.isError()) { |
| return Error( |
| "Failed to create the egress qdisc on " + eth0.get() + |
| ": " + createHostEth0EgressQdisc.error()); |
| } else if (!createHostEth0EgressQdisc.get()) { |
| // TODO(cwang): Maybe we can continue when some other egress |
| // qdisc exists because this is not a necessary qdisc for |
| // network isolation, but we don't want inconsistency, so we |
| // just fail in this case. See details in MESOS-2370. |
| return Error("Egress qdisc already exists on " + eth0.get()); |
| } |
| } |
| |
| // TODO(cwang): Make sure DEFAULT_FLOWS is large enough so that |
| // it's unlikely to run out of free flow IDs. |
| for (uint16_t i = CONTAINER_MIN_FLOWID; i < fq_codel::DEFAULT_FLOWS; i++) { |
| freeFlowIds.insert(i); |
| } |
| } |
| |
| Try<bool> createHostLoQdisc = ingress::create(lo.get()); |
| if (createHostLoQdisc.isError()) { |
| return Error( |
| "Failed to create the ingress qdisc on " + lo.get() + |
| ": " + createHostLoQdisc.error()); |
| } |
| |
| // Enable 'route_localnet' on host loopback interface (lo). This |
| // enables the use of 127.0.0.1/8 for local routing purpose. This |
| // feature only exists on kernel 3.6 or newer. |
| const string loRouteLocalnet = |
| path::join("/proc/sys/net/ipv4/conf", lo.get(), "route_localnet"); |
| |
| if (!os::exists(loRouteLocalnet)) { |
| // TODO(jieyu): Consider supporting running the isolator if this |
| // feature is not available. We need to conditionally disable |
| // routing for 127.0.0.1/8, and ask the tasks to use the public IP |
| // for container to container and container to host communication. |
| return Error("The kernel does not support 'route_localnet'"); |
| } |
| |
| Try<Nothing> write = os::write(loRouteLocalnet, "1"); |
| if (write.isError()) { |
| return Error( |
| "Failed to enable route_localnet for " + lo.get() + |
| ": " + write.error()); |
| } |
| |
| // We disable 'rp_filter' and 'send_redirects' for host loopback |
| // interface (lo) to work around a kernel bug, which was only |
| // recently addressed in upstream in the following 3 commits. |
| // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/: |
| // 6a662719c9868b3d6c7d26b3a085f0cd3cc15e64 |
| // 0d5edc68739f1c1e0519acbea1d3f0c1882a15d7 |
| // e374c618b1465f0292047a9f4c244bd71ab5f1f0 |
| // The workaround ensures packets don't get dropped at lo. |
| write = os::write("/proc/sys/net/ipv4/conf/all/rp_filter", "0"); |
| if (write.isError()) { |
| return Error( |
| "Failed to disable rp_filter for all: " + write.error()); |
| } |
| |
| write = os::write(path::join( |
| "/proc/sys/net/ipv4/conf", lo.get(), "rp_filter"), "0"); |
| if (write.isError()) { |
| return Error( |
| "Failed to disable rp_filter for " + lo.get() + |
| ": " + write.error()); |
| } |
| |
| write = os::write("/proc/sys/net/ipv4/conf/all/send_redirects", "0"); |
| if (write.isError()) { |
| return Error( |
| "Failed to disable send_redirects for all: " + write.error()); |
| } |
| |
| write = os::write(path::join( |
| "/proc/sys/net/ipv4/conf", lo.get(), "send_redirects"), "0"); |
| if (write.isError()) { |
| return Error( |
| "Failed to disable send_redirects for " + lo.get() + |
| ": " + write.error()); |
| } |
| |
| // We need to enable accept_local on host loopback interface (lo) |
| // for kernels older than 3.6. Refer to the following: |
| // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/: |
| // 7a9bc9b81a5bc6e44ebc80ef781332e4385083f2 |
| // https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt |
| write = os::write(path::join( |
| "/proc/sys/net/ipv4/conf", lo.get(), "accept_local"), "1"); |
| if (write.isError()) { |
| return Error( |
| "Failed to enable accept_local for " + lo.get() + |
| ": " + write.error()); |
| } |
| |
| // Reading host network configurations. Each container will match |
| // these configurations. |
| hashset<string> procs; |
| |
| // TODO(jieyu): The following is a partial list of all the |
| // configurations. In the future, we may want to expose these |
| // configurations using ContainerInfo. |
| |
| // The kernel will use a default value for the following |
| // configurations inside a container. Therefore, we need to set them |
| // in the container to match that on the host. |
| procs.insert("/proc/sys/net/core/somaxconn"); |
| |
| // As of kernel 3.10, the following configurations are shared |
| // between host and containers, and therefore are not required to be |
| // set in containers. We keep them here just in case the kernel |
| // changes in the future. |
| procs.insert("/proc/sys/net/core/netdev_max_backlog"); |
| procs.insert("/proc/sys/net/core/rmem_max"); |
| procs.insert("/proc/sys/net/core/wmem_max"); |
| procs.insert("/proc/sys/net/ipv4/tcp_keepalive_time"); |
| procs.insert("/proc/sys/net/ipv4/tcp_keepalive_intvl"); |
| procs.insert("/proc/sys/net/ipv4/tcp_keepalive_probes"); |
| procs.insert("/proc/sys/net/ipv4/tcp_max_syn_backlog"); |
| procs.insert("/proc/sys/net/ipv4/tcp_rmem"); |
| procs.insert("/proc/sys/net/ipv4/tcp_retries2"); |
| procs.insert("/proc/sys/net/ipv4/tcp_synack_retries"); |
| procs.insert("/proc/sys/net/ipv4/tcp_wmem"); |
| procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh1"); |
| procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh2"); |
| procs.insert("/proc/sys/net/ipv4/neigh/default/gc_thresh3"); |
| |
| hashmap<string, string> hostNetworkConfigurations; |
| foreach (const string& proc, procs) { |
| Try<string> value = os::read(proc); |
| if (value.isSome()) { |
| LOG(INFO) << proc << " = '" << strings::trim(value.get()) << "'"; |
| hostNetworkConfigurations[proc] = strings::trim(value.get()); |
| } |
| } |
| |
| // Self bind mount PORT_MAPPING_BIND_MOUNT_ROOT(). Since we use a |
| // new mount namespace for each container, for this mount point, we |
| // set '--make-rshared' on the host and set '--make-rslave' inside |
| // each container. This is important because when we unmount the |
| // network namespace handles on the host, those handles will be |
| // unmounted in the containers as well, but NOT vice versa. |
| const string portMappingBindMountRoot = PORT_MAPPING_BIND_MOUNT_ROOT(); |
| |
| // We create the bind mount directory if it does not exist. |
| Try<Nothing> mkdir = os::mkdir(portMappingBindMountRoot); |
| if (mkdir.isError()) { |
| return Error( |
| "Failed to create the bind mount root directory at '" + |
| portMappingBindMountRoot + "': " + mkdir.error()); |
| } |
| |
| // We need to get the realpath for the bind mount root since, on some |
| // Linux distributions, the bind mount root (i.e., /var/run/netns) |
| // might contain a symlink. |
| Result<string> bindMountRoot = os::realpath(portMappingBindMountRoot); |
| if (!bindMountRoot.isSome()) { |
| return Error( |
| "Failed to get realpath for bind mount root '" + |
| PORT_MAPPING_BIND_MOUNT_ROOT() + "': " + |
| (bindMountRoot.isError() ? bindMountRoot.error() : "Not found")); |
| } |
| |
| // Now, check '/proc/self/mounts' to see if the bind mount root has |
| // already been self mounted. |
| Try<fs::MountInfoTable> mountTable = fs::MountInfoTable::read(); |
| if (mountTable.isError()) { |
| return Error("Failed to read the mount table: " + mountTable.error()); |
| } |
| |
| Option<fs::MountInfoTable::Entry> bindMountEntry; |
| foreach (const fs::MountInfoTable::Entry& entry, mountTable->entries) { |
| if (entry.target == bindMountRoot.get()) { |
| bindMountEntry = entry; |
| } |
| } |
| |
| // Do a self bind mount if needed. If the mount already exists, make |
| // sure it is a shared mount of its own peer group. |
| if (bindMountEntry.isNone()) { |
| // NOTE: Instead of using fs::mount to perform the bind mount, we |
| // use the shell command here because the syscall 'mount' does not |
| // update the mount table (i.e., /etc/mtab). In other words, the |
| // mount will not be visible if the operator types command |
| // 'mount'. Since this mount will still be presented after all |
| // containers and the slave are stopped, it's better to make it |
| // visible. It's OK to use the blocking os::shell here because |
| // 'create' will only be invoked during initialization. |
| Try<string> mount = os::shell( |
| "mount --bind %s %s && " |
| "mount --make-slave %s && " |
| "mount --make-shared %s", |
| bindMountRoot->c_str(), |
| bindMountRoot->c_str(), |
| bindMountRoot->c_str(), |
| bindMountRoot->c_str()); |
| |
| if (mount.isError()) { |
| return Error( |
| "Failed to self bind mount '" + bindMountRoot.get() + |
| "' and make it a shared mount: " + mount.error()); |
| } |
| } else { |
| if (bindMountEntry->shared().isNone()) { |
| // This is the case where the bind mount root is not a shared |
| // mount yet (possibly due to a slave crash while preparing the |
| // bind mount root). It's safe to re-do the following. |
| Try<string> mount = os::shell( |
| "mount --make-slave %s && " |
| "mount --make-shared %s", |
| bindMountRoot->c_str(), |
| bindMountRoot->c_str()); |
| |
| if (mount.isError()) { |
| return Error( |
| "Failed to self bind mount '" + bindMountRoot.get() + |
| "' and make it a shared mount: " + mount.error()); |
| } |
| } else { |
| // We need to make sure that the shared mount is in its own peer |
| // group. To check that, we need to get the parent mount. |
| foreach (const fs::MountInfoTable::Entry& entry, mountTable->entries) { |
| if (entry.id == bindMountEntry->parent) { |
| // If the bind mount root and its parent mount are in the |
| // same peer group, we need to re-do the following commands |
| // so that they are in different peer groups. |
| if (entry.shared() == bindMountEntry->shared()) { |
| Try<string> mount = os::shell( |
| "mount --make-slave %s && " |
| "mount --make-shared %s", |
| bindMountRoot->c_str(), |
| bindMountRoot->c_str()); |
| |
| if (mount.isError()) { |
| return Error( |
| "Failed to self bind mount '" + bindMountRoot.get() + |
| "' and make it a shared mount: " + mount.error()); |
| } |
| } |
| |
| break; |
| } |
| } |
| } |
| } |
| |
| // Create the network namespace handle symlink directory if it does |
| // not exist. It is used to host network namespace handle symlinks |
| // whose basenames are container IDs. This allows us to recover |
| // container IDs for orphan containers (i.e., not known by the |
| // slave). This is introduced in 0.23.0. |
| mkdir = os::mkdir(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()); |
| if (mkdir.isError()) { |
| return Error( |
| "Failed to create the bind mount root directory at " + |
| PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + ": " + mkdir.error()); |
| } |
| |
| return new MesosIsolator(Owned<MesosIsolatorProcess>( |
| new PortMappingIsolatorProcess( |
| flags, |
| bindMountRoot.get(), |
| eth0.get(), |
| lo.get(), |
| hostMAC.get(), |
| hostIPNetwork.get(), |
| hostEth0MTU.get(), |
| hostDefaultGateway.get(), |
| hostTxFqCodelHandle, |
| hostNetworkConfigurations, |
| egressRateLimitPerContainer, |
| nonEphemeralPorts, |
| ephemeralPortsAllocator, |
| freeFlowIds))); |
| } |
| |
| |
| Future<Nothing> PortMappingIsolatorProcess::recover( |
| const vector<ContainerState>& states, |
| const hashset<ContainerID>& orphans) |
| { |
| // Extract pids from virtual device names (veth). This tells us |
| // about all the potential live containers on this slave. |
| Try<set<string>> links = net::links(); |
| if (links.isError()) { |
| return Failure("Failed to get all the links: " + links.error()); |
| } |
| |
| hashset<pid_t> pids; |
| foreach (const string& name, links.get()) { |
| Option<pid_t> pid = getPidFromVeth(name); |
| // Not all links follow the naming: mesos{pid}, so we simply |
| // continue, e.g., eth0. |
| if (pid.isNone()) { |
| continue; |
| } else if (pids.contains(pid.get())) { |
| return Failure("Two virtual devices have the same name '" + name + "'"); |
| } |
| |
| pids.insert(pid.get()); |
| } |
| |
| // Scan the bind mount root to cleanup all stale network namespace |
| // handles that do not have an active veth associated with. |
| Try<list<string>> entries = os::ls(bindMountRoot); |
| if (entries.isError()) { |
| return Failure( |
| "Failed to list bind mount root '" + bindMountRoot + |
| "': " + entries.error()); |
| } |
| |
| foreach (const string& entry, entries.get()) { |
| const string path = path::join(bindMountRoot, entry); |
| |
| // NOTE: We expect all regular files whose names are numbers under |
| // the bind mount root are network namespace handles. |
| Result<pid_t> pid = getPidFromNamespaceHandle(path); |
| if (pid.isError()) { |
| return Failure( |
| "Failed to get pid from network namespace handle '" + |
| path + "': " + pid.error()); |
| } else if (pid.isNone()) { |
| // We ignore files that are clearly not network namespace |
| // handles created by us. It's likely that those are created by |
| // users or other tools. |
| LOG(WARNING) << "Unrecognized network namespace handle '" << path << "'"; |
| continue; |
| } |
| |
| // We cleanup the network namespace handle if the associated |
| // containers have clearly exited (i.e., the veth has gone). The |
| // cleanup here is best effort. |
| if (!pids.contains(pid.get())) { |
| LOG(INFO) << "Removing stale network namespace handle '" << path << "'"; |
| |
| Try<Nothing> unmount = fs::unmount(path, MNT_DETACH); |
| if (unmount.isError()) { |
| LOG(WARNING) << "Failed to unmount stale network namespace handle '" |
| << path << "': " << unmount.error(); |
| } |
| |
| Try<Nothing> rm = os::rm(path); |
| if (rm.isError()) { |
| LOG(WARNING) << "Failed to remove stale network namespace handle '" |
| << path << "': " << rm.error(); |
| } |
| } |
| } |
| |
| // Scan the bind mount symlink root for container IDs. This allows us |
| // to recover container IDs for orphan containers (i.e., not known |
| // by the slave). This is introduced in 0.23.0. |
| entries = os::ls(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT()); |
| if (entries.isError()) { |
| return Failure( |
| "Failed to list bind mount symlink root '" + |
| PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() + |
| "': " + entries.error()); |
| } |
| |
| // This map stores the mapping between pids and container IDs |
| // recovered from the bind mount root that have valid veth links. We |
| // use a multihashmap here because multiple container IDs can map to |
| // the same pid if the removal of a symlink fails in '_cleanup()' |
| // and the pid is reused by a new container. |
| multihashmap<pid_t, ContainerID> linkers; |
| |
| foreach (const string& entry, entries.get()) { |
| const string path = |
| path::join(PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT(), entry); |
| |
| // We only create symlinks in this directory and assume |
| // non-symlink files are created by other users or tools, |
| // therefore will be ignored. |
| if (!os::stat::islink(path)) { |
| LOG(WARNING) << "Ignored non-symlink file '" << path |
| << "' under bind mount symlink root '" |
| << PORT_MAPPING_BIND_MOUNT_SYMLINK_ROOT() << "'"; |
| continue; |
| } |
| |
| // NOTE: We expect all symlinks under the bind mount symlink root |
| // to be container ID symlinks. |
| |
| Try<ContainerID> containerId = getContainerIdFromSymlink(path); |
| if (containerId.isError()) { |
| return Failure( |
| "Failed to get container ID from network namespace handle symlink '" + |
| path + "': " + containerId.error()); |
| } |
| |
| Result<pid_t> pid = getPidFromSymlink(path); |
| if (pid.isError()) { |
| return Failure( |
| "Failed to get pid from network namespace handle symlink '" + path + |
| "': " + pid.error()); |
| } |
| |
| // We remove the symlink if it's dangling or the associated |
| // containers have clearly exited (i.e., the veth has gone). The |
| // cleanup here is best effort. |
| if (pid.isNone() || !pids.contains(pid.get())) { |
| LOG(INFO) << "Removing stale network namespace handle symlink '" |
| << path << "'"; |
| |
| Try<Nothing> rm = os::rm(path); |
| if (rm.isError()) { |
| LOG(WARNING) << "Failed to remove stale network namespace handle " |
| << " symlink '" << path << "': " << rm.error(); |
| } |
| } else { |
| LOG(INFO) << "Discovered network namespace handle symlink " |
| << containerId.get() << " -> " << pid.get(); |
| |
| linkers.put(pid.get(), containerId.get()); |
| } |
| } |
| |
| // If multiple container IDs point to the same pid, we remove both |
| // symlinks for safety (as if we cannot derive the container ID for |
| // orphans, which is OK because it'll be treated the same as those |
| // containers that are created by older (pre 0.23.0) versions). Note |
| // that it's possible that multiple container IDs map to the same |
| // pid if the removal of a symlink fails in '_cleanup()' and the pid |
| // is reused by a new container. |
| foreach (pid_t pid, linkers.keys()) { |
| list<ContainerID> containerIds = linkers.get(pid); |
| if (containerIds.size() > 1) { |
| foreach (const ContainerID& containerId, containerIds) { |
| const string linker = getSymlinkPath(containerId); |
| |
| LOG(WARNING) << "Removing duplicated network namespace handle symlink '" |
| << linker << "'"; |
| |
| Try<Nothing> rm = os::rm(linker); |
| if (rm.isError()) { |
| LOG(WARNING) << "Failed to remove duplicated network namespace " |
| << "handle symlink '" << linker << "': " << rm.error(); |
| } |
| } |
| |
| linkers.remove(pid); |
| } |
| } |
| |
| // Now, actually recover the isolator from slave's state. |
| foreach (const ContainerState& state, states) { |
| const ContainerID& containerId = state.container_id(); |
| pid_t pid = state.pid(); |
| |
| VLOG(1) << "Recovering network isolator for container " |
| << containerId << " with pid " << pid; |
| |
| if (!pids.contains(pid)) { |
| // There are two possible cases here: |
| // |
| // 1) The container was launched by the slave with network |
| // isolation disabled, so the pid could not be found in the |
| // device names in the system. |
| // |
| // 2) The container was launched by the slave with network |
| // isolation enabled, but veth is removed (because the |
| // corresponding container is destroyed), but the slave |
| // restarts before it is able to write the sentinel file. |
| // |
| // In both cases, we treat the container as unmanaged. For case |
| // (2), it's safe to do so because the container has already |
| // been destroyed. |
| VLOG(1) << "Skipped recovery for container " << containerId |
| << "with pid " << pid << " as either it was not managed by " |
| << "the network isolator or it has already been destroyed"; |
| |
| unmanaged.insert(containerId); |
| continue; |
| } |
| |
| Try<Info*> recover = _recover(pid); |
| if (recover.isError()) { |
| foreachvalue (Info* info, infos) { |
| delete info; |
| } |
| |
| return Failure( |
| "Failed to recover container " + stringify(containerId) + |
| " with pid " + stringify(pid) + ": " + recover.error()); |
| } |
| |
| infos[containerId] = recover.get(); |
| |
| // Remove the successfully recovered pid. |
| pids.erase(pid); |
| } |
| |
| // Recover orphans. Known orphans will be destroyed by containerizer |
| // using the normal cleanup path (refer to MESOS-2367 for details). |
| // Unknown orphans will be cleaned up immediately. The recovery will |
| // fail if there is some unknown orphan that cannot be cleaned up. |
| vector<Info*> unknownOrphans; |
| |
| foreach (pid_t pid, pids) { |
| Try<Info*> recover = _recover(pid); |
| if (recover.isError()) { |
| foreachvalue (Info* info, infos) { |
| delete info; |
| } |
| foreach (Info* info, unknownOrphans) { |
| delete info; |
| } |
| |
| return Failure( |
| "Failed to recover orphaned container with pid " + |
| stringify(pid) + ": " + recover.error()); |
| } |
| |
| if (linkers.get(pid).size() == 1) { |
| const ContainerID containerId = linkers.get(pid).front(); |
| CHECK(!infos.contains(containerId)); |
| |
| if (orphans.contains(containerId)) { |
| infos[containerId] = recover.get(); |
| continue; |
| } |
| } |
| |
| unknownOrphans.push_back(recover.get()); |
| } |
| |
| foreach (Info* info, unknownOrphans) { |
| CHECK_SOME(info->pid); |
| pid_t pid = info->pid.get(); |
| |
| Option<ContainerID> containerId; |
| if (linkers.get(pid).size() == 1) { |
| containerId = linkers.get(pid).front(); |
| } |
| |
| // NOTE: If 'infos' is empty (means there is no regular container |
| // or known orphan), the '_cleanup' below will remove the ICMP and |
| // ARP packet filters on host eth0. This will cause subsequent |
| // calls to '_cleanup' for unknown orphans to fail. However, this |
| // is OK because when slave restarts and tries to recover again, |
| // it'll try to remove the remaining unknown orphans. |
| // TODO(jieyu): Consider call '_cleanup' for all the unknown |
| // orphans before returning even if error occurs. |
| Try<Nothing> cleanup = _cleanup(info, containerId); |
| if (cleanup.isError()) { |
| foreachvalue (Info* info, infos) { |
| delete info; |
| } |
| |
| // TODO(jieyu): Also delete 'info' in unknownOrphans. Notice |
| // that some 'info' in unknownOrphans might have already been |
| // deleted in '_cleanup' above. |
| |
| return Failure( |
| "Failed to cleanup orphaned container with pid " + |
| stringify(pid) + ": " + cleanup.error()); |
| } |
| } |
| |
| // TODO(cwang): Consider removing unrecognized flow classifiers from |
| // host eth0 egress. |
| |
| LOG(INFO) << "Network isolator recovery complete"; |
| |
| return Nothing(); |
| } |
| |
| |
| Try<PortMappingIsolatorProcess::Info*> |
| PortMappingIsolatorProcess::_recover(pid_t pid) |
| { |
| // Get all the IP filters on veth. |
| // NOTE: We only look at veth devices to recover port ranges |
| // assigned to each container. That's the reason why we need to make |
| // sure that we add filters to veth before adding filters to host |
| // eth0 and host lo. Also, we need to make sure we remove filters |
| // from host eth0 and host lo before removing filters from veth. |
| Result<vector<ip::Classifier>> vethIngressClassifiers = |
| ip::classifiers(veth(pid), ingress::HANDLE); |
| |
| if (vethIngressClassifiers.isError()) { |
| return Error( |
| "Failed to get all the IP filters on " + veth(pid) + |
| ": " + vethIngressClassifiers.error()); |
| } else if (vethIngressClassifiers.isNone()) { |
| return Error( |
| "Failed to get all the IP filters on " + veth(pid) + |
| ": link does not exist"); |
| } |
| |
| hashmap<PortRange, uint16_t> flowIds; |
| |
| if (flags.egress_unique_flow_per_container) { |
| // Get all egress IP flow classifiers on eth0. |
| Result<vector<filter::Filter<ip::Classifier>>> eth0EgressFilters = |
| ip::filters(eth0, hostTxFqCodelHandle); |
| |
| if (eth0EgressFilters.isError()) { |
| return Error( |
| "Failed to get all the IP flow classifiers on " + eth0 + |
| ": " + eth0EgressFilters.error()); |
| } else if (eth0EgressFilters.isNone()) { |
| return Error( |
| "Failed to get all the IP flow classifiers on " + eth0 + |
| ": link does not exist"); |
| } |
| |
| // Construct a port range to flow ID mapping from host eth0 |
| // egress. This map will be used later. |
| foreach (const filter::Filter<ip::Classifier>& filter, |
| eth0EgressFilters.get()) { |
| const Option<PortRange> sourcePorts = filter.classifier.sourcePorts; |
| const Option<Handle> classid = filter.classid; |
| |
| if (sourcePorts.isNone()) { |
| return Error("Missing source ports for filters on egress of " + eth0); |
| } |
| |
| if (classid.isNone()) { |
| return Error("Missing classid for filters on egress of " + eth0); |
| } |
| |
| if (flowIds.contains(sourcePorts.get())) { |
| return Error( |
| "Duplicated port range " + stringify(sourcePorts.get()) + |
| " detected on egress of " + eth0); |
| } |
| |
| flowIds[sourcePorts.get()] = classid->secondary(); |
| } |
| } |
| |
| IntervalSet<uint16_t> nonEphemeralPorts; |
| IntervalSet<uint16_t> ephemeralPorts; |
| Option<uint16_t> flowId; |
| |
| foreach (const ip::Classifier& classifier, vethIngressClassifiers.get()) { |
| const Option<PortRange> sourcePorts = classifier.sourcePorts; |
| const Option<PortRange> destinationPorts = classifier.destinationPorts; |
| |
| // All the IP filters on veth used by us only have source ports. |
| if (sourcePorts.isNone() || destinationPorts.isSome()) { |
| return Error("Unexpected IP filter detected on " + veth(pid)); |
| } |
| |
| if (flowIds.contains(sourcePorts.get())) { |
| if (flowId.isNone()) { |
| flowId = flowIds.get(sourcePorts.get()); |
| } else if (flowId != flowIds.get(sourcePorts.get())) { |
| return Error( |
| "A container is associated with multiple flows " |
| "on egress of " + eth0); |
| } |
| } else if (flowId.isSome()) { |
| // This is the case where some port range of a container is |
| // assigned to a flow while some isn't. This could happen if |
| // slave crashes while those filters are created. However, this |
| // is OK for us because packets by default go to the host flow. |
| LOG(WARNING) << "Container port range " << sourcePorts.get() |
| << " does not have flow id " << flowId.get() |
| << " assigned"; |
| } |
| |
| Interval<uint16_t> ports = |
| (Bound<uint16_t>::closed(sourcePorts->begin()), |
| Bound<uint16_t>::closed(sourcePorts->end())); |
| |
| if (managedNonEphemeralPorts.contains(ports)) { |
| nonEphemeralPorts += ports; |
| } else if (ephemeralPortsAllocator->isManaged(ports)) { |
| // We have duplicate here because we have two IP filters with |
| // the same ephemeral port range (one for eth0 and one for lo). |
| // But we should never have two intersecting port ranges. |
| if (!ephemeralPorts.contains(ports) && ephemeralPorts.intersects(ports)) { |
| return Error("Unexpected intersected ephemeral port ranges"); |
| } |
| |
| ephemeralPorts += ports; |
| } else { |
| return Error("Unexpected IP filter detected on " + veth(pid)); |
| } |
| } |
| |
| Info* info = nullptr; |
| |
| if (ephemeralPorts.empty()) { |
| // NOTE: This is possible because the slave may crash while |
| // calling 'isolate()', leaving a partially isolated container. To |
| // clean up this partially isolated container, we still create an |
| // Info struct here and let the 'cleanup' function clean it up |
| // later. |
| LOG(WARNING) << "No ephemeral ports found for container with pid " |
| << stringify(pid) << ". This could happen if agent crashes " |
| << "while isolating a container"; |
| |
| info = new Info(nonEphemeralPorts, Interval<uint16_t>(), pid); |
| } else { |
| if (ephemeralPorts.intervalCount() != 1) { |
| return Error("Each container should have only one ephemeral port range"); |
| } |
| |
| // Tell the allocator that this ephemeral port range is used. |
| ephemeralPortsAllocator->allocate(*ephemeralPorts.begin()); |
| |
| info = new Info(nonEphemeralPorts, *ephemeralPorts.begin(), pid); |
| |
| VLOG(1) << "Recovered network isolator for container with pid " << pid |
| << " non-ephemeral port ranges " << nonEphemeralPorts |
| << " and ephemeral port range " << *ephemeralPorts.begin(); |
| } |
| |
| if (flowId.isSome()) { |
| freeFlowIds.erase(flowId.get()); |
| info->flowId = flowId.get(); |
| } |
| |
| return CHECK_NOTNULL(info); |
| } |
| |
| |
| Future<Option<ContainerLaunchInfo>> PortMappingIsolatorProcess::prepare( |
| const ContainerID& containerId, |
| const ContainerConfig& containerConfig) |
| { |
| if (unmanaged.contains(containerId)) { |
| return Failure("Asked to prepare an unmanaged container"); |
| } |
| |
| if (infos.contains(containerId)) { |
| return Failure("Container has already been prepared"); |
| } |
| |
| const ExecutorInfo& executorInfo = containerConfig.executor_info(); |
| const Resources resources(containerConfig.resources()); |
| |
| IntervalSet<uint16_t> nonEphemeralPorts; |
| |
| if (resources.ports().isSome()) { |
| nonEphemeralPorts = rangesToIntervalSet<uint16_t>( |
| resources.ports().get()).get(); |
| |
| // Sanity check to make sure that the assigned non-ephemeral ports |
| // for the container are part of the non-ephemeral ports specified |
| // by the slave. |
| if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) { |
| return Failure( |
| "Some non-ephemeral ports specified in " + |
| stringify(nonEphemeralPorts) + |
| " are not managed by the agent"); |
| } |
| } |
| |
| // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports' |
| // specified in the executor info. However, this behavior needs to |
| // be changed once the master can make default allocations for |
| // ephemeral ports. |
| if (resources.ephemeral_ports().isSome()) { |
| LOG(WARNING) << "Ignoring the specified ephemeral_ports '" |
| << resources.ephemeral_ports().get() |
| << "' for container " << containerId |
| << " of executor '" << executorInfo.executor_id() << "'"; |
| } |
| |
| // Allocate the ephemeral ports used by this container. |
| Try<Interval<uint16_t>> ephemeralPorts = ephemeralPortsAllocator->allocate(); |
| if (ephemeralPorts.isError()) { |
| return Failure( |
| "Failed to allocate ephemeral ports: " + ephemeralPorts.error()); |
| } |
| |
| infos[containerId] = new Info(nonEphemeralPorts, ephemeralPorts.get()); |
| |
| LOG(INFO) << "Using non-ephemeral ports " << nonEphemeralPorts |
| << " and ephemeral ports " << ephemeralPorts.get() |
| << " for container " << containerId << " of executor '" |
| << executorInfo.executor_id() << "'"; |
| |
| ContainerLaunchInfo launchInfo; |
| launchInfo.add_pre_exec_commands()->set_value(scripts(infos[containerId])); |
| |
| // NOTE: the port mapping isolator itself doesn't require mount |
| // namespace. However, if mount namespace is enabled because of |
| // other isolators, we need to set mount sharing accordingly for |
| // PORT_MAPPING_BIND_MOUNT_ROOT to avoid races described in |
| // MESOS-1558. So we turn on mount namespace here for consistency. |
| launchInfo.add_clone_namespaces(CLONE_NEWNET); |
| launchInfo.add_clone_namespaces(CLONE_NEWNS); |
| |
| return launchInfo; |
| } |
| |
| |
| Future<Nothing> PortMappingIsolatorProcess::isolate( |
| const ContainerID& containerId, |
| pid_t pid) |
| { |
| if (unmanaged.contains(containerId)) { |
| return Failure("Asked to isolate an unmanaged container"); |
| } |
| |
| if (!infos.contains(containerId)) { |
| return Failure("Unknown container"); |
| } |
| |
| Info* info = CHECK_NOTNULL(infos[containerId]); |
| |
| if (info->pid.isSome()) { |
| return Failure("The container has already been isolated"); |
| } |
| |
| info->pid = pid; |
| |
| if (flags.egress_unique_flow_per_container) { |
| info->flowId = getNextFlowId(); |
| } |
| |
| // Bind mount the network namespace handle of the process 'pid' to a |
| // directory to hold an extra reference to the network namespace |
| // which will be released in 'cleanup'. By holding the extra |
| // reference, the network namespace will not be destroyed even if |
| // the process 'pid' is gone, which allows us to explicitly control |
| // the network namespace life cycle. |
| const string source = path::join("/proc", stringify(pid), "ns", "net"); |
| const string target = getNamespaceHandlePath(bindMountRoot, pid); |
| |
| Try<Nothing> touch = os::touch(target); |
| if (touch.isError()) { |
| return Failure("Failed to create the bind mount point: " + touch.error()); |
| } |
| |
| Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, nullptr); |
| if (mount.isError()) { |
| return Failure( |
| "Failed to mount the network namespace handle from '" + |
| source + "' to '" + target + "': " + mount.error()); |
| } |
| |
| LOG(INFO) << "Bind mounted '" << source << "' to '" << target |
| << "' for container " << containerId; |
| |
| // Since 0.23.0, we create a symlink to the network namespace handle |
| // using the container ID. This serves two purposes. First, it |
| // allows us to recover the container ID later when slave restarts |
| // even if slave's checkpointed meta data is deleted. Second, it |
| // makes the debugging easier. See MESOS-2528 for details. |
| const string linker = getSymlinkPath(containerId); |
| Try<Nothing> symlink = ::fs::symlink(target, linker); |
| if (symlink.isError()) { |
| return Failure( |
| "Failed to symlink the network namespace handle '" + |
| linker + "' -> '" + target + "': " + symlink.error()); |
| } |
| |
| LOG(INFO) << "Created network namespace handle symlink '" |
| << linker << "' -> '" << target << "'"; |
| |
| // Create a virtual ethernet pair for this container. |
| Try<bool> createVethPair = link::veth::create(veth(pid), eth0, pid); |
| if (createVethPair.isError()) { |
| return Failure( |
| "Failed to create virtual ethernet pair: " + |
| createVethPair.error()); |
| } |
| |
| // We cannot reuse the existing veth pair, because one of them is |
| // still inside another container. |
| if (!createVethPair.get()) { |
| return Failure( |
| "Virtual ethernet pair " + veth(pid) + " already exists"); |
| } |
| |
| // Disable IPv6 for veth as IPv6 packets won't be forwarded anyway. |
| const string disableIPv6 = |
| path::join("/proc/sys/net/ipv6/conf", veth(pid), "disable_ipv6"); |
| |
| if (os::exists(disableIPv6)) { |
| Try<Nothing> write = os::write(disableIPv6, "1"); |
| if (write.isError()) { |
| return Failure( |
| "Failed to disable IPv6 for " + veth(pid) + |
| ": " + write.error()); |
| } |
| } |
| |
| // Sets the MAC address of veth to match the MAC address of the host |
| // public interface (eth0). |
| Try<bool> setVethMAC = link::setMAC(veth(pid), hostMAC); |
| if (setVethMAC.isError()) { |
| return Failure( |
| "Failed to set the MAC address of " + veth(pid) + |
| ": " + setVethMAC.error()); |
| } |
| |
| // Prepare the ingress queueing disciplines on veth. |
| Try<bool> createQdisc = ingress::create(veth(pid)); |
| if (createQdisc.isError()) { |
| return Failure( |
| "Failed to create the ingress qdisc on " + veth(pid) + |
| ": " + createQdisc.error()); |
| } |
| |
| // Veth device should exist since we just created it. |
| CHECK(createQdisc.get()); |
| |
| // For each port range, add a set of IP packet filters to properly |
| // redirect IP traffic to/from containers. |
| foreach (const PortRange& range, |
| getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) { |
| if (info->flowId.isSome()) { |
| LOG(INFO) << "Adding IP packet filters with ports " << range |
| << " with flow ID " << info->flowId.get() |
| << " for container " << containerId; |
| } else { |
| LOG(INFO) << "Adding IP packet filters with ports " << range |
| << " for container " << containerId; |
| } |
| |
| Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid)); |
| if (add.isError()) { |
| return Failure( |
| "Failed to add IP packet filter with ports " + |
| stringify(range) + " for container with pid " + |
| stringify(pid) + ": " + add.error()); |
| } |
| } |
| |
| // Relay ICMP packets from veth of the container to host eth0. |
| Try<bool> icmpVethToEth0 = filter::icmp::create( |
| veth(pid), |
| ingress::HANDLE, |
| icmp::Classifier(None()), |
| Priority(ICMP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(eth0)); |
| |
| if (icmpVethToEth0.isError()) { |
| ++metrics.adding_veth_icmp_filters_errors; |
| |
| return Failure( |
| "Failed to create an ICMP packet filter from " + veth(pid) + |
| " to host " + eth0 + ": " + icmpVethToEth0.error()); |
| } else if (!icmpVethToEth0.get()) { |
| ++metrics.adding_veth_icmp_filters_already_exist; |
| |
| return Failure( |
| "The ICMP packet filter from " + veth(pid) + |
| " to host " + eth0 + " already exists"); |
| } |
| |
| // Relay ARP packets from veth of the container to host eth0. |
| Try<bool> arpVethToEth0 = filter::basic::create( |
| veth(pid), |
| ingress::HANDLE, |
| ETH_P_ARP, |
| Priority(ARP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(eth0)); |
| |
| if (arpVethToEth0.isError()) { |
| ++metrics.adding_veth_arp_filters_errors; |
| |
| return Failure( |
| "Failed to create an ARP packet filter from " + veth(pid) + |
| " to host " + eth0 + ": " + arpVethToEth0.error()); |
| } else if (!arpVethToEth0.get()) { |
| ++metrics.adding_veth_arp_filters_already_exist; |
| |
| return Failure( |
| "The ARP packet filter from " + veth(pid) + |
| " to host " + eth0 + " already exists"); |
| } |
| |
| // Setup filters for ICMP and ARP packets. We mirror ICMP and ARP |
| // packets from host eth0 to veths of all the containers. We also |
| // setup flow classifiers for host eth0 egress. |
| set<string> targets; |
| foreachvalue (Info* info, infos) { |
| if (info->pid.isSome()) { |
| targets.insert(veth(info->pid.get())); |
| } |
| } |
| |
| if (targets.size() == 1) { |
| // We just create the first container in which case we should |
| // create filters for ICMP and ARP packets. |
| |
| // Create a new ICMP filter on host eth0 ingress for mirroring |
| // packets from host eth0 to veth. |
| Try<bool> icmpEth0ToVeth = filter::icmp::create( |
| eth0, |
| ingress::HANDLE, |
| icmp::Classifier(hostIPNetwork.address()), |
| Priority(ICMP_FILTER_PRIORITY, NORMAL), |
| action::Mirror(targets)); |
| |
| if (icmpEth0ToVeth.isError()) { |
| ++metrics.adding_eth0_icmp_filters_errors; |
| |
| return Failure( |
| "Failed to create an ICMP packet filter from host " + eth0 + |
| " to " + veth(pid) + ": " + icmpEth0ToVeth.error()); |
| } else if (!icmpEth0ToVeth.get()) { |
| ++metrics.adding_eth0_icmp_filters_already_exist; |
| |
| return Failure( |
| "The ICMP packet filter on host " + eth0 + " already exists"); |
| } |
| |
| // Create a new ARP filter on host eth0 ingress for mirroring |
| // packets from host eth0 to veth. |
| Try<bool> arpEth0ToVeth = filter::basic::create( |
| eth0, |
| ingress::HANDLE, |
| ETH_P_ARP, |
| Priority(ARP_FILTER_PRIORITY, NORMAL), |
| action::Mirror(targets)); |
| |
| if (arpEth0ToVeth.isError()) { |
| ++metrics.adding_eth0_arp_filters_errors; |
| |
| return Failure( |
| "Failed to create an ARP packet filter from host " + eth0 + |
| " to " + veth(pid) + ": " + arpEth0ToVeth.error()); |
| } else if (!arpEth0ToVeth.get()) { |
| ++metrics.adding_eth0_arp_filters_already_exist; |
| |
| return Failure( |
| "The ARP packet filter on host " + eth0 + " already exists"); |
| } |
| } else { |
| // This is not the first container in which case we should update |
| // filters for ICMP and ARP packets. |
| |
| // Update the ICMP filter on host eth0 ingress. |
| Try<bool> icmpEth0ToVeth = filter::icmp::update( |
| eth0, |
| ingress::HANDLE, |
| icmp::Classifier(hostIPNetwork.address()), |
| action::Mirror(targets)); |
| |
| if (icmpEth0ToVeth.isError()) { |
| ++metrics.updating_eth0_icmp_filters_errors; |
| |
| return Failure( |
| "Failed to append a ICMP mirror action from host " + |
| eth0 + " to " + veth(pid) + ": " + icmpEth0ToVeth.error()); |
| } else if (!icmpEth0ToVeth.get()) { |
| ++metrics.updating_eth0_icmp_filters_already_exist; |
| |
| return Failure( |
| "The ICMP packet filter on host " + eth0 + " already exists"); |
| } |
| |
| // Update the ARP filter on host eth0 ingress. |
| Try<bool> arpEth0ToVeth = filter::basic::update( |
| eth0, |
| ingress::HANDLE, |
| ETH_P_ARP, |
| action::Mirror(targets)); |
| |
| if (arpEth0ToVeth.isError()) { |
| ++metrics.updating_eth0_arp_filters_errors; |
| |
| return Failure( |
| "Failed to append an ARP mirror action from host " + |
| eth0 + " to " + veth(pid) + ": " + arpEth0ToVeth.error()); |
| } else if (!arpEth0ToVeth.get()) { |
| ++metrics.updating_eth0_arp_filters_already_exist; |
| |
| return Failure( |
| "The ARP packet filter on host " + eth0 + " already exists"); |
| } |
| } |
| |
| if (flags.egress_unique_flow_per_container) { |
| // Create a new ICMP filter on host eth0 egress for classifying |
| // packets into a reserved flow. |
| Try<bool> icmpEth0Egress = filter::icmp::create( |
| eth0, |
| hostTxFqCodelHandle, |
| icmp::Classifier(None()), |
| Priority(ICMP_FILTER_PRIORITY, NORMAL), |
| Handle(hostTxFqCodelHandle, ICMP_FLOWID)); |
| |
| if (icmpEth0Egress.isError()) { |
| ++metrics.adding_eth0_egress_filters_errors; |
| |
| return Failure( |
| "Failed to create the ICMP flow classifier on host " + |
| eth0 + ": " + icmpEth0Egress.error()); |
| } else if (!icmpEth0Egress.get()) { |
| // We try to create the filter every time a container is |
| // launched. Ignore if it already exists. |
| } |
| |
| // Create a new ARP filter on host eth0 egress for classifying |
| // packets into a reserved flow. |
| Try<bool> arpEth0Egress = filter::basic::create( |
| eth0, |
| hostTxFqCodelHandle, |
| ETH_P_ARP, |
| Priority(ARP_FILTER_PRIORITY, NORMAL), |
| Handle(hostTxFqCodelHandle, ARP_FLOWID)); |
| |
| if (arpEth0Egress.isError()) { |
| ++metrics.adding_eth0_egress_filters_errors; |
| |
| return Failure( |
| "Failed to create the ARP flow classifier on host " + |
| eth0 + ": " + arpEth0Egress.error()); |
| } else if (!arpEth0Egress.get()) { |
| // We try to create the filter every time a container is |
| // launched. Ignore if it already exists. |
| } |
| |
| // Rest of the host packets go to a reserved flow. |
| Try<bool> defaultEth0Egress = filter::basic::create( |
| eth0, |
| hostTxFqCodelHandle, |
| ETH_P_ALL, |
| Priority(DEFAULT_FILTER_PRIORITY, NORMAL), |
| Handle(hostTxFqCodelHandle, HOST_FLOWID)); |
| |
| if (defaultEth0Egress.isError()) { |
| ++metrics.adding_eth0_egress_filters_errors; |
| |
| return Failure( |
| "Failed to create the default flow classifier on host " + |
| eth0 + ": " + defaultEth0Egress.error()); |
| } else if (!defaultEth0Egress.get()) { |
| // NOTE: Since we don't remove this filter on purpose in |
| // _cleanup() (see the comments there), we just continue even |
| // if it already exists, so do nothing here. |
| } |
| } |
| |
| // Turn on the veth. |
| Try<bool> enable = link::setUp(veth(pid)); |
| if (enable.isError()) { |
| return Failure("Failed to turn on " + veth(pid) + ": " + enable.error()); |
| } else if (!enable.get()) { |
| return Failure("Not expecting " + veth(pid) + " to be missing"); |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| Future<ContainerLimitation> PortMappingIsolatorProcess::watch( |
| const ContainerID& containerId) |
| { |
| if (unmanaged.contains(containerId)) { |
| LOG(WARNING) << "Ignoring watch for unmanaged container " << containerId; |
| } else if (!infos.contains(containerId)) { |
| LOG(WARNING) << "Ignoring watch for unknown container " << containerId; |
| } |
| |
| // Currently, we always return a pending future because limitation |
| // is never reached. |
| return Future<ContainerLimitation>(); |
| } |
| |
| |
| void PortMappingIsolatorProcess::_update( |
| const ContainerID& containerId, |
| const Future<Option<int>>& status) |
| { |
| if (!status.isReady()) { |
| ++metrics.updating_container_ip_filters_errors; |
| |
| LOG(ERROR) << "Failed to start a process for updating container " |
| << containerId << ": " |
| << (status.isFailed() ? status.failure() : "discarded"); |
| } else if (status->isNone()) { |
| ++metrics.updating_container_ip_filters_errors; |
| |
| LOG(ERROR) << "The process for updating container " << containerId |
| << " is not expected to be reaped elsewhere"; |
| } else if (status->get() != 0) { |
| ++metrics.updating_container_ip_filters_errors; |
| |
| LOG(ERROR) << "The process for updating container " << containerId << " " |
| << WSTRINGIFY(status->get()); |
| } else { |
| LOG(INFO) << "The process for updating container " << containerId |
| << " finished successfully"; |
| } |
| } |
| |
| |
| Future<Nothing> PortMappingIsolatorProcess::update( |
| const ContainerID& containerId, |
| const Resources& resourceRequests, |
| const google::protobuf::Map<string, Value::Scalar>& resourceLimits) |
| { |
| // It is possible for the network isolator to be asked to update a |
| // container that isn't managed by it, for instance, containers |
| // recovered from a previous run without a network isolator. |
| if (unmanaged.contains(containerId)) { |
| return Nothing(); |
| } |
| |
| if (!infos.contains(containerId)) { |
| LOG(WARNING) << "Ignoring update for unknown container " << containerId; |
| return Nothing(); |
| } |
| |
| // TODO(jieyu): For now, we simply ignore the 'ephemeral_ports' |
| // specified in 'resources'. However, this behavior needs to be |
| // changed once the master can make default allocations for |
| // ephemeral ports. |
| if (resourceRequests.ephemeral_ports().isSome()) { |
| LOG(WARNING) << "Ignoring the specified ephemeral_ports '" |
| << resourceRequests.ephemeral_ports().get() |
| << "' for container" << containerId; |
| } |
| |
| Info* info = CHECK_NOTNULL(infos[containerId]); |
| |
| if (info->pid.isNone()) { |
| return Failure("The container has not been isolated"); |
| } |
| pid_t pid = info->pid.get(); |
| |
| IntervalSet<uint16_t> nonEphemeralPorts; |
| |
| if (resourceRequests.ports().isSome()) { |
| nonEphemeralPorts = rangesToIntervalSet<uint16_t>( |
| resourceRequests.ports().get()).get(); |
| |
| // Sanity check to make sure that the assigned non-ephemeral ports |
| // for the container are part of the non-ephemeral ports specified |
| // by the slave. |
| if (!managedNonEphemeralPorts.contains(nonEphemeralPorts)) { |
| return Failure( |
| "Some non-ephemeral ports specified in " + |
| stringify(nonEphemeralPorts) + |
| " are not managed by the agent"); |
| } |
| } |
| |
| // No need to proceed if no change to the non-ephemeral ports. |
| if (nonEphemeralPorts == info->nonEphemeralPorts) { |
| return Nothing(); |
| } |
| |
| LOG(INFO) << "Updating non-ephemeral ports for container " |
| << containerId << " from " << info->nonEphemeralPorts |
| << " to " << nonEphemeralPorts; |
| |
| Result<vector<ip::Classifier>> classifiers = |
| ip::classifiers(veth(pid), ingress::HANDLE); |
| |
| if (classifiers.isError()) { |
| return Failure( |
| "Failed to get all the IP filters on " + veth(pid) + |
| ": " + classifiers.error()); |
| } else if (classifiers.isNone()) { |
| return Failure("Failed to find " + veth(pid)); |
| } |
| |
| // We first decide what port ranges need to be removed. Any filter |
| // whose port range is not within the new non-ephemeral ports should |
| // be removed. |
| hashset<PortRange> portsToRemove; |
| IntervalSet<uint16_t> remaining = info->nonEphemeralPorts; |
| |
| foreach (const ip::Classifier& classifier, classifiers.get()) { |
| Option<PortRange> sourcePorts = classifier.sourcePorts; |
| Option<PortRange> destinationPorts = classifier.destinationPorts; |
| |
| // All the IP filters on veth used by us only have source ports. |
| if (sourcePorts.isNone() || destinationPorts.isSome()) { |
| return Failure("Unexpected IP filter detected on " + veth(pid)); |
| } |
| |
| Interval<uint16_t> ports = |
| (Bound<uint16_t>::closed(sourcePorts->begin()), |
| Bound<uint16_t>::closed(sourcePorts->end())); |
| |
| // Skip the ephemeral ports. |
| if (ports == info->ephemeralPorts) { |
| continue; |
| } |
| |
| if (!nonEphemeralPorts.contains(ports)) { |
| remaining -= ports; |
| portsToRemove.insert(sourcePorts.get()); |
| } |
| } |
| |
| // We then decide what port ranges need to be added. |
| vector<PortRange> portsToAdd = getPortRanges(nonEphemeralPorts - remaining); |
| |
| foreach (const PortRange& range, portsToAdd) { |
| if (info->flowId.isSome()) { |
| LOG(INFO) << "Adding IP packet filters with ports " << range |
| << " with flow ID " << info->flowId.get() |
| << " for container " << containerId; |
| } else { |
| LOG(INFO) << "Adding IP packet filters with ports " << range |
| << " for container " << containerId; |
| } |
| |
| // All IP packets from a container will be assigned a single flow |
| // on host eth0. |
| Try<Nothing> add = addHostIPFilters(range, info->flowId, veth(pid)); |
| if (add.isError()) { |
| return Failure( |
| "Failed to add IP packet filter with ports " + |
| stringify(range) + " for container with pid " + |
| stringify(pid) + ": " + add.error()); |
| } |
| } |
| |
| foreach (const PortRange& range, portsToRemove) { |
| LOG(INFO) << "Removing IP packet filters with ports " << range |
| << " for container with pid " << pid; |
| |
| Try<Nothing> removing = removeHostIPFilters(range, veth(pid)); |
| if (removing.isError()) { |
| return Failure( |
| "Failed to remove IP packet filter with ports " + |
| stringify(range) + " for container with pid " + |
| stringify(pid) + ": " + removing.error()); |
| } |
| } |
| |
| // Update the non-ephemeral ports of this container. |
| info->nonEphemeralPorts = nonEphemeralPorts; |
| |
| // Update the IP filters inside the container. |
| PortMappingUpdate update; |
| update.flags.eth0_name = eth0; |
| update.flags.lo_name = lo; |
| update.flags.pid = pid; |
| update.flags.ports_to_add = json(portsToAdd); |
| update.flags.ports_to_remove = json(portsToRemove); |
| |
| vector<string> argv(2); |
| argv[0] = "mesos-network-helper"; |
| argv[1] = PortMappingUpdate::NAME; |
| |
| Try<Subprocess> s = subprocess( |
| path::join(flags.launcher_dir, "mesos-network-helper"), |
| argv, |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::FD(STDOUT_FILENO), |
| Subprocess::FD(STDERR_FILENO), |
| &update.flags); |
| |
| if (s.isError()) { |
| return Failure("Failed to launch update subcommand: " + s.error()); |
| } |
| |
| return s->status() |
| .onAny(defer( |
| PID<PortMappingIsolatorProcess>(this), |
| &PortMappingIsolatorProcess::_update, |
| containerId, |
| lambda::_1)) |
| .then([]() { return Nothing(); }); |
| } |
| |
| |
| Future<ResourceStatistics> PortMappingIsolatorProcess::usage( |
| const ContainerID& containerId) |
| { |
| ResourceStatistics result; |
| |
| // Do nothing for unmanaged container. |
| if (unmanaged.contains(containerId)) { |
| return result; |
| } |
| |
| if (!infos.contains(containerId)) { |
| VLOG(1) << "Unknown container " << containerId; |
| return result; |
| } |
| |
| Info* info = CHECK_NOTNULL(infos[containerId]); |
| |
| if (info->pid.isNone()) { |
| return result; |
| } |
| |
| Result<hashmap<string, uint64_t>> stat = |
| link::statistics(veth(info->pid.get())); |
| |
| if (stat.isError()) { |
| return Failure( |
| "Failed to retrieve statistics on link " + |
| veth(info->pid.get()) + ": " + stat.error()); |
| } else if (stat.isNone()) { |
| return Failure("Failed to find link: " + veth(info->pid.get())); |
| } |
| |
| // Note: The RX/TX statistics on the two ends of the veth are the |
| // exact opposite, because of its 'tunnel' nature. We sample on the |
| // host end of the veth pair, which means we have to reverse RX and |
| // TX to reflect statistics inside the container. |
| |
| // +----------+ +----------+ |
| // | | | | |
| // | |RX<--------------+TX| | |
| // | | | | |
| // | veth | | eth0 | |
| // | | | | |
| // | |TX+-------------->RX| | |
| // | | | | |
| // +----------+ +----------+ |
| |
| Option<uint64_t> rx_packets = stat->get("tx_packets"); |
| if (rx_packets.isSome()) { |
| result.set_net_rx_packets(rx_packets.get()); |
| } |
| |
| Option<uint64_t> rx_bytes = stat->get("tx_bytes"); |
| if (rx_bytes.isSome()) { |
| result.set_net_rx_bytes(rx_bytes.get()); |
| } |
| |
| Option<uint64_t> rx_errors = stat->get("tx_errors"); |
| if (rx_errors.isSome()) { |
| result.set_net_rx_errors(rx_errors.get()); |
| } |
| |
| Option<uint64_t> rx_dropped = stat->get("tx_dropped"); |
| if (rx_dropped.isSome()) { |
| result.set_net_rx_dropped(rx_dropped.get()); |
| } |
| |
| Option<uint64_t> tx_packets = stat->get("rx_packets"); |
| if (tx_packets.isSome()) { |
| result.set_net_tx_packets(tx_packets.get()); |
| } |
| |
| Option<uint64_t> tx_bytes = stat->get("rx_bytes"); |
| if (tx_bytes.isSome()) { |
| result.set_net_tx_bytes(tx_bytes.get()); |
| } |
| |
| Option<uint64_t> tx_errors = stat->get("rx_errors"); |
| if (tx_errors.isSome()) { |
| result.set_net_tx_errors(tx_errors.get()); |
| } |
| |
| Option<uint64_t> tx_dropped = stat->get("rx_dropped"); |
| if (tx_dropped.isSome()) { |
| result.set_net_tx_dropped(tx_dropped.get()); |
| } |
| |
| // Retrieve the socket information from inside the container. |
| PortMappingStatistics statistics; |
| statistics.flags.pid = info->pid.get(); |
| statistics.flags.eth0_name = eth0; |
| statistics.flags.enable_socket_statistics_summary = |
| flags.network_enable_socket_statistics_summary; |
| statistics.flags.enable_socket_statistics_details = |
| flags.network_enable_socket_statistics_details; |
| statistics.flags.enable_snmp_statistics = |
| flags.network_enable_snmp_statistics; |
| |
| vector<string> argv(2); |
| argv[0] = "mesos-network-helper"; |
| argv[1] = PortMappingStatistics::NAME; |
| |
| // We don't need STDIN; we need STDOUT for the result; we leave |
| // STDERR as is to log to slave process. |
| Try<Subprocess> s = subprocess( |
| path::join(flags.launcher_dir, "mesos-network-helper"), |
| argv, |
| Subprocess::PATH(os::DEV_NULL), |
| Subprocess::PIPE(), |
| Subprocess::FD(STDERR_FILENO), |
| &statistics.flags); |
| |
| if (s.isError()) { |
| return Failure("Failed to launch the statistics subcommand: " + s.error()); |
| } |
| |
| // TODO(chzhcn): it is possible for the subprocess to block on |
| // writing to its end of the pipe and never exit because the pipe |
| // has limited buffer size, but we have been careful to send very |
| // few bytes so this shouldn't be a problem. |
| return s->status() |
| .then(defer( |
| PID<PortMappingIsolatorProcess>(this), |
| &PortMappingIsolatorProcess::_usage, |
| result, |
| s.get())); |
| } |
| |
| |
| Future<ResourceStatistics> PortMappingIsolatorProcess::_usage( |
| const ResourceStatistics& result, |
| const Subprocess& s) |
| { |
| CHECK_READY(s.status()); |
| |
| Option<int> status = s.status().get(); |
| |
| if (status.isNone()) { |
| return Failure( |
| "The process for getting network statistics is unexpectedly reaped"); |
| } else if (status.get() != 0) { |
| return Failure( |
| "The process for getting network statistics has non-zero exit code: " + |
| WSTRINGIFY(status.get())); |
| } |
| |
| return io::read(s.out().get()) |
| .then(defer( |
| PID<PortMappingIsolatorProcess>(this), |
| &PortMappingIsolatorProcess::__usage, |
| result, |
| lambda::_1)); |
| } |
| |
| |
| Future<ResourceStatistics> PortMappingIsolatorProcess::__usage( |
| ResourceStatistics result, |
| const Future<string>& out) |
| { |
| CHECK_READY(out); |
| |
| // NOTE: It's possible the subprocess has no output. |
| if (out->empty()) { |
| return result; |
| } |
| |
| Try<JSON::Object> object = JSON::parse<JSON::Object>(out.get()); |
| if (object.isError()) { |
| return Failure( |
| "Failed to parse the output from the process that gets the " |
| "network statistics: " + object.error()); |
| } |
| |
| Result<ResourceStatistics> _result = |
| protobuf::parse<ResourceStatistics>(object.get()); |
| |
| if (_result.isError()) { |
| return Failure( |
| "Failed to parse the output from the process that gets the " |
| "network statistics: " + object.error()); |
| } |
| |
| result.MergeFrom(_result.get()); |
| |
| // NOTE: We unset the 'timestamp' field here because otherwise it |
| // will overwrite the timestamp set in the containerizer. |
| result.clear_timestamp(); |
| |
| return result; |
| } |
| |
| |
| Future<Nothing> PortMappingIsolatorProcess::cleanup( |
| const ContainerID& containerId) |
| { |
| if (unmanaged.contains(containerId)) { |
| unmanaged.erase(containerId); |
| return Nothing(); |
| } |
| |
| if (!infos.contains(containerId)) { |
| LOG(WARNING) << "Ignoring cleanup for unknown container " << containerId; |
| return Nothing(); |
| } |
| |
| Info* info = CHECK_NOTNULL(infos[containerId]); |
| |
| // For a normally exited container, we take its info pointer off the |
| // hashmap infos before using the helper function to clean it up. |
| infos.erase(containerId); |
| |
| Try<Nothing> cleanup = _cleanup(info, containerId); |
| if (cleanup.isError()) { |
| return Failure(cleanup.error()); |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| // TODO(jieyu): We take an optional container ID here because not all |
| // the containers we want to cleanup have container IDs available. For |
| // instance, we cannot get container IDs for those orphan containers |
| // created by older (pre 0.23.0) versions of this isolator (with no |
| // associated namespace handle symlinks). |
| Try<Nothing> PortMappingIsolatorProcess::_cleanup( |
| Info* _info, |
| const Option<ContainerID>& containerId) |
| { |
| // Set '_info' to be auto-managed so that it will be deleted when |
| // this function returns. |
| Owned<Info> info(CHECK_NOTNULL(_info)); |
| |
| // Free the ephemeral ports used by the container. Filters |
| // associated with those ports will be removed below if they were |
| // set up. |
| if (info->ephemeralPorts != Interval<uint16_t>()) { |
| ephemeralPortsAllocator->deallocate(info->ephemeralPorts); |
| |
| LOG(INFO) << "Freed ephemeral ports " << info->ephemeralPorts |
| << " used by container" |
| << (containerId.isSome() |
| ? " " + stringify(containerId.get()) : "") |
| << (info->pid.isSome() |
| ? " with pid " + stringify(info->pid.get()) : ""); |
| } |
| |
| if (!info->pid.isSome()) { |
| LOG(WARNING) << "The container has not been isolated"; |
| return Nothing(); |
| } |
| |
| pid_t pid = info->pid.get(); |
| |
| // NOTE: The 'isolate()' function above may fail at any point if the |
| // child process with 'pid' is gone (e.g., killed by a user, failed |
| // to load shared libraries, etc.). Therefore, this cleanup function |
| // needs to deal with cases where filters/veth/mount are not |
| // installed or do not exist. Also, we choose to continue on each |
| // single failure so that we can clean up filters/veth/mount as much |
| // as possible. We concatenate all the error messages and report |
| // them at the end. |
| vector<string> errors; |
| |
| // Remove the IP filters on eth0 and lo for non-ephemeral port |
| // ranges and the ephemeral port range. |
| foreach (const PortRange& range, |
| getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) { |
| LOG(INFO) << "Removing IP packet filters with ports " << range |
| << " for container with pid " << pid; |
| |
| // No need to remove filters on veth as they will be automatically |
| // removed by the kernel when we remove the link below. |
| Try<Nothing> removing = removeHostIPFilters(range, veth(pid), false); |
| if (removing.isError()) { |
| errors.push_back( |
| "Failed to remove IP packet filter with ports " + |
| stringify(range) + " for container with pid " + |
| stringify(pid) + ": " + removing.error()); |
| } |
| } |
| |
| if (info->flowId.isSome()) { |
| freeFlowIds.insert(info->flowId.get()); |
| |
| LOG(INFO) << "Freed flow ID " << info->flowId.get() |
| << " used by container with pid " << pid; |
| } |
| |
| set<string> targets; |
| foreachvalue (Info* info, infos) { |
| if (info->pid.isSome()) { |
| targets.insert(veth(info->pid.get())); |
| } |
| } |
| |
| if (targets.empty()) { |
| // This is the last container, remove the ARP and ICMP filters on |
| // host eth0, remove the flow classifiers on eth0 egress too. |
| |
| // Remove the ICMP filter on host eth0. |
| Try<bool> icmpEth0ToVeth = filter::icmp::remove( |
| eth0, |
| ingress::HANDLE, |
| icmp::Classifier(hostIPNetwork.address())); |
| |
| if (icmpEth0ToVeth.isError()) { |
| ++metrics.removing_eth0_icmp_filters_errors; |
| |
| errors.push_back( |
| "Failed to remove the ICMP packet filter on host " + eth0 + |
| ": " + icmpEth0ToVeth.error()); |
| } else if (!icmpEth0ToVeth.get()) { |
| ++metrics.removing_eth0_icmp_filters_do_not_exist; |
| |
| LOG(ERROR) << "The ICMP packet filter on host " << eth0 |
| << " does not exist"; |
| } |
| |
| // Remove the ARP filter on host eth0. |
| Try<bool> arpEth0ToVeth = filter::basic::remove( |
| eth0, |
| ingress::HANDLE, |
| ETH_P_ARP); |
| |
| if (arpEth0ToVeth.isError()) { |
| ++metrics.removing_eth0_arp_filters_errors; |
| |
| errors.push_back( |
| "Failed to remove the ARP packet filter on host " + eth0 + |
| ": " + arpEth0ToVeth.error()); |
| } else if (!arpEth0ToVeth.get()) { |
| ++metrics.removing_eth0_arp_filters_do_not_exist; |
| |
| LOG(ERROR) << "The ARP packet filter on host " << eth0 |
| << " does not exist"; |
| } |
| |
| if (flags.egress_unique_flow_per_container) { |
| // Remove the ICMP flow classifier on host eth0. |
| Try<bool> icmpEth0Egress = filter::icmp::remove( |
| eth0, |
| hostTxFqCodelHandle, |
| icmp::Classifier(None())); |
| |
| if (icmpEth0Egress.isError()) { |
| ++metrics.removing_eth0_egress_filters_errors; |
| |
| errors.push_back( |
| "Failed to remove the ICMP flow classifier on host " + eth0 + |
| ": " + icmpEth0Egress.error()); |
| } else if (!icmpEth0Egress.get()) { |
| ++metrics.removing_eth0_egress_filters_do_not_exist; |
| |
| LOG(ERROR) << "The ICMP flow classifier on host " << eth0 |
| << " does not exist"; |
| } |
| |
| // Remove the ARP flow classifier on host eth0. |
| Try<bool> arpEth0Egress = filter::basic::remove( |
| eth0, |
| hostTxFqCodelHandle, |
| ETH_P_ARP); |
| |
| if (arpEth0Egress.isError()) { |
| ++metrics.removing_eth0_egress_filters_errors; |
| |
| errors.push_back( |
| "Failed to remove the ARP flow classifier on host " + eth0 + |
| ": " + arpEth0Egress.error()); |
| } else if (!arpEth0Egress.get()) { |
| ++metrics.removing_eth0_egress_filters_do_not_exist; |
| |
| LOG(ERROR) << "The ARP flow classifier on host " << eth0 |
| << " does not exist"; |
| } |
| |
| // Kernel creates a place-holder filter, with handle 0, for each |
| // tuple (protocol, priority, kind). Our current implementation |
| // doesn't remove them, so all these filters are left. Packets |
| // will be dropped because these filters don't set a valid flow |
| // ID. We have to work around this situation for egress. The |
| // long term solution is removing all these filters after our |
| // own filters are all gone, see the upstream commit |
| // 1e052be69d045c8d0f82ff1116fd3e5a79661745 from: |
| // http://git.kernel.org/cgit/linux/kernel/git/davem/net-next.git. |
| // |
| // So, here we do NOT remove the default flow classifier on host |
| // eth0 on purpose so that after all containers are gone the |
| // host traffic still goes into this flow, this guarantees no |
| // traffic will be dropped by fq_codel qdisc. |
| // |
| // Maybe we need to remove the fq_codel qdisc on host eth0. |
| // TODO(cwang): Revise this in MESOS-2370, we don't remove |
| // ingress qdisc either. |
| } |
| } else { |
| // This is not the last container. Replace the ARP and ICMP |
| // filters. The reason we do this is that we don't have an easy |
| // way to search and delete an action from the multiple actions on |
| // a single filter. |
| Try<bool> icmpEth0ToVeth = filter::icmp::update( |
| eth0, |
| ingress::HANDLE, |
| icmp::Classifier(hostIPNetwork.address()), |
| action::Mirror(targets)); |
| |
| if (icmpEth0ToVeth.isError()) { |
| ++metrics.updating_eth0_icmp_filters_errors; |
| |
| errors.push_back( |
| "Failed to update the ICMP mirror action from host " + eth0 + |
| " to " + veth(pid) + ": " + icmpEth0ToVeth.error()); |
| } else if (!icmpEth0ToVeth.get()) { |
| ++metrics.updating_eth0_icmp_filters_do_not_exist; |
| |
| errors.push_back( |
| "The ICMP packet filter on host " + eth0 + " does not exist"); |
| } |
| |
| Try<bool> arpEth0ToVeth = filter::basic::update( |
| eth0, |
| ingress::HANDLE, |
| ETH_P_ARP, |
| action::Mirror(targets)); |
| |
| if (arpEth0ToVeth.isError()) { |
| ++metrics.updating_eth0_arp_filters_errors; |
| |
| errors.push_back( |
| "Failed to update the ARP mirror action from host " + eth0 + |
| " to " + veth(pid) + ": " + arpEth0ToVeth.error()); |
| } else if (!arpEth0ToVeth.get()) { |
| ++metrics.updating_eth0_arp_filters_do_not_exist; |
| |
| errors.push_back( |
| "The ARP packet filter on host " + eth0 + " does not exist"); |
| } |
| } |
| |
| // We manually remove veth to avoid having to wait for the kernel to |
| // do it. |
| Try<bool> remove = link::remove(veth(pid)); |
| if (remove.isError()) { |
| errors.push_back( |
| "Failed to remove the link " + veth(pid) + ": " + remove.error()); |
| } |
| |
| // Remove the symlink for the network namespace handle if a |
| // container ID is specified. |
| if (containerId.isSome()) { |
| const string linker = getSymlinkPath(containerId.get()); |
| |
| // NOTE: Since we introduced the network namespace handle symlink |
| // in 0.23.0, it's likely that the symlink does not exist. |
| if (os::exists(linker)) { |
| Try<Nothing> rm = os::rm(linker); |
| if (rm.isError()) { |
| errors.push_back( |
| "Failed to remove the network namespace symlink '" + |
| linker + "' " + rm.error()); |
| } |
| } |
| } |
| |
| // Release the bind mount for this container. |
| const string target = getNamespaceHandlePath(bindMountRoot, pid); |
| Try<Nothing> unmount = fs::unmount(target, MNT_DETACH); |
| if (unmount.isError()) { |
| errors.push_back( |
| "Failed to unmount the network namespace handle '" + |
| target + "': " + unmount.error()); |
| } |
| |
| // MNT_DETACH does a lazy unmount, which means unmount will |
| // eventually succeed when the mount point becomes idle, but |
| // possiblely not soon enough every time for this remove to go |
| // through, e.g, someone entered into the container for debugging |
| // purpose. In that case remove will fail, which is okay, because we |
| // only leaked an empty file, which could also be reused later if |
| // the pid (the name of the file) is used again. |
| Try<Nothing> rm = os::rm(target); |
| if (rm.isError()) { |
| LOG(WARNING) << "Failed to remove the network namespace handle '" |
| << target << "' during cleanup: " << rm.error(); |
| } |
| |
| // If any error happens along the way, return error. |
| if (!errors.empty()) { |
| return Error(strings::join(", ", errors)); |
| } |
| |
| LOG(INFO) << "Successfully performed cleanup for pid " << pid; |
| return Nothing(); |
| } |
| |
| |
| // Helper function to set up IP filters on the host side for a given |
| // port range. |
| Try<Nothing> PortMappingIsolatorProcess::addHostIPFilters( |
| const PortRange& range, |
| const Option<uint16_t>& flowId, |
| const string& veth) |
| { |
| // NOTE: The order in which these filters are added is important! |
| // For each port range, we need to make sure that we don't try to |
| // add filters on host eth0 and host lo until we have successfully |
| // added filters on veth. This is because the slave could crash |
| // while we are adding filters, we want to make sure we don't leak |
| // any filters on host eth0 and host lo. |
| |
| // Add an IP packet filter from veth of the container to host eth0 |
| // to properly redirect IP packets sent from one container to |
| // external hosts. This filter has a lower priority compared to the |
| // 'vethToHostLo' filter because it does not check the destination |
| // IP. Notice that here we also check the source port of a packet. |
| // If the source port is not within the port ranges allocated for |
| // the container, the packet will get dropped. |
| Try<bool> vethToHostEth0 = filter::ip::create( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), range, None()), |
| Priority(IP_FILTER_PRIORITY, LOW), |
| action::Redirect(eth0)); |
| |
| if (vethToHostEth0.isError()) { |
| ++metrics.adding_veth_ip_filters_errors; |
| |
| return Error( |
| "Failed to create an IP packet filter from " + veth + |
| " to host " + eth0 + ": " + vethToHostEth0.error()); |
| } else if (!vethToHostEth0.get()) { |
| ++metrics.adding_veth_ip_filters_already_exist; |
| |
| return Error( |
| "The IP packet filter from " + veth + |
| " to host " + eth0 + " already exists"); |
| } |
| |
| // Add two IP packet filters (one for public IP and one for loopback |
| // IP) from veth of the container to host lo to properly redirect IP |
| // packets sent from one container to either the host or another |
| // container. Notice that here we also check the source port of a |
| // packet. If the source port is not within the port ranges |
| // allocated for the container, the packet will get dropped. |
| Try<bool> vethToHostLoPublic = filter::ip::create( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier(None(), hostIPNetwork.address(), range, None()), |
| Priority(IP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(lo)); |
| |
| if (vethToHostLoPublic.isError()) { |
| ++metrics.adding_veth_ip_filters_errors; |
| |
| return Error( |
| "Failed to create an IP packet filter (for public IP) from " + |
| veth + " to host " + lo + ": " + vethToHostLoPublic.error()); |
| } else if (!vethToHostLoPublic.get()) { |
| ++metrics.adding_veth_ip_filters_already_exist; |
| |
| return Error( |
| "The IP packet filter (for public IP) from " + |
| veth + " to host " + lo + " already exists"); |
| } |
| |
| Try<bool> vethToHostLoLoopback = filter::ip::create( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier( |
| None(), |
| net::IP::Network::LOOPBACK_V4().address(), |
| range, |
| None()), |
| Priority(IP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(lo)); |
| |
| if (vethToHostLoLoopback.isError()) { |
| ++metrics.adding_veth_ip_filters_errors; |
| |
| return Error( |
| "Failed to create an IP packet filter (for loopback IP) from " + |
| veth + " to host " + lo + ": " + vethToHostLoLoopback.error()); |
| } else if (!vethToHostLoLoopback.get()) { |
| ++metrics.adding_veth_ip_filters_already_exist; |
| |
| return Error( |
| "The IP packet filter (for loopback IP) from " + |
| veth + " to host " + lo + " already exists"); |
| } |
| |
| // Add an IP packet filter from host eth0 to veth of the container |
| // such that any incoming IP packet will be properly redirected to |
| // the corresponding container based on its destination port. |
| Try<bool> hostEth0ToVeth = filter::ip::create( |
| eth0, |
| ingress::HANDLE, |
| ip::Classifier(hostMAC, hostIPNetwork.address(), None(), range), |
| Priority(IP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(veth)); |
| |
| if (hostEth0ToVeth.isError()) { |
| ++metrics.adding_eth0_ip_filters_errors; |
| |
| return Error( |
| "Failed to create an IP packet filter from host " + |
| eth0 + " to " + veth + ": " + hostEth0ToVeth.error()); |
| } else if (!hostEth0ToVeth.get()) { |
| ++metrics.adding_eth0_ip_filters_already_exist; |
| |
| return Error( |
| "The IP packet filter from host " + eth0 + " to " + |
| veth + " already exists"); |
| } |
| |
| // Add an IP packet filter from host lo to veth of the container |
| // such that any internally generated IP packet will be properly |
| // redirected to the corresponding container based on its |
| // destination port. |
| Try<bool> hostLoToVeth = filter::ip::create( |
| lo, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), None(), range), |
| Priority(IP_FILTER_PRIORITY, NORMAL), |
| action::Redirect(veth)); |
| |
| if (hostLoToVeth.isError()) { |
| ++metrics.adding_lo_ip_filters_errors; |
| |
| return Error( |
| "Failed to create an IP packet filter from host " + |
| lo + " to " + veth + ": " + hostLoToVeth.error()); |
| } else if (!hostLoToVeth.get()) { |
| ++metrics.adding_lo_ip_filters_already_exist; |
| |
| return Error( |
| "The IP packet filter from host " + lo + " to " + |
| veth + " already exists"); |
| } |
| |
| if (flowId.isSome()) { |
| // Add IP packet filters to classify traffic sending to eth0 |
| // in the same way so that traffic of each container will be |
| // classified to different flows defined by fq_codel. |
| Try<bool> hostEth0Egress = filter::ip::create( |
| eth0, |
| hostTxFqCodelHandle, |
| ip::Classifier(None(), None(), range, None()), |
| Priority(IP_FILTER_PRIORITY, LOW), |
| Handle(hostTxFqCodelHandle, flowId.get())); |
| |
| if (hostEth0Egress.isError()) { |
| ++metrics.adding_eth0_egress_filters_errors; |
| |
| return Error( |
| "Failed to create a flow classifier for " + veth + |
| " on host " + eth0 + ": " + hostEth0Egress.error()); |
| } else if (!hostEth0Egress.get()) { |
| ++metrics.adding_eth0_egress_filters_already_exist; |
| |
| return Error( |
| "The flow classifier for veth " + veth + |
| " on host " + eth0 + " already exists"); |
| } |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| // Helper function to remove IP filters from the host side for a given |
| // port range. The boolean flag 'removeFiltersOnVeth' indicates if we |
| // need to remove filters on veth. |
| Try<Nothing> PortMappingIsolatorProcess::removeHostIPFilters( |
| const PortRange& range, |
| const string& veth, |
| bool removeFiltersOnVeth) |
| { |
| // NOTE: Similar to above. The order in which these filters are |
| // removed is important. We need to remove filters on host eth0 and |
| // host lo first before we remove filters on veth. |
| |
| // Remove the IP packet filter from host eth0 to veth of the container. |
| Try<bool> hostEth0ToVeth = filter::ip::remove( |
| eth0, |
| ingress::HANDLE, |
| ip::Classifier(hostMAC, hostIPNetwork.address(), None(), range)); |
| |
| if (hostEth0ToVeth.isError()) { |
| ++metrics.removing_eth0_ip_filters_errors; |
| |
| return Error( |
| "Failed to remove the IP packet filter from host " + |
| eth0 + " to " + veth + ": " + hostEth0ToVeth.error()); |
| } else if (!hostEth0ToVeth.get()) { |
| ++metrics.removing_eth0_ip_filters_do_not_exist; |
| |
| LOG(ERROR) << "The IP packet filter from host " << eth0 |
| << " to " << veth << " does not exist"; |
| } |
| |
| // Remove the IP packet filter from host lo to veth of the container. |
| Try<bool> hostLoToVeth = filter::ip::remove( |
| lo, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), None(), range)); |
| |
| if (hostLoToVeth.isError()) { |
| ++metrics.removing_lo_ip_filters_errors; |
| |
| return Error( |
| "Failed to remove the IP packet filter from host " + |
| lo + " to " + veth + ": " + hostLoToVeth.error()); |
| } else if (!hostLoToVeth.get()) { |
| ++metrics.removing_lo_ip_filters_do_not_exist; |
| |
| LOG(ERROR) << "The IP packet filter from host " << lo |
| << " to " << veth << " does not exist"; |
| } |
| |
| if (flags.egress_unique_flow_per_container) { |
| // Remove the egress flow classifier on host eth0. |
| Try<bool> hostEth0Egress = filter::ip::remove( |
| eth0, |
| hostTxFqCodelHandle, |
| ip::Classifier(None(), None(), range, None())); |
| |
| if (hostEth0Egress.isError()) { |
| ++metrics.removing_eth0_egress_filters_errors; |
| |
| return Error( |
| "Failed to remove the flow classifier from host " + |
| eth0 + " for " + veth + ": " + hostEth0Egress.error()); |
| } else if (!hostEth0Egress.get()) { |
| ++metrics.removing_eth0_egress_filters_do_not_exist; |
| |
| LOG(ERROR) << "The flow classifier from host " << eth0 |
| << " for " << range << " does not exist"; |
| } |
| } |
| |
| // Now, we try to remove filters on veth. No need to proceed if the |
| // user does not ask us to do so. |
| if (!removeFiltersOnVeth) { |
| return Nothing(); |
| } |
| |
| // Remove the IP packet filter from veth of the container to |
| // host lo for the public IP. |
| Try<bool> vethToHostLoPublic = filter::ip::remove( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier(None(), hostIPNetwork.address(), range, None())); |
| |
| if (vethToHostLoPublic.isError()) { |
| ++metrics.removing_lo_ip_filters_errors; |
| |
| return Error( |
| "Failed to remove the IP packet filter (for public IP) from " + |
| veth + " to host " + lo + ": " + vethToHostLoPublic.error()); |
| } else if (!vethToHostLoPublic.get()) { |
| ++metrics.removing_lo_ip_filters_do_not_exist; |
| |
| LOG(ERROR) << "The IP packet filter (for public IP) from " |
| << veth << " to host " << lo << " does not exist"; |
| } |
| |
| // Remove the IP packet filter from veth of the container to |
| // host lo for the loopback IP. |
| Try<bool> vethToHostLoLoopback = filter::ip::remove( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier( |
| None(), |
| net::IP::Network::LOOPBACK_V4().address(), |
| range, |
| None())); |
| |
| if (vethToHostLoLoopback.isError()) { |
| ++metrics.removing_veth_ip_filters_errors; |
| |
| return Error( |
| "Failed to remove the IP packet filter (for loopback IP) from " + |
| veth + " to host " + lo + ": " + vethToHostLoLoopback.error()); |
| } else if (!vethToHostLoLoopback.get()) { |
| ++metrics.removing_veth_ip_filters_do_not_exist; |
| |
| LOG(ERROR) << "The IP packet filter (for loopback IP) from " |
| << veth << " to host " << lo << " does not exist"; |
| } |
| |
| // Remove the IP packet filter from veth of the container to |
| // host eth0. |
| Try<bool> vethToHostEth0 = filter::ip::remove( |
| veth, |
| ingress::HANDLE, |
| ip::Classifier(None(), None(), range, None())); |
| |
| if (vethToHostEth0.isError()) { |
| ++metrics.removing_veth_ip_filters_errors; |
| |
| return Error( |
| "Failed to remove the IP packet filter from " + veth + |
| " to host " + eth0 + ": " + vethToHostEth0.error()); |
| } else if (!vethToHostEth0.get()) { |
| ++metrics.removing_veth_ip_filters_do_not_exist; |
| |
| LOG(ERROR) << "The IP packet filter from " << veth |
| << " to host " << eth0 << " does not exist"; |
| } |
| |
| return Nothing(); |
| } |
| |
| |
| // This function returns the scripts that need to be run in child |
| // context before child execs to complete network isolation. |
| // TODO(jieyu): Use the Subcommand abstraction to remove most of the |
| // logic here. Completely remove this function once we can assume a |
| // newer kernel where 'setns' works for mount namespaces. |
| string PortMappingIsolatorProcess::scripts(Info* info) |
| { |
| ostringstream script; |
| |
| script << "#!/bin/sh\n"; |
| script << "set -xe\n"; |
| |
| // Mark the mount point PORT_MAPPING_BIND_MOUNT_ROOT() as slave |
| // mount so that changes in the container will not be propagated to |
| // the host. |
| script << "mount --make-rslave " << bindMountRoot << "\n"; |
| |
| // Disable IPv6 when IPv6 module is loaded as IPv6 packets won't be |
| // forwarded anyway. |
| script << "test -f /proc/sys/net/ipv6/conf/all/disable_ipv6 &&" |
| << " echo 1 > /proc/sys/net/ipv6/conf/all/disable_ipv6\n"; |
| |
| // Configure lo and eth0. |
| script << "ip link set " << lo << " address " << hostMAC |
| << " mtu " << hostEth0MTU << " up\n"; |
| |
| // NOTE: This is mostly a kernel issue: in veth_xmit() the kernel |
| // tags the packet's checksum as UNNECESSARY if we do not disable it |
| // here, this causes a corrupt packet to be delivered into the stack |
| // when we receive a packet with a bad checksum. Disabling rx |
| // checksum offloading ensures the TCP layer will checksum and drop |
| // it. |
| script << "ethtool -K " << eth0 << " rx off\n"; |
| script << "ip link set " << eth0 << " address " << hostMAC |
| << " mtu " << hostEth0MTU << " up\n"; |
| script << "ip addr add " << hostIPNetwork << " dev " << eth0 << "\n"; |
| |
| // Set up the default gateway to match that of eth0. |
| script << "ip route add default via " << hostDefaultGateway << "\n"; |
| |
| // Restrict the ephemeral ports that can be used by the container. |
| script << "echo " << info->ephemeralPorts.lower() << " " |
| << (info->ephemeralPorts.upper() - 1) |
| << " > /proc/sys/net/ipv4/ip_local_port_range\n"; |
| |
| // Allow eth0 and lo in the container to accept local packets. We |
| // need this because we will set up filters to redirect packets from |
| // lo to eth0 in the container. |
| script << "echo 1 > /proc/sys/net/ipv4/conf/" << eth0 << "/accept_local\n"; |
| script << "echo 1 > /proc/sys/net/ipv4/conf/" << lo << "/accept_local\n"; |
| |
| // Enable route_localnet on lo because by default 127.0.0.1 traffic |
| // is dropped. This feature exists on 3.6 kernel or newer. |
| if (os::exists(path::join("/proc/sys/net/ipv4/conf", lo, "route_localnet"))) { |
| script << "echo 1 > /proc/sys/net/ipv4/conf/" << lo << "/route_localnet\n"; |
| } |
| |
| // Configure container network to match host network configurations. |
| foreachpair (const string& proc, |
| const string& value, |
| hostNetworkConfigurations) { |
| script << "if [ -f \"" << proc << "\" ]; then\n"; |
| script << " echo '" << value << "' > " << proc << "\n"; |
| script << "fi\n"; |
| } |
| |
| // Set up filters on lo and eth0. |
| script << "tc qdisc add dev " << lo << " ingress\n"; |
| script << "tc qdisc add dev " << eth0 << " ingress\n"; |
| |
| // Allow talking between containers and from container to host. |
| // TODO(chzhcn): Consider merging the following two filters. |
| script << "tc filter add dev " << lo << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip dst " << hostIPNetwork.address() |
| << " action mirred egress redirect dev " << eth0 << "\n"; |
| |
| script << "tc filter add dev " << lo << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip dst " |
| << net::IP::Network::LOOPBACK_V4().address() |
| << " action mirred egress redirect dev " << eth0 << "\n"; |
| |
| foreach (const PortRange& range, |
| getPortRanges(info->nonEphemeralPorts + info->ephemeralPorts)) { |
| // Local traffic inside a container will not be redirected to eth0. |
| script << "tc filter add dev " << lo << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(IP_FILTER_PRIORITY, HIGH).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip dport " << range.begin() << " " |
| << hex << range.mask() << dec << "\n"; |
| |
| // Traffic going to host loopback IP and ports assigned to this |
| // container will be redirected to lo. |
| script << "tc filter add dev " << eth0 << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(IP_FILTER_PRIORITY, NORMAL).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip dst " |
| << net::IP::Network::LOOPBACK_V4().address() |
| << " match ip dport " << range.begin() << " " |
| << hex << range.mask() << dec |
| << " action mirred egress redirect dev " << lo << "\n"; |
| } |
| |
| // Do not forward the ICMP packet if the destination IP is self. |
| script << "tc filter add dev " << lo << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(ICMP_FILTER_PRIORITY, NORMAL).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip protocol 1 0xff" |
| << " match ip dst " << hostIPNetwork.address() << "\n"; |
| |
| script << "tc filter add dev " << lo << " parent " << ingress::HANDLE |
| << " protocol ip" |
| << " prio " << Priority(ICMP_FILTER_PRIORITY, NORMAL).get() << " u32" |
| << " flowid ffff:0" |
| << " match ip protocol 1 0xff" |
| << " match ip dst " |
| << net::IP::Network::LOOPBACK_V4().address() << "\n"; |
| |
| // Display the filters created on eth0 and lo. |
| script << "tc filter show dev " << eth0 |
| << " parent " << ingress::HANDLE << "\n"; |
| script << "tc filter show dev " << lo |
| << " parent " << ingress::HANDLE << "\n"; |
| |
| // If throughput limit for container egress traffic exists, use HTB |
| // qdisc to achieve traffic shaping. |
| // TBF has some known issues with GSO packets. |
| // https://git.kernel.org/cgit/linux/kernel/git/davem/net.git/: |
| // e43ac79a4bc6ca90de4ba10983b4ca39cd215b4b |
| // Additionally, HTB has a simpler interface for just capping the |
| // throughput. TBF requires other parameters such as 'burst' that |
| // HTB already has default values for. |
| if (egressRateLimitPerContainer.isSome()) { |
| script << "tc qdisc add dev " << eth0 << " root handle " |
| << CONTAINER_TX_HTB_HANDLE << " htb default 1\n"; |
| script << "tc class add dev " << eth0 << " parent " |
| << CONTAINER_TX_HTB_HANDLE << " classid " |
| << CONTAINER_TX_HTB_CLASS_ID << " htb rate " |
| << egressRateLimitPerContainer->bytes() * 8 << "bit\n"; |
| |
| // Packets are buffered at the leaf qdisc if we send them faster |
| // than the HTB rate limit and may be dropped when the queue is |
| // full, so we change the default leaf qdisc from pfifo_fast to |
| // fq_codel, which has a larger buffer and better control on |
| // buffer bloat. |
| // TODO(cwang): Verity that fq_codel qdisc is available. |
| script << "tc qdisc add dev " << eth0 |
| << " parent " << CONTAINER_TX_HTB_CLASS_ID << " fq_codel\n"; |
| |
| // Display the htb qdisc and class created on eth0. |
| script << "tc qdisc show dev " << eth0 << "\n"; |
| script << "tc class show dev " << eth0 << "\n"; |
| } |
| |
| return script.str(); |
| } |
| |
| |
| uint16_t PortMappingIsolatorProcess::getNextFlowId() |
| { |
| // NOTE: It is very unlikely that we exhaust all the flow IDs. |
| CHECK(freeFlowIds.begin() != freeFlowIds.end()); |
| |
| uint16_t flowId = *freeFlowIds.begin(); |
| |
| freeFlowIds.erase(freeFlowIds.begin()); |
| |
| return flowId; |
| } |
| |
| |
| //////////////////////////////////////////////////// |
| // Implementation for the ephemeral ports allocator. |
| //////////////////////////////////////////////////// |
| |
| |
| uint32_t EphemeralPortsAllocator::nextMultipleOf(uint32_t x, uint32_t m) |
| { |
| uint32_t div = x / m; |
| uint32_t mod = x % m; |
| |
| return (div + (mod == 0 ? 0 : 1)) * m; |
| } |
| |
| |
| Try<Interval<uint16_t>> EphemeralPortsAllocator::allocate() |
| { |
| if (portsPerContainer_ == 0) { |
| return Error("Number of ephemeral ports per container is zero"); |
| } |
| |
| Option<Interval<uint16_t>> allocated; |
| |
| foreach (const Interval<uint16_t>& interval, free) { |
| uint16_t upper = interval.upper(); |
| uint16_t lower = interval.lower(); |
| uint16_t size = upper - lower; |
| |
| if (size < portsPerContainer_) { |
| continue; |
| } |
| |
| // If 'lower' is not aligned, calculate the new aligned 'lower'. |
| if (lower % portsPerContainer_ != 0) { |
| lower = nextMultipleOf(lower, portsPerContainer_); |
| if (lower + portsPerContainer_ > upper) { |
| continue; |
| } |
| } |
| |
| allocated = (Bound<uint16_t>::closed(lower), |
| Bound<uint16_t>::open(lower + portsPerContainer_)); |
| break; |
| } |
| |
| if (allocated.isNone()) { |
| return Error("Failed to allocate ephemeral ports"); |
| } |
| |
| allocate(allocated.get()); |
| |
| return allocated.get(); |
| } |
| |
| |
| void EphemeralPortsAllocator::allocate(const Interval<uint16_t>& ports) |
| { |
| CHECK(free.contains(ports)); |
| CHECK(!used.contains(ports)); |
| free -= ports; |
| used += ports; |
| } |
| |
| |
| void EphemeralPortsAllocator::deallocate(const Interval<uint16_t>& ports) |
| { |
| CHECK(!free.contains(ports)); |
| CHECK(used.contains(ports)); |
| free += ports; |
| used -= ports; |
| } |
| |
| |
| // This function is exposed for unit testing. |
| vector<PortRange> getPortRanges(const IntervalSet<uint16_t>& ports) |
| { |
| vector<PortRange> ranges; |
| |
| foreach (const Interval<uint16_t>& interval, ports) { |
| uint16_t lower = interval.lower(); // Inclusive lower. |
| uint16_t upper = interval.upper(); // Exclusive upper. |
| |
| // Construct a set of valid port ranges (i.e., that can be used by |
| // a filter) from 'interval'. We keep incrementing 'lower' as we |
| // find valid port ranges until we reach 'upper'. |
| while (lower < upper) { |
| // Determine the size of the port range starting from 'lower'. |
| // The size has to satisfy the following conditions: 1) size = |
| // 2^n (n=0,1,2,...); 2) lower % size == 0. |
| size_t size; |
| for (size = roundDownToPowerOfTwo(lower) ; size > 1; size = size / 2) { |
| if (lower % size == 0 && lower + size <= upper) { |
| break; |
| } |
| } |
| |
| // Construct the port range given the size. |
| Try<PortRange> range = PortRange::fromBeginEnd(lower, lower + size - 1); |
| |
| CHECK_SOME(range) << "Invalid port range: " << "[" << lower << "," |
| << (lower + size - 1) << "]"; |
| |
| ranges.push_back(range.get()); |
| |
| lower += size; |
| } |
| } |
| |
| return ranges; |
| } |
| |
| } // namespace slave { |
| } // namespace internal { |
| } // namespace mesos { |