blob: 78cfe73afe0faeddf6770e42606565dcc2bbce5c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.supervisor;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.container.CgroupCenter;
import com.alibaba.jstorm.container.Hierarchy;
import com.alibaba.jstorm.container.SubSystemType;
import com.alibaba.jstorm.utils.SystemOperation;
import com.alibaba.jstorm.container.cgroup.CgroupCommon;
import com.alibaba.jstorm.container.cgroup.core.CgroupCore;
import com.alibaba.jstorm.container.cgroup.core.CpuCore;
import com.alibaba.jstorm.utils.JStormUtils;
/**
 * Creates, configures and removes the per-worker cpu cgroups used by the supervisor.
 *
 * <p>All worker cgroups are created as children of a root cgroup named after the
 * configured cgroup root directory, which must already exist under
 * {@code /cgroup/cpu} when the manager is constructed.
 *
 * @author Johnfang (xiaojian.fxj@alibaba-inc.com)
 */
public class CgroupManager {
    public static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class);

    public static final String JSTORM_HIERARCHY_NAME = "jstorm_cpu";

    /** Multiplied by the requested cpu count to obtain the value passed to {@code setCpuShares}. */
    public static final int ONE_CPU_SLOT = 1024;

    private static final String JSTORM_CPU_HIERARCHY_DIR = "/cgroup/cpu";

    /** Period written to cfs_period_us whenever an upper limit is enforced. */
    private static final int CFS_PERIOD_US = 100000;

    /** Sentinel for "do not limit cpu usage". */
    private static final int NO_CPU_LIMIT = -1;

    private static final int MIN_CPU_CORE_UPPER_LIMIT = 1;

    private static final int MAX_CPU_CORE_UPPER_LIMIT = 10;

    /** Time given to killed worker tasks before their cgroup is deleted. */
    private static final long KILL_WAIT_MS = 1500;

    private final CgroupCenter center;
    private final Hierarchy h;
    private final CgroupCommon rootCgroup;

    // Per-instance on purpose: a static field written from the constructor would be
    // shared (and overwritten) by every CgroupManager that gets constructed.
    private final String rootDir;

    /**
     * Validates the cgroup environment and prepares the root cgroup for workers.
     *
     * @param conf supervisor configuration; the cgroup root directory is read from it
     *             through {@code ConfigExtension.getCgroupRootDir}
     * @throws RuntimeException if the root directory is not configured, if
     *                          {@code /cgroup/cpu/<rootDir>} does not exist, or if no
     *                          {@code CgroupCenter} instance is available
     */
    @SuppressWarnings("rawtypes") // raw Map is part of the existing public signature
    public CgroupManager(Map conf) {
        LOG.info("running on cgroup mode");

        // Cgconfig service is used to create the corresponding cpu hierarchy
        // "/cgroup/cpu"
        String configuredRootDir = ConfigExtension.getCgroupRootDir(conf);
        if (configuredRootDir == null) {
            throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing.");
        }

        File file = new File(JSTORM_CPU_HIERARCHY_DIR + "/" + configuredRootDir);
        if (!file.exists()) {
            LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + configuredRootDir + " is not existing.");
            throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file.");
        }

        CgroupCenter cgroupCenter = CgroupCenter.getInstance();
        if (cgroupCenter == null) {
            throw new RuntimeException("Cgroup error, please check /proc/cgroups");
        }

        this.rootDir = configuredRootDir;
        this.center = cgroupCenter;
        this.h = resolveCpuHierarchy(cgroupCenter);
        this.rootCgroup = new CgroupCommon(this.rootDir, this.h, this.h.getRootCgroups());
    }

    /**
     * Clamps the configured cpu core upper limit into the accepted range.
     *
     * <p>Valid values are -1 (no control) or 1 to 10. Anything above 10 becomes 10;
     * anything below 1 other than -1 becomes 1.
     *
     * @param value configured upper limit
     * @return -1, or a value between 1 and 10 inclusive
     */
    private static int validateCpuUpperLimitValue(int value) {
        if (value > MAX_CPU_CORE_UPPER_LIMIT) {
            return MAX_CPU_CORE_UPPER_LIMIT;
        }
        if (value < MIN_CPU_CORE_UPPER_LIMIT && value != NO_CPU_LIMIT) {
            return MIN_CPU_CORE_UPPER_LIMIT;
        }
        return value;
    }

    /**
     * Applies an upper limit on cpu usage through cfs_period_us and cfs_quota_us.
     *
     * <p>For example, to let a process fully use two cpu cores, the period is set to
     * 100000 and the quota to 200000. The highest accepted limit is 10 cores.
     *
     * @param cpuCore           cpu controller of the cgroup being configured
     * @param cpuCoreUpperLimit number of cores the cgroup may use, or -1 for no control
     * @throws IOException if the cpu controller rejects one of the writes
     */
    private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException {
        int limit = validateCpuUpperLimitValue(cpuCoreUpperLimit);
        if (limit == NO_CPU_LIMIT) {
            // No control of cpu usage: only the quota is written, the period is left as is.
            cpuCore.setCpuCfsQuotaUs(limit);
            return;
        }
        cpuCore.setCpuCfsPeriodUs(CFS_PERIOD_US);
        cpuCore.setCpuCfsQuotaUs(limit * CFS_PERIOD_US);
    }

    /**
     * Creates and configures the cgroup for a new worker.
     *
     * @param conf     configuration from which the worker cpu core upper limit is read
     * @param cpuNum   number of cpu slots; cpu shares are set to {@code cpuNum * ONE_CPU_SLOT}
     * @param workerId name of the cgroup created under the root cgroup
     * @return the command prefix {@code "cgexec -g cpu:<cgroup name> "} (with trailing space)
     * @throws SecurityException declared by the original signature
     * @throws IOException       if the cpu controller settings cannot be applied
     */
    @SuppressWarnings("rawtypes") // raw Map is part of the existing public signature
    public String startNewWorker(Map conf, int cpuNum, String workerId) throws SecurityException, IOException {
        CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
        this.center.create(workerGroup);

        CgroupCore cpu = workerGroup.getCores().get(SubSystemType.cpu);
        CpuCore cpuCore = (CpuCore) cpu;
        cpuCore.setCpuShares(cpuNum * ONE_CPU_SLOT);
        setCpuUsageUpperLimit(cpuCore, ConfigExtension.getWorkerCpuCoreUpperLimit(conf));

        return "cgexec -g cpu:" + workerGroup.getName() + " ";
    }

    /**
     * Removes the cgroup of a worker, killing its remaining tasks first when needed.
     *
     * <p>This is best effort: any failure is logged and not propagated.
     *
     * @param workerId name of the worker cgroup under the root cgroup
     * @param isKilled {@code true} to skip the kill step; when {@code false}, every pid
     *                 listed in the cgroup is killed and the method waits 1500 ms
     *                 before deleting the cgroup
     */
    public void shutDownWorker(String workerId, boolean isKilled) {
        CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup);
        try {
            if (!isKilled) {
                for (Integer pid : workerGroup.getTasks()) {
                    JStormUtils.kill(pid);
                }
                JStormUtils.sleepMs(KILL_WAIT_MS);
            }
            center.delete(workerGroup);
        } catch (Exception e) {
            // Deliberately not rethrown, but the cause is kept so that a failed delete
            // can be told apart from a missing task list.
            LOG.info("No task of " + workerId, e);
        }
    }

    /**
     * Deletes the root cgroup owned by this manager.
     *
     * @throws IOException if the delete fails
     */
    public void close() throws IOException {
        this.center.delete(this.rootCgroup);
    }

    /**
     * Returns the hierarchy to use for the cpu subsystem.
     *
     * <p>Uses whatever {@code center.busy(SubSystemType.cpu)} reports; when that is
     * {@code null}, a new {@code Hierarchy} object for {@code /cgroup/cpu} is built.
     * NOTE(review): nothing here mounts the new hierarchy; presumably the cgconfig
     * service has already done so. Confirm against CgroupCenter.
     */
    private static Hierarchy resolveCpuHierarchy(CgroupCenter cgroupCenter) {
        Hierarchy existing = cgroupCenter.busy(SubSystemType.cpu);
        if (existing != null) {
            return existing;
        }
        Set<SubSystemType> types = new HashSet<SubSystemType>();
        types.add(SubSystemType.cpu);
        return new Hierarchy(JSTORM_HIERARCHY_NAME, types, JSTORM_CPU_HIERARCHY_DIR);
    }
}