blob: 0a845ae34ab9baaa9e6833a8e1d35a6009c68785 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.container.cgroup;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.SystemUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.container.DefaultResourceIsolationManager;
import org.apache.storm.container.cgroup.core.CpuCore;
import org.apache.storm.container.cgroup.core.CpusetCore;
import org.apache.storm.container.cgroup.core.MemoryCore;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.daemon.supervisor.ExitCodeCallback;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that implements ResourceIsolationInterface that manages cgroups.
*/
public class CgroupManager extends DefaultResourceIsolationManager {
private static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class);
private CgroupCenter center;
private Hierarchy hierarchy;
private CgroupCommon rootCgroup;
private String rootDir;
private Map<String, String> workerToNumaId;
/**
* initialize data structures.
*
* @param conf storm confs
*/
@Override
public void prepare(Map<String, Object> conf) throws IOException {
super.prepare(conf);
this.rootDir = DaemonConfig.getCgroupRootDir(this.conf);
if (this.rootDir == null) {
throw new RuntimeException("Check configuration file. The storm.supervisor.cgroup.rootdir is missing.");
}
File file = new File(DaemonConfig.getCgroupStormHierarchyDir(conf), rootDir);
if (!file.exists()) {
LOG.error("{} does not exist", file.getPath());
throw new RuntimeException(
"Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
}
this.center = CgroupCenter.getInstance();
if (this.center == null) {
throw new RuntimeException("Cgroup error, please check /proc/cgroups");
}
this.prepareSubSystem(this.conf);
workerToNumaId = new ConcurrentHashMap();
}
/**
* Initialize subsystems.
*/
private void prepareSubSystem(Map<String, Object> conf) throws IOException {
List<SubSystemType> subSystemTypes = new LinkedList<>();
for (String resource : DaemonConfig.getCgroupStormResources(conf)) {
subSystemTypes.add(SubSystemType.getSubSystem(resource));
}
this.hierarchy = center.getHierarchyWithSubSystems(subSystemTypes);
if (this.hierarchy == null) {
Set<SubSystemType> types = new HashSet<>();
types.add(SubSystemType.cpu);
this.hierarchy = new Hierarchy(DaemonConfig.getCgroupStormHierarchyName(conf), types,
DaemonConfig.getCgroupStormHierarchyDir(conf));
}
this.rootCgroup =
new CgroupCommon(this.rootDir, this.hierarchy, this.hierarchy.getRootCgroups());
// set upper limit to how much cpu can be used by all workers running on supervisor node.
// This is done so that some cpu cycles will remain free to run the daemons and other miscellaneous OS
// operations.
CpuCore supervisorRootCpu = (CpuCore) this.rootCgroup.getCores().get(SubSystemType.cpu);
setCpuUsageUpperLimit(supervisorRootCpu, ((Number) this.conf.get(Config.SUPERVISOR_CPU_CAPACITY)).intValue());
}
/**
* Use cfs_period & cfs_quota to control the upper limit use of cpu core e.g.
* If making a process to fully use two cpu cores, set cfs_period_us to
* 100000 and set cfs_quota_us to 200000
*/
private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException {
if (cpuCoreUpperLimit == -1) {
// No control of cpu usage
cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit);
} else {
cpuCore.setCpuCfsPeriodUs(100000);
cpuCore.setCpuCfsQuotaUs(cpuCoreUpperLimit * 1000);
}
}
@Override
public void reserveResourcesForWorker(String workerId, Integer totalMem, Integer cpuNum, String numaId) throws SecurityException {
LOG.info("Creating cgroup for worker {} with resources {} MB {} % CPU", workerId, totalMem, cpuNum);
// The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by
// RAS (Resource Aware Scheduler)
if (conf.get(DaemonConfig.STORM_WORKER_CGROUP_CPU_LIMIT) != null) {
cpuNum = ((Number) conf.get(DaemonConfig.STORM_WORKER_CGROUP_CPU_LIMIT)).intValue();
}
// The manually set STORM_WORKER_CGROUP_MEMORY_MB_LIMIT config on supervisor will overwrite
// resources assigned by RAS (Resource Aware Scheduler)
if (this.conf.get(DaemonConfig.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT) != null) {
totalMem =
((Number) this.conf.get(DaemonConfig.STORM_WORKER_CGROUP_MEMORY_MB_LIMIT)).intValue();
}
CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
try {
this.center.createCgroup(workerGroup);
} catch (Exception e) {
throw new RuntimeException("Error when creating Cgroup! Exception: ", e);
}
if (cpuNum != null) {
CpuCore cpuCore = (CpuCore) workerGroup.getCores().get(SubSystemType.cpu);
try {
cpuCore.setCpuShares(cpuNum.intValue());
} catch (IOException e) {
throw new RuntimeException("Cannot set cpu.shares! Exception: ", e);
}
}
if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
if (totalMem != null) {
int cgroupMem =
(int)
(Math.ceil(
ObjectReader.getDouble(
this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
0.0)));
long memLimit = Long.valueOf((totalMem.longValue() + cgroupMem) * 1024 * 1024);
MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
try {
memCore.setPhysicalUsageLimit(memLimit);
} catch (IOException e) {
throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
}
// need to set memory.memsw.limit_in_bytes after setting memory.limit_in_bytes or error
// might occur
try {
memCore.setWithSwapUsageLimit(memLimit);
} catch (IOException e) {
throw new RuntimeException("Cannot set memory.memsw.limit_in_bytes! Exception: ", e);
}
}
}
if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_INHERIT_CPUSET_CONFIGS)) {
if (workerGroup.getParent().getCores().containsKey(SubSystemType.cpuset)) {
CpusetCore parentCpusetCore = (CpusetCore) workerGroup.getParent().getCores().get(SubSystemType.cpuset);
CpusetCore cpusetCore = (CpusetCore) workerGroup.getCores().get(SubSystemType.cpuset);
try {
cpusetCore.setCpus(parentCpusetCore.getCpus());
} catch (IOException e) {
throw new RuntimeException("Cannot set cpuset.cpus! Exception: ", e);
}
try {
cpusetCore.setMems(parentCpusetCore.getMems());
} catch (IOException e) {
throw new RuntimeException("Cannot set cpuset.mems! Exception: ", e);
}
}
}
if (numaId != null) {
workerToNumaId.put(workerId, numaId);
}
}
@Override
public void releaseResourcesForWorker(String workerId) {
CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup);
try {
Set<Integer> tasks = workerGroup.getTasks();
if (!tasks.isEmpty()) {
throw new Exception("Cannot correctly shutdown worker CGroup " + workerId + "tasks " + tasks
+ " still running!");
}
this.center.deleteCgroup(workerGroup);
} catch (Exception e) {
LOG.error("Exception thrown when shutting worker {} Exception: {}", workerId, e);
}
}
/**
* Extracting out to mock it for tests.
* @return true if on Linux.
*/
protected static boolean isOnLinux() {
return SystemUtils.IS_OS_LINUX;
}
private void prefixNumaPinning(List<String> command, String numaId) {
if (isOnLinux()) {
command.add(0, "numactl");
command.add(1, "--cpunodebind=" + numaId);
command.add(2, "--membind=" + numaId);
return;
} else {
// TODO : Add support for pinning on Windows host
throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts");
}
}
@Override
public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
int port, String workerId,
List<String> command, Map<String, String> env, String logPrefix,
ExitCodeCallback processExitCallback, File targetDir) throws IOException {
if (workerToNumaId.containsKey(workerId)) {
prefixNumaPinning(command, workerToNumaId.get(workerId));
}
if (runAsUser) {
String workerDir = targetDir.getAbsolutePath();
List<String> args = Arrays.asList("worker", workerDir, ServerUtils.writeScript(workerDir, command, env));
List<String> commandPrefix = getLaunchCommandPrefix(workerId);
ClientSupervisorUtils.processLauncher(conf, user, commandPrefix, args, null,
logPrefix, processExitCallback, targetDir);
} else {
command = getLaunchCommand(workerId, command);
ClientSupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
}
}
/**
* To compose launch command based on workerId and existing command.
* @param workerId the worker id
* @param existingCommand the current command to run that may need to be modified
* @return new commandline with necessary additions to launch worker
*/
@VisibleForTesting
public List<String> getLaunchCommand(String workerId, List<String> existingCommand) {
List<String> newCommand = getLaunchCommandPrefix(workerId);
if (workerToNumaId.containsKey(workerId)) {
prefixNumaPinning(newCommand, workerToNumaId.get(workerId));
}
newCommand.addAll(existingCommand);
return newCommand;
}
private List<String> getLaunchCommandPrefix(String workerId) {
CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
if (!this.rootCgroup.getChildren().contains(workerGroup)) {
throw new RuntimeException(
"cgroup " + workerGroup + " doesn't exist! Need to reserve resources for worker first!");
}
StringBuilder sb = new StringBuilder();
sb.append(this.conf.get(DaemonConfig.STORM_CGROUP_CGEXEC_CMD)).append(" -g ");
Iterator<SubSystemType> it = this.hierarchy.getSubSystems().iterator();
while (it.hasNext()) {
sb.append(it.next().toString());
if (it.hasNext()) {
sb.append(",");
} else {
sb.append(":");
}
}
sb.append(workerGroup.getName());
List<String> newCommand = new ArrayList<String>();
newCommand.addAll(Arrays.asList(sb.toString().split(" ")));
return newCommand;
}
private Set<Long> getRunningPids(String workerId) throws IOException {
CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
if (!this.rootCgroup.getChildren().contains(workerGroup)) {
LOG.warn("cgroup {} doesn't exist!", workerGroup);
return Collections.emptySet();
}
return workerGroup.getPids();
}
/**
* Get all of the pids that are a part of this container.
* @param workerId the worker id
* @return all of the pids that are a part of this container
*/
@Override
protected Set<Long> getAllPids(String workerId) throws IOException {
Set<Long> ret = super.getAllPids(workerId);
Set<Long> morePids = getRunningPids(workerId);
assert (morePids != null);
ret.addAll(morePids);
return ret;
}
@Override
public long getMemoryUsage(String user, String workerId, int port) throws IOException {
CgroupCommon workerGroup = new CgroupCommon(workerId, this.hierarchy, this.rootCgroup);
MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
return memCore.getPhysicalUsage();
}
@Override
public long getSystemFreeMemoryMb() throws IOException {
long rootCgroupLimitFree = Long.MAX_VALUE;
try {
MemoryCore memRoot = (MemoryCore) rootCgroup.getCores().get(SubSystemType.memory);
if (memRoot != null) {
//For cgroups no limit is max long.
long limit = memRoot.getPhysicalUsageLimit();
long used = memRoot.getMaxPhysicalUsage();
rootCgroupLimitFree = (limit - used) / 1024 / 1024;
}
} catch (FileNotFoundException e) {
//Ignored if cgroups is not setup don't do anything with it
}
return Long.min(rootCgroupLimitFree, ServerUtils.getMemInfoFreeMb());
}
@Override
public boolean isResourceManaged() {
return true;
}
}