blob: 126685f3389bff1d737b84c1af56da861f42470f [file] [log] [blame]
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TrafficControlBandwidthHandlerImpl
implements OutboundBandwidthResourceHandler {
private static final Logger LOG =
LoggerFactory.getLogger(TrafficControlBandwidthHandlerImpl.class);
//In the absence of 'scheduling' support, we'll 'infer' the guaranteed
//outbound bandwidth for each container based on this number. This will
//likely go away once we add support on the RM for this resource type.
private static final int MAX_CONTAINER_COUNT = 50;
private final PrivilegedOperationExecutor privilegedOperationExecutor;
private final CGroupsHandler cGroupsHandler;
private final TrafficController trafficController;
private final ConcurrentHashMap<ContainerId, Integer> containerIdClassIdMap;
private Configuration conf;
private String device;
private boolean strictMode;
private int containerBandwidthMbit;
private int rootBandwidthMbit;
private int yarnBandwidthMbit;
public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
privilegedOperationExecutor, CGroupsHandler cGroupsHandler,
TrafficController trafficController) {
this.privilegedOperationExecutor = privilegedOperationExecutor;
this.cGroupsHandler = cGroupsHandler;
this.trafficController = trafficController;
this.containerIdClassIdMap = new ConcurrentHashMap<>();
}
/**
* Bootstrapping 'outbound-bandwidth' resource handler - mounts net_cls
* controller and bootstraps a traffic control bandwidth shaping hierarchy
* @param configuration yarn configuration in use
* @return (potentially empty) list of privileged operations to execute.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
conf = configuration;
//We'll do this inline for the time being - since this is a one time
//operation. At some point, LCE code can be refactored to batch mount
//operations across multiple controllers - cpu, net_cls, blkio etc
cGroupsHandler
.initializeCGroupController(CGroupsHandler.CGroupController.NET_CLS);
device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE,
YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE);
strictMode = configuration.getBoolean(YarnConfiguration
.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration
.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
rootBandwidthMbit = conf.getInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration
.DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT);
yarnBandwidthMbit = conf.getInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit);
containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit /
MAX_CONTAINER_COUNT);
StringBuffer logLine = new StringBuffer("strict mode is set to :")
.append(strictMode).append(System.lineSeparator());
if (strictMode) {
logLine.append("container bandwidth will be capped to soft limit.")
.append(System.lineSeparator());
} else {
logLine.append(
"containers will be allowed to use spare YARN bandwidth.")
.append(System.lineSeparator());
}
logLine
.append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ")
.append(containerBandwidthMbit);
LOG.info(logLine.toString());
trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit);
return null;
}
/**
* Pre-start hook for 'outbound-bandwidth' resource. A cgroup is created
* and a net_cls classid is generated and written to a cgroup file. A
* traffic control shaping rule is created in order to limit outbound
* bandwidth utilization.
* @param container Container being launched
* @return privileged operations for some cgroups/tc operations.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String containerIdStr = container.getContainerId().toString();
int classId = trafficController.getNextClassId();
String classIdStr = trafficController.getStringForNetClsClassId(classId);
cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
.NET_CLS,
containerIdStr);
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
containerIdClassIdMap.put(container.getContainerId(), classId);
//Now create a privileged operation in order to update the tasks file with
//the pid of the running container process (root of process tree). This can
//only be done at the time of launching the container, in a privileged
//executable.
String tasksFile = cGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX)
.append(tasksFile).toString();
List<PrivilegedOperation> ops = new ArrayList<>();
ops.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
//Create a privileged operation to create a tc rule for this container
//We'll return this to the calling (Linux) Container Executor
//implementation for batching optimizations so that we don't fork/exec
//additional times during container launch.
TrafficController.BatchBuilder builder = trafficController.new
BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE);
builder.addContainerClass(classId, containerBandwidthMbit, strictMode);
ops.add(builder.commitBatchToTempFile());
return ops;
}
/**
* Reacquires state for a container - reads the classid from the cgroup
* being used for the container being reacquired
* @param containerId if of the container being reacquired.
* @return (potentially empty) list of privileged operations
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
String containerIdStr = containerId.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to reacquire classId for container: " +
containerIdStr);
}
String classIdStrFromFile = cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.NET_CLS, containerIdStr,
CGroupsHandler.CGROUP_PARAM_CLASSID);
int classId = trafficController
.getClassIdFromFileContents(classIdStrFromFile);
LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr
+ " -> " + classId);
containerIdClassIdMap.put(containerId, classId);
return null;
}
/**
* Returns total bytes sent per container to be used for metrics tracking
* purposes.
* @return a map of containerId to bytes sent
* @throws ResourceHandlerException
*/
public Map<ContainerId, Integer> getBytesSentPerContainer()
throws ResourceHandlerException {
Map<Integer, Integer> classIdStats = trafficController.readStats();
Map<ContainerId, Integer> containerIdStats = new HashMap<>();
for (Map.Entry<ContainerId, Integer> entry : containerIdClassIdMap
.entrySet()) {
ContainerId containerId = entry.getKey();
Integer classId = entry.getValue();
Integer bytesSent = classIdStats.get(classId);
if (bytesSent == null) {
LOG.warn("No bytes sent metric found for container: " + containerId +
" with classId: " + classId);
continue;
}
containerIdStats.put(containerId, bytesSent);
}
return containerIdStats;
}
/**
* Cleanup operations once container is completed - deletes cgroup and
* removes traffic shaping rule(s).
* @param containerId of the container that was completed.
* @return
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
LOG.info("postComplete for container: " + containerId.toString());
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
containerId.toString());
Integer classId = containerIdClassIdMap.get(containerId);
if (classId != null) {
PrivilegedOperation op = trafficController.new
BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
.deleteContainerClass(classId).commitBatchToTempFile();
try {
privilegedOperationExecutor.executePrivilegedOperation(op, false);
trafficController.releaseClassId(classId);
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to delete tc rule for classId: " + classId);
throw new ResourceHandlerException(
"Failed to delete tc rule for classId:" + classId);
}
} else {
LOG.warn("Not cleaning up tc rules. classId unknown for container: " +
containerId.toString());
}
return null;
}
@Override
public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
if (LOG.isDebugEnabled()) {
LOG.debug("teardown(): Nothing to do");
}
return null;
}
}