blob: 98ff03c1c7dc553b001be3cad863e0ee019c3261 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.launcher;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
/**
* Manages cgroup container on a node
*
* Supported operations: - cgcreate - creates cgroup container - cgset - sets max memory limit for
* an existing container
*
* This code supports both the new and the old (legacy) cgconfig. The old configuration adds
* cgroups to <cgroup location>/ducc, whereas the new one adds them to <cgroup location>/cpu/ducc
* and <cgroup location>/memory/ducc. On startup the agent detects which cgconfig is active and
* adjusts accordingly.
*
*/
public class CGroupsManager {
// logger handed in through the constructor
private DuccLogger agentLogger = null;
// name of the OS user running this JVM (the agent runs as the "ducc" user); it is used as the
// name of the parent cgroup folder
private static final String SYSTEM = System.getProperty("user.name");
// subsystem-relative paths of the parent cgroup. getCGroupLocation() appends them to the base
// dir only for the new cgconfig; in legacy mode the argument is ignored.
private static final String CGDuccMemoryPath = "/memory/" + SYSTEM + "/";
// NOTE(review): CGDuccCpuPath is not referenced in the part of the file visible here
private static final String CGDuccCpuPath = "/cpu/" + SYSTEM + "/";
// name of the per-cgroup file listing the PIDs that belong to the cgroup
private static final String CGProcsFile = "/cgroup.procs";
// legacy means that the cgconfig points to <cgroup location>/ducc
private boolean legacyCgConfig = false;
// NOTE(review): not referenced in the part of the file visible here; presumably a monitor used
// by code further down - confirm
private Object waitForLockObject = new Object();
/** Names of the cgroup command line utilities. */
enum CGroupCommand {
  CGSET("cgset"),
  CGCREATE("cgcreate");

  /** executable name of the utility */
  String cmd;

  CGroupCommand(String executableName) {
    cmd = executableName;
  }

  /**
   * @return the executable name of this command, e.g. "cgset"
   */
  public String cmd() {
    return this.cmd;
  }
}
// manages list of 'active' cgroup containers by container id
private Set<String> containerIds = new LinkedHashSet<String>();
// stores cgroup base location
private String cgroupBaseDir = "";
// stores cgroup utils location like cgcreate, cgset, etc
private String cgroupUtilsDir = null;
// result of symbolicLinksForCpu(), assigned in the constructor; consulted by
// adjustSubsystems() to decide whether cpuacct must be dropped from the subsystem list
private boolean cpuInfoSymlinked = true;
// stores comma separated list of subsystems like cpu,memory
private String cgroupSubsystems = "";
// retry settings; both can be overridden from the agent configuration in configure()
private long retryMax = 4;
private long delayFactor = 2000; // 2 secs in millis
// value handed in through the constructor (in millis, judging by the default used in main())
private long maxTimeToWaitForProcessToStop;
/**
 * Determines the cgroup base directory from a mounts table (e.g. /proc/mounts).
 *
 * Only entries starting with "cgroup" are considered. If there is exactly one such entry
 * (legacy installation, e.g. "cgroup /cgroup cgroup rw,relatime,memory,cpuacct,cpu 0 0") its
 * mount point is returned as is. Otherwise the first entry whose mount point contains
 * "/memory" is used (e.g. "cgroup /sys/fs/cgroup/memory cgroup rw ...") and the mount point
 * minus the "/memory" part is returned.
 *
 * @param mounts
 *          - path of the mounts file to read
 * @return the cgroup base dir, or null if it cannot be determined or the file cannot be read
 */
private static String fetchCgroupsBaseDir(String mounts) {
  String cbaseDir = null;
  List<String> cgroupsEntries = new ArrayList<String>();
  try {
    for (String line : Files.readAllLines(Paths.get(mounts), Charset.defaultCharset())) {
      // keep the trimmed form so that the column positions used below are not shifted by
      // leading white space
      String entry = line.trim();
      if (entry.startsWith("cgroup")) {
        cgroupsEntries.add(entry);
      }
    }
    if (cgroupsEntries.size() == 1) {
      // legacy cgroups installation: a single mount holds all subsystems
      String[] cgroupsInfo = cgroupsEntries.get(0).split("\\s+");
      if (cgroupsInfo.length > 1) {
        cbaseDir = cgroupsInfo[1];
      }
    } else {
      // recent cgroups installation: one mount per subsystem, look for the memory one
      for (String entry : cgroupsEntries) {
        String[] cgroupsInfo = entry.split("\\s+");
        if (cgroupsInfo.length < 2) {
          continue; // malformed entry, no mount point column
        }
        int memoryIdx = cgroupsInfo[1].indexOf("/memory");
        if (memoryIdx != -1) {
          cbaseDir = cgroupsInfo[1].substring(0, memoryIdx);
          break;
        }
      }
    }
  } catch (Exception e) {
    // no logger is available in this static context
    e.printStackTrace();
  }
  return cbaseDir;
}
/**
 * Stand-alone check of the cgroups setup on this machine. Looks for the cgroups utilities in
 * /usr/bin and /bin, reads the cgroups base dir from /proc/mounts and, if both are found,
 * creates a dummy "test" cgroup, sets cpu.shares on it and removes it again. Progress is
 * reported on stdout.
 *
 * @param args
 *          - not used
 */
public static void main(String[] args) {
  final String candidateDirs = "/usr/bin,/bin"; // System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
  DuccLogger logger = DuccLogger.getLogger(CGroupsManager.class, "CGroupsManager");
  // the first directory holding cgexec wins
  String cgUtilsPath = null;
  for (String candidate : candidateDirs.split(",")) {
    if (new File(candidate.trim() + "/cgexec").exists()) {
      cgUtilsPath = candidate;
      break;
    }
  }
  String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
  if (cgUtilsPath == null) {
    System.out.println(
            "------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties");
    return;
  }
  if (cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) {
    System.out.println(
            "------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts");
    return;
  }
  System.out.println(
          "Agent found cgroups runtime in " + cgUtilsPath + " cgroups base dir=" + cgroupsBaseDir);
  final long stopTimeout = 60000; // default 1 minute
  CGroupsManager cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, "memory,cpu",
          logger, stopTimeout);
  try {
    // the base directory only exists in the filesystem when cgroups and the cgroups
    // convenience package are installed and the daemon is up and running
    if (!cgroupsManager.cgroupExists(cgroupsBaseDir)) {
      return;
    }
    System.out.println("Agent found cgroup base directory in " + cgroupsBaseDir);
    final String containerId = "test";
    final String owner = System.getProperty("user.name");
    // Validate cgroups by creating a dummy cgroup and then writing cpu.shares=100 to it.
    // Any exception raised by these steps ends up in the catch block below.
    cgroupsManager.validator(cgroupsBaseDir, containerId, owner, false).cgcreate().cgset(100);
    // cleanup dummy cgroup
    cgroupsManager.destroyContainer(containerId, owner, -9);
  } catch (Exception ee) {
    ee.printStackTrace();
  }
}
/**
 * Creates the manager and detects the cgroup layout of this machine.
 *
 * If <cgroupBaseDir>/memory does not exist, the legacy cgconfig is assumed and the base dir is
 * extended with the name of the user running the agent. The constructor also records the
 * result of symbolicLinksForCpu() and appends "cpuacct" to the subsystem list when a
 * cpuacct.usage file is found.
 *
 * @param cgroupUtilsDir
 *          - directory holding cgcreate, cgset, etc
 * @param cgroupBaseDir
 *          - cgroup mount location
 * @param subsystems
 *          - comma separated list of subsystems
 * @param agentLogger
 *          - logger to use
 * @param maxTimeToWaitForProcessToStop
 *          - stored for later use
 */
public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String subsystems,
        DuccLogger agentLogger, long maxTimeToWaitForProcessToStop) {
  this.cgroupUtilsDir = cgroupUtilsDir;
  this.cgroupBaseDir = cgroupBaseDir;
  this.cgroupSubsystems = subsystems;
  this.agentLogger = agentLogger;
  this.maxTimeToWaitForProcessToStop = maxTimeToWaitForProcessToStop;
  try {
    // the new (standard) cgconfig has the format <cgroup location>/memory
    boolean standardLayout = new File(cgroupBaseDir + "/memory").exists();
    if (!standardLayout) {
      // legacy cgconfig is active: everything lives in <cgroup folder>/<user>
      this.cgroupBaseDir += "/" + SYSTEM + "/";
      legacyCgConfig = true;
    }
  } catch (Exception e) {
    // if there is an error here, the new cgconfig is assumed and subject
    // to additional testing on agent startup.
    e.printStackTrace();
  }
  // on some systems cpu and cpuacct may be linked to the same directory. In such
  // cases the cgdelete command must only include memory,cpu as submodules
  cpuInfoSymlinked = symbolicLinksForCpu();

  final String separator = System.getProperty("file.separator");
  String location = getCGroupLocation("cpuacct").trim();
  if (!legacyCgConfig) {
    if (!location.endsWith(separator)) {
      location += separator;
    }
    location += SYSTEM + separator;
  }
  System.out.println("------------- Location:" + location);
  if (new File(location + "cpuacct.usage").exists()) {
    System.out.println("Got cpuacct.usage file");
    this.cgroupSubsystems += ",cpuacct";
  }
}
/**
 * Return location where cgroups utils like cgcreate can be found. This is the value that was
 * passed to the constructor.
 *
 * @return - path to cgroup utils
 */
public String getCGroupsUtilsDir() {
return cgroupUtilsDir;
}
/**
 * Return cgroup base dir for legacy cgconfig or base dir/subsystem for the new cgconfig. The old
 * looks like <cgroup folder>/ducc where the new looks like <cgroup folder>/memory
 *
 * @param subsystem
 *          - path to append to the base dir (ignored for the legacy cgconfig)
 * @return the trimmed base dir (legacy) or the trimmed base dir, a file separator if the base
 *         dir does not already end with one, and the subsystem
 */
private String getCGroupLocation(String subsystem) {
  String location = cgroupBaseDir.trim();
  if (legacyCgConfig) {
    return location;
  }
  String separator = System.getProperty("file.separator");
  // test the trimmed value - it is the one the subsystem gets appended to
  if (!location.endsWith(separator)) {
    location += separator;
  }
  return location + subsystem;
}
/**
 * Checks whether the token at index 10 of a split line equals the given submodule name.
 *
 * @param parts
 *          - tokens of one line
 * @param submoduleName
 *          - name to compare with
 * @return true if there are more than 10 tokens and the one at index 10 matches
 */
private boolean isCpuSubmodule(String[] parts, String submoduleName) {
  final int nameColumn = 10;
  if (parts.length <= nameColumn) {
    return false;
  }
  return parts[nameColumn].equals(submoduleName);
}
/**
 * Checks whether the cpu and cpuacct entries in the cgroup base dir are symbolic links to the
 * same directory. The check runs "/bin/ls -l <cgroup base dir>" and inspects the lines that
 * describe links.
 *
 * @return true if both links were found and point to the same target, false otherwise
 *         (including when the base dir does not exist or the listing fails)
 */
private boolean symbolicLinksForCpu() {
  File f = new File(cgroupBaseDir);
  if (!f.exists()) {
    return false;
  }
  BufferedReader reader = null;
  try {
    String cmd[] = { "/bin/ls", "-l", cgroupBaseDir };
    StringBuffer sb = new StringBuffer();
    for (String s : cmd) {
      sb.append(s).append(" ");
    }
    agentLogger.info("symbolicLinksForCpu", null,
            "Launching Process - Commandline:" + sb.toString());
    ProcessBuilder processLauncher = new ProcessBuilder();
    processLauncher.command(cmd);
    processLauncher.redirectErrorStream(true);
    java.lang.Process process = processLauncher.start();
    reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
    String line;
    String cpuLinkDir = "";
    String cpuacctLinkDir = "";
    agentLogger.debug("symbolicLinksForCpu", null, "Consuming Process Streams");
    while ((line = reader.readLine()) != null) {
      agentLogger.info("symbolicLinksForCpu", null, ">>>>" + line);
      // only symbolic link entries are of interest. startsWith() is safe for empty lines
      if (!line.startsWith("l")) {
        continue;
      }
      String parts[] = line.split(" ");
      if (isCpuSubmodule(parts, "cpu")) {
        cpuLinkDir = parts[parts.length - 1];
      } else if (isCpuSubmodule(parts, "cpuacct")) {
        cpuacctLinkDir = parts[parts.length - 1];
      }
      // check if we got what we were looking for. If so, no need to iterate more
      if (cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0) {
        break;
      }
    }
    agentLogger.debug("symbolicLinksForCpu", null, "Waiting for Process to Exit");
    int retCode = process.waitFor();
    agentLogger.info("symbolicLinksForCpu", null, "Pocess Exit Code=" + retCode);
    if (cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0) {
      // true only when cpu and cpuacct link to the same dir
      return cpuLinkDir.trim().equals(cpuacctLinkDir.trim());
    }
  } catch (Exception e) {
    agentLogger.error("symbolicLinksForCpu", null, e);
  } finally {
    if (reader != null) {
      try {
        reader.close();
      } catch (Exception ignored) {
        // nothing useful can be done if closing the stream fails
      }
    }
  }
  return false;
}
/**
 * Overrides the retry settings with the values from the agent's configuration factory, when
 * present. A null agent leaves the defaults untouched.
 *
 * @param agent
 *          - agent whose configuration is consulted, may be null
 */
public void configure(NodeAgent agent) {
  if (agent == null) {
    return;
  }
  if (agent.configurationFactory.maxRetryCount != null) {
    retryMax = Integer.valueOf(agent.configurationFactory.maxRetryCount);
  }
  if (agent.configurationFactory.retryDelayFactor != null) {
    delayFactor = Integer.valueOf(agent.configurationFactory.retryDelayFactor);
  }
}
/**
 * Creates a Validator for the given container, rooted at the memory cgroup location.
 *
 * @param cgroupsBaseDir
 *          - not used by this method
 * @param containerId
 *          - id of the cgroup to validate
 * @param userName
 *          - owner of the cgroup; its group is looked up via getUserGroupName()
 * @param useDuccling
 *          - passed through to the Validator
 * @return a new Validator
 * @throws Exception
 */
public Validator validator(String cgroupsBaseDir, String containerId, String userName,
        boolean useDuccling) throws Exception {
  final String memoryLocation = getCGroupLocation("memory");
  return new Validator(this, memoryLocation, containerId, userName, getUserGroupName(userName),
          useDuccling);
}
/**
 * Returns the PIDs listed in the cgroup.procs file of the given cgroup.
 *
 * @param cgroupName
 *          - name of the cgroup folder
 * @return - String array of PIDs
 * @throws Exception
 *           - if the procs file cannot be read
 */
public String[] getPidsInCgroup(String cgroupName) throws Exception {
  String procsFilePath = getCGroupLocation(CGDuccMemoryPath) + cgroupName + CGProcsFile;
  return readPids(new File(procsFilePath));
}
/**
 * Reads a file holding one PID per line (e.g. cgroup.procs).
 *
 * @param f
 *          - file to read
 * @return the trimmed, non-empty lines of the file; an empty array if there are none
 * @throws Exception
 *           - FileNotFoundException if the file does not exist, or any error raised while
 *           reading
 */
private String[] readPids(File f) throws Exception {
  List<String> pids = new ArrayList<String>();
  // try-with-resources closes the reader even when reading fails
  try (BufferedReader br = new BufferedReader(new FileReader(f))) {
    String line;
    while ((line = br.readLine()) != null) {
      String pid = line.trim();
      // a blank line is not a PID
      if (pid.length() > 0) {
        pids.add(pid);
      }
    }
  }
  return pids.toArray(new String[pids.size()]);
}
/**
 * Finds all stale CGroups and cleans them up. The code only looks at cgroup folders with names
 * that contain ducc's cgroup naming convention: <id>.<id>.<id>. First, each cgroup is checked
 * for still running processes by reading its cgroup.procs file, which lists the PIDs of
 * processes associated with the cgroup. Each such process that is not a zombie is killed via
 * -9, then the code waits until the procs file is empty or holds zombies only. The cgroup is
 * removed unless it holds zombie processes.
 *
 * @throws Exception
 */
public void cleanup() throws Exception {
  Set<NodeProcessInfo> processes = getProcessesOnNode();
  // Match any folder under /cgroup/ducc that has syntax
  // <number>.<number>.<number>
  // This syntax is assigned by ducc to each cgroup
  Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
  File cgroupsFolder = new File(getCGroupLocation(CGDuccMemoryPath));
  String[] files = cgroupsFolder.list();
  if (files == null || files.length == 0) {
    return;
  }
  for (String cgroupFolder : files) {
    Matcher m = p.matcher(cgroupFolder);
    // only look at ducc's cgroups
    if (!m.find()) {
      continue;
    }
    try {
      // open proc file which may include PIDs if processes are
      // still running
      File f = new File(getCGroupLocation(CGDuccMemoryPath) + cgroupFolder + CGProcsFile);
      // collect all pids
      String[] pids = readPids(f);
      // number of zombie processes that belong to THIS cgroup
      int zombieCount = 0;
      if (pids != null && pids.length > 0) {
        agentLogger.info("cleanupOnStartup", null, "Agent found " + pids.length
                + " cgroup proceses still active. Proceeding to remove running processes");
        for (String pid : pids) {
          for (NodeProcessInfo proc : processes) {
            // only processes of this cgroup matter. Zombies elsewhere on the node must
            // neither be counted nor prevent removal of this cgroup
            if (!pid.equals(proc.getPid())) {
              continue;
            }
            if (proc.isZombie()) {
              // Dont kill zombie process as it is already dead. Just count it
              zombieCount++;
            } else {
              // kill process hard via -9
              kill(proc.getUserid(), proc.getPid(), NodeAgent.SIGKILL);
            }
          }
        }
        long logCount = 0;
        // it may take some time for the cgroups to update accounting. Just cycle until
        // the procs file becomes empty under a given cgroup
        while (true) {
          pids = readPids(f);
          // if the cgroup contains no pids or there are only zombie processes dont wait
          // for cgroup accounting. These processes will never terminate. The idea
          // is not to enter into an infinite loop due to zombies
          if (pids == null || pids.length == 0 || (zombieCount == pids.length)) {
            break;
          }
          try {
            synchronized (this) {
              // log every ~30 minutes (10000 * 200), where 200 is a wait time in ms between
              // tries
              if (logCount % 10000 == 0) {
                agentLogger.info("cleanupOnStartup", null, "--- CGroup:" + cgroupFolder
                        + " procs file still showing processes running. Wait until CGroups updates acccounting");
              }
              logCount++;
              wait(200);
            }
          } catch (InterruptedException ee) {
            break;
          }
        }
      }
      // Don't remove CGroups if there are zombie processes there. Otherwise, attempt
      // to remove the CGroup may hang a thread.
      if (zombieCount == 0) { // no zombies in the container
        destroyContainer(cgroupFolder, SYSTEM, NodeAgent.SIGTERM);
        agentLogger.info("cleanupOnStartup", null,
                "--- Agent Removed Empty CGroup:" + cgroupFolder);
      } else {
        agentLogger.info("cleanupOnStartup", null, "CGroup " + cgroupFolder
                + " Contains Zombie Processing. Not Removing the Container");
      }
    } catch (FileNotFoundException e) {
      // noop. Cgroup may have been removed already
    } catch (Exception e) {
      agentLogger.error("cleanupOnStartup", null, e);
    }
  }
}
/**
 * Tells whether the given pid is one of the kill targets.
 *
 * @param targets
 *          - pids selected for killing
 * @param pid
 *          - pid to look up
 * @return true if pid equals one of the targets
 */
private boolean isTargetForKill(Set<String> targets, String pid) {
  for (String candidate : targets) {
    if (pid.equals(candidate)) {
      return true;
    }
  }
  return false;
}
/**
 * Kills the given processes in ducc's cgroups. The code only looks at cgroup folders with names
 * that contain ducc's cgroup naming convention: <id>.<id>.<id>. For each cgroup the
 * cgroup.procs file is read; every listed PID that is also in pidsToKill and is not a zombie is
 * killed via -9. The code then waits until none of the killed targets is listed in the procs
 * file any more. Finally the cgroup is removed, unless one of the targets in it turned out to
 * be a zombie.
 *
 * @param pidsToKill
 *          - PIDs of the processes to kill
 * @throws Exception
 */
public void cleanupPids(Set<String> pidsToKill) throws Exception {
  Set<NodeProcessInfo> processes = getProcessesOnNode();
  // Match any folder under /cgroup/ducc that has syntax
  // <number>.<number>.<number>
  // This syntax is assigned by ducc to each cgroup
  Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
  File cgroupsFolder = new File(getCGroupLocation(CGDuccMemoryPath));
  String[] files = cgroupsFolder.list();
  if (files == null || files.length == 0) {
    return;
  }
  for (String cgroupFolder : files) {
    Matcher m = p.matcher(cgroupFolder);
    // only look at ducc's cgroups
    if (!m.find()) {
      continue;
    }
    try {
      // open proc file which may include PIDs if processes are
      // still running
      File f = new File(getCGroupLocation(CGDuccMemoryPath) + cgroupFolder + CGProcsFile);
      // collect all pids
      String[] pids = readPids(f);
      // target pids of THIS cgroup that are zombies
      Set<String> zombiePids = new HashSet<String>();
      if (pids != null && pids.length > 0) {
        agentLogger.info("cleanupOnStartup", null, "Agent found " + pids.length
                + " cgroup proceses still active. Proceeding to remove running processes");
        for (String pid : pids) {
          if (!isTargetForKill(pidsToKill, pid)) {
            continue;
          }
          for (NodeProcessInfo proc : processes) {
            // only the process with the target pid matters. Zombies elsewhere on the node
            // must neither be counted nor prevent removal of this cgroup
            if (!pid.equals(proc.getPid())) {
              continue;
            }
            if (proc.isZombie()) {
              // Dont kill zombie process as it is already dead. Just remember it
              zombiePids.add(pid);
            } else {
              // kill process hard via -9
              System.out.println(">>>>>> Killing target process " + proc.getPid());
              kill(proc.getUserid(), proc.getPid(), NodeAgent.SIGKILL);
            }
          }
        }
        long logCount = 0;
        // it may take some time for the cgroups to update accounting. Just cycle until
        // the killed targets are gone from the procs file
        while (true) {
          boolean found = false;
          pids = readPids(f);
          for (String pid : pids) {
            // Zombie targets are not waited for, they will never terminate. The idea
            // is not to enter into an infinite loop due to zombies
            if (isTargetForKill(pidsToKill, pid) && !zombiePids.contains(pid)) {
              found = true;
              break; // at least one process from the target list is still running
            }
          }
          if (!found) {
            break;
          }
          try {
            synchronized (this) {
              // log every ~30 minutes (10000 * 200), where 200 is a wait time in ms between
              // tries
              if (logCount % 10000 == 0) {
                agentLogger.info("cleanupOnStartup", null, "--- CGroup:" + cgroupFolder
                        + " procs file still showing processes running. Wait until CGroups updates acccounting");
              }
              logCount++;
              wait(200);
            }
          } catch (InterruptedException ee) {
            break;
          }
        }
      }
      // Don't remove CGroups if there are zombie processes there. Otherwise, attempt
      // to remove the CGroup may hang a thread.
      if (zombiePids.isEmpty()) { // no zombies in the container
        destroyContainer(cgroupFolder, SYSTEM, NodeAgent.SIGTERM);
        agentLogger.info("cleanupOnStartup", null,
                "--- Agent Removed Empty CGroup:" + cgroupFolder);
      } else {
        agentLogger.info("cleanupOnStartup", null, "CGroup " + cgroupFolder
                + " Contains Zombie Processing. Not Removing the Container");
      }
    } catch (FileNotFoundException e) {
      // noop. Cgroup may have been removed already
    } catch (Exception e) {
      agentLogger.error("cleanupOnStartup", null, e);
    }
  }
}
/**
 * Tells whether the given PID is listed in one of ducc's cgroups.
 *
 * @param pid
 *          - PID to look up
 * @return true if the PID is among the PIDs returned by getAllCGroupPids()
 * @throws Exception
 *           - if the cgroup PIDs cannot be collected
 */
public boolean isPidInCGroup(String pid) throws Exception {
  String[] managedPids = getAllCGroupPids();
  for (int i = 0; i < managedPids.length; i++) {
    if (managedPids[i].equals(pid)) {
      return true;
    }
  }
  return false;
}
/**
 * Returns an array of PIDs managed by cgroups. The PIDs are collected from the cgroup.procs
 * file of every cgroup folder whose name contains <number>.<number>.<number>, the syntax
 * assigned by ducc to each cgroup.
 *
 * @return - String array of PIDs, empty (never null) if there are no cgroup folders
 * @throws Exception
 *           - if a procs file cannot be read; the error is logged and rethrown
 */
public String[] getAllCGroupPids() throws Exception {
  final String memoryRoot = getCGroupLocation(CGDuccMemoryPath);
  String[] entries = new File(memoryRoot).list();
  if (entries == null || entries.length == 0) {
    return new String[0]; // empty better than NULL
  }
  Pattern duccCgroupName = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
  List<String> collected = new ArrayList<String>();
  for (String entry : entries) {
    // only look at ducc's cgroups
    if (!duccCgroupName.matcher(entry).find()) {
      continue;
    }
    try {
      // the procs file may include PIDs if processes are still running
      String[] pidsOfCgroup = readPids(new File(memoryRoot + entry + CGProcsFile));
      for (int i = 0; i < pidsOfCgroup.length; i++) {
        collected.add(pidsOfCgroup[i]);
      }
    } catch (Exception e) {
      agentLogger.error("getAllCGroupPids", null, e);
      throw e;
    }
  }
  return collected.toArray(new String[collected.size()]);
}
/**
 * Sends a signal to a process by launching /bin/kill (taskkill on Windows). When the system
 * property ducc.agent.launcher.use.ducc_spawn is "true", the command is run through the
 * launcher configured in ducc.agent.launcher.ducc_spawn_path on behalf of the given user.
 * Errors are logged, never thrown.
 *
 * @param user
 *          - owner of the process
 * @param pid
 *          - id of the process to signal
 * @param signal
 *          - signal number (not used on Windows)
 */
public void kill(final String user, final String pid, final int signal) {
  final String methodName = "kill";
  String c_launcher_path = "";
  try {
    boolean useDuccling = false;
    String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
    if ("true".equalsIgnoreCase(useSpawn)) {
      useDuccling = true;
      c_launcher_path = Utils.resolvePlaceholderIfExists(
              System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
    }
    String cmdLine;
    String arg;
    if (Utils.isWindows()) {
      cmdLine = "taskkill";
      arg = "/PID";
    } else {
      cmdLine = "/bin/kill";
      arg = "-" + signal;
    }
    String[] command;
    if (useDuccling) {
      command = new String[] { c_launcher_path, "-u", user, "--", cmdLine, arg, pid };
    } else {
      command = new String[] { cmdLine, arg, pid };
    }
    launchCommand(command);
    StringBuffer sb = new StringBuffer();
    for (String part : command) {
      sb.append(part).append(" ");
    }
    if (agentLogger == null) {
      System.out.println("--------- Killed Process:" + pid + " Owned by:" + user + " Command:"
              + sb.toString());
    } else {
      agentLogger.info(methodName, null, "--------- Killed CGroup Process:" + pid + " Owned by:"
              + user + " Command:" + sb.toString());
    }
  } catch (Exception e) {
    // the logger may be missing (see above) - don't hide the failure behind a
    // NullPointerException
    if (agentLogger == null) {
      e.printStackTrace();
    } else {
      agentLogger.error(methodName, null, e);
    }
  }
}
/**
 * Composes the cgroup container id for a process. For a Service process this is the cgroup id
 * alone, for any other process type it is <friendly work id>.<cgroup id>.
 *
 * @param managedProcess
 *          - process to compose the id for
 * @return the container id
 */
public String getContainerId(ManagedProcess managedProcess) {
  if (managedProcess.getDuccProcess().getProcessType().equals(ProcessType.Service)) {
    return String.valueOf(managedProcess.getDuccProcess().getCGroup().getId());
  }
  return managedProcess.getWorkDuccId().getFriendly() + "."
          + managedProcess.getDuccProcess().getCGroup().getId();
}
/**
 * Returns the name of the user running this JVM (the "user.name" system property).
 *
 * @return - user name
 */
public String getDuccUid() {
return SYSTEM;
}
/**
 * Creates cgroup container with a given id and owner by running 'cgcreate'.
 *
 * @param containerId
 *          - new cgroup container id
 * @param userName
 *          - user owning the cgroup container
 * @param groupName
 *          - group owning the cgroup container
 * @param useDuccSpawn
 *          - not used by this method
 *
 * @return - true if the container exists after cgcreate returned 0 or 96, false otherwise
 *
 * @throws Exception
 */
public boolean createContainer(String containerId, String userName, String groupName,
        boolean useDuccSpawn) throws Exception {
  agentLogger.info("createContainer", null, "Creating CGroup Container:" + containerId);
  String owner = userName + ":" + groupName;
  String[] command = new String[] { cgroupUtilsDir + "/cgcreate", "-t", owner, "-a", owner, "-g",
      cgroupSubsystems + ":" + SYSTEM + "/" + containerId };
  int retCode = launchCommand(command);
  // location of the new cgroup on this system. If cgroups is configured
  // with newer cgconfig the memory subsystem is part of the path
  String containerPath = getCGroupLocation(CGDuccMemoryPath) + containerId;
  String message;
  if (cgroupExists(containerPath)) {
    // Starting with libcgroup v.0.38, the cgcreate fails
    // with exit code = 96 even though the cgroup gets
    // created! The following code treats such return code
    // as success. In case there is an error, subsequent
    // cgset or cgexec will fail.
    if (retCode == 0 || retCode == 96) {
      containerIds.add(containerId);
      agentLogger.info("createContainer", null, ">>>>" + "SUCCESS - Created CGroup Container:"
              + containerId + ". The cgcreate return code:" + retCode);
      return true;
    }
    // the folder is there but cgcreate reported an unexpected error - say so instead of
    // logging an empty message
    message = ">>> CGroup Container:" + containerId + " found in " + containerPath
            + " but cgcreate failed. The cgcreate return code:" + retCode;
  } else {
    message = ">>> CGroup Container:" + containerId + " not found in " + containerPath;
  }
  agentLogger.error("createContainer", null, message);
  System.out.println(message);
  return false;
}
/**
 * Builds the path of a cpuacct.usage file.
 *
 * @param id
 *          - text inserted in front of the file name; callers pass either an empty string or
 *          a container id ending with a file separator
 * @return <cpuacct location>/[<user>/ for the new cgconfig]<id>cpuacct.usage
 */
private String composeCpuAccountingFileName(String id) {
  StringBuilder path = new StringBuilder(getCGroupLocation("cpuacct").trim());
  final String separator = System.getProperty("file.separator");
  if (!path.toString().endsWith(separator)) {
    path.append(separator);
  }
  if (!legacyCgConfig) {
    // the new cgconfig keeps the containers in a folder named after the user
    path.append(SYSTEM).append(separator);
  }
  return path.append(id).append("cpuacct.usage").toString();
}
/**
 * Builds the path of a memory.stat file.
 *
 * @param id
 *          - text inserted in front of the file name; the visible caller passes a container id
 *          ending with a file separator
 * @return <memory location>/[<user>/ for the new cgconfig]<id>memory.stat
 */
private String composeMemoryStatFileName(String id) {
  StringBuilder path = new StringBuilder(getCGroupLocation("memory").trim());
  final String separator = System.getProperty("file.separator");
  if (!path.toString().endsWith(separator)) {
    path.append(separator);
  }
  if (!legacyCgConfig) {
    // the new cgconfig keeps the containers in a folder named after the user
    path.append(SYSTEM).append(separator);
  }
  return path.append(id).append("memory.stat").toString();
}
/**
 * Tells whether cpu accounting is available, i.e. whether the cpuacct.usage file exists in the
 * parent cgroup folder. The absolute path of the file is printed to stdout.
 *
 * @return true if the file exists
 */
public boolean isCpuReportingEnabled() {
  File usageFile = new File(composeCpuAccountingFileName(""));
  String absolutePath = usageFile.getAbsolutePath();
  System.out.println(absolutePath);
  return usageFile.exists();
}
/**
 * Reads the cpu usage of a container from the first line of its cpuacct.usage file.
 *
 * @param containerId
 *          - id of the container
 * @return the value read from the file, 0 if the file is empty or cannot be parsed (the error
 *         is logged), -1 if the file does not exist
 * @throws Exception
 *           - if the file cannot be opened or closed
 */
public long getCpuUsage(String containerId) throws Exception {
  final String separator = System.getProperty("file.separator");
  String folder = containerId.endsWith(separator) ? containerId : containerId + separator;
  String file = composeCpuAccountingFileName(folder.trim());
  agentLogger.debug("getCpuUsage", null, "CPUACCT.USAGE file:" + file);
  File f = new File(file);
  if (!f.exists()) {
    agentLogger.info("getCpuUsage", null,
            "CPUACCT.USAGE file:" + file + " Not Found - Process CPU Usage is Unavailable");
    return -1; // cgroups accounting not configured
  }
  long usage = 0;
  InputStreamReader isr = new InputStreamReader(new FileInputStream(f));
  BufferedReader br = new BufferedReader(isr);
  try {
    // only the first line is of interest
    String firstLine = br.readLine();
    if (firstLine != null) {
      agentLogger.trace("getCpuUsage", null, "CPUACCT.USAGE Line:" + firstLine);
      usage = Long.parseLong(firstLine.trim());
    }
  } catch (Exception e) {
    agentLogger.error("getCpuUsage", null, e);
  } finally {
    isr.close();
    agentLogger.trace("getCpuUsage", null, "Done Reading cpuacct.stat file:" + file);
  }
  return usage;
}
/** Statistics that can be looked up in a memory.stat file, each with its key in that file. */
public enum CgroupMemoryStat {
  RSS("rss"),
  SWAP("swap"),
  FAULTS("pgpgin");

  /** key of the statistic */
  String key;

  private CgroupMemoryStat(String statKey) {
    key = statKey;
  }

  /**
   * @return the key of this statistic, e.g. "rss"
   */
  public String getKey() {
    return key;
  }
}
/**
 * Reads one statistic of a container from its memory.stat file. The file is expected to hold
 * one "<key> <value>" pair per line.
 *
 * @param stat
 *          - statistic to look up
 * @param containerId
 *          - id of the container
 * @return the value of the statistic, or -1 if the file does not exist, the key is not found
 *         or the value cannot be parsed (the error is logged)
 * @throws Exception
 *           - if the file cannot be opened or closed
 */
public long getUsageForMemoryStat(CgroupMemoryStat stat, String containerId) throws Exception {
  long usage = -1;
  if (!containerId.endsWith(System.getProperty("file.separator"))) {
    containerId = containerId + System.getProperty("file.separator");
  }
  String file = composeMemoryStatFileName(containerId.trim());
  agentLogger.debug("getUsageForMemoryStat", null, "MEMORY.STAT file:" + file);
  File f = new File(file);
  if (f.exists()) {
    InputStreamReader isr = new InputStreamReader(new FileInputStream(f));
    BufferedReader br = new BufferedReader(isr);
    String line;
    try {
      while ((line = br.readLine()) != null) {
        agentLogger.debug("getUsageForMemoryStat", null, "MEMORY.STAT Line:" + line);
        String[] tokens = line.trim().split(" ");
        // the key must match exactly. A prefix match would also accept other counters
        // sharing the prefix, e.g. rss_huge for rss
        if (tokens.length > 1 && tokens[0].equals(stat.getKey())) {
          usage = Long.parseLong(tokens[1]);
          break;
        }
      }
    } catch (Exception e) {
      agentLogger.error("getUsageForMemoryStat", null, e);
    } finally {
      isr.close();
      agentLogger.debug("getUsageForMemoryStat", null, "Done Reading memory.stat file:" + file);
    }
  } else {
    agentLogger.debug("getUsageForMemoryStat", null,
            "MEMORY.STAT file:" + file + " Not Found - Process RSS Usage is Unavailable");
    usage = -1; // cgroups accounting not configured
  }
  return usage;
}
/**
 * Sets the max memory use for an existing cgroup container by running
 * 'cgset -r memory.limit_in_bytes=<limit>'.
 *
 * @param containerId
 *          - existing container id for which limit will be set
 * @param userId
 *          - container owner (not used by this method)
 * @param useDuccSpawn
 *          - not used by this method
 * @param containerMaxSize
 *          - max memory limit
 *
 * @return - true on success, false otherwise
 *
 * @throws Exception
 */
public boolean setContainerMaxMemoryLimit(String containerId, String userId, boolean useDuccSpawn,
        long containerMaxSize) throws Exception {
  try {
    String[] command = new String[] { cgroupUtilsDir + "/cgset", "-r",
        "memory.limit_in_bytes=" + containerMaxSize, SYSTEM + "/" + containerId };
    int retCode = launchCommand(command);
    if (retCode == 0) {
      agentLogger.debug("setContainerMaxMemoryLimit", null,
              ">>>>" + "SUCCESS - Created CGroup Limit on Container:" + containerId);
      return true;
    } else {
      // report what actually failed (setting the limit, not creating the container)
      agentLogger.info("setContainerMaxMemoryLimit", null,
              ">>>>" + "FAILURE - Unable To Set Limit On CGroup Container:" + containerId
                      + ". The cgset return code:" + retCode);
      return false;
    }
  } catch (Exception e) {
    agentLogger.error("setContainerMaxMemoryLimit", null,
            ">>>>" + "FAILURE - Unable To Set Limit On CGroup Container:" + containerId, e);
    return false;
  }
}
/**
 * Sets the cpu shares for an existing cgroup container by running
 * 'cgset -r cpu.shares=<shares>'.
 *
 * @param containerId
 *          - existing container id for which the shares will be set
 * @param userId
 *          - container owner (not used by this method)
 * @param useDuccSpawn
 *          - not used by this method
 * @param containerCpuShares
 *          - cpu shares
 *
 * @return - true if cgset returned 0, false otherwise (including on any error, which is
 *         logged)
 *
 * @throws Exception
 */
public boolean setContainerCpuShares(String containerId, String userId, boolean useDuccSpawn,
        long containerCpuShares) throws Exception {
  final String method = "setContainerCpuShares";
  boolean sharesSet;
  try {
    String[] command = { cgroupUtilsDir + "/cgset", "-r", "cpu.shares=" + containerCpuShares,
        SYSTEM + "/" + containerId };
    sharesSet = launchCommand(command) == 0;
    if (sharesSet) {
      agentLogger.debug(method, null, ">>>>SUCCESS - Created CGroup with CPU Shares="
              + containerCpuShares + " on Container:" + containerId);
    } else {
      agentLogger.info(method, null,
              ">>>>FAILURE - Unable To Set CPU shares on CGroup Container:" + containerId);
    }
  } catch (Exception e) {
    agentLogger.error(method, null,
            ">>>>FAILURE - Unable To Set CPU shares On CGroup Container:" + containerId, e);
    sharesSet = false;
  }
  return sharesSet;
}
/**
 * Sets the memory swappiness for an existing cgroup container by running
 * 'cgset -r memory.swappiness=<swappiness>'.
 *
 * @param containerId
 *          - existing container id for which swappiness will be set
 * @param userId
 *          - container owner (not used by this method)
 * @param useDuccSpawn
 *          - not used by this method
 * @param swappiness
 *          - swappiness
 *
 * @return - true if cgset returned 0, false otherwise (including on any error, which is
 *         logged)
 *
 * @throws Exception
 */
public boolean setContainerSwappiness(String containerId, String userId, boolean useDuccSpawn,
        long swappiness) throws Exception {
  final String method = "setContainerSwappiness";
  boolean swappinessSet;
  try {
    String[] command = { cgroupUtilsDir + "/cgset", "-r", "memory.swappiness=" + swappiness,
        SYSTEM + "/" + containerId };
    swappinessSet = launchCommand(command) == 0;
    if (swappinessSet) {
      agentLogger.info(method, null, ">>>>SUCCESS - Updated CGroup with Memory Swappiness="
              + swappiness + " on Container:" + containerId);
    } else {
      agentLogger.info(method, null,
              ">>>>FAILURE - Unable To Set Swappiness on CGroup Container:" + containerId);
    }
  } catch (Exception e) {
    agentLogger.error(method, null,
            ">>>>FAILURE - Unable To Set Swappiness On CGroup Container:" + containerId, e);
    swappinessSet = false;
  }
  return swappinessSet;
}
/**
 * Calls kill(userId, pid, signal) for every pid that getPidsInCgroup reports for the container.
 * A failure to kill one pid is logged as a warning and does not stop the remaining ones.
 *
 * @param containerId
 *          - cgroup container whose pids are looked up
 * @param userId
 *          - passed through to kill()
 * @param signal
 *          - passed through to kill()
 *
 * @return - number of pids found in the container (0 when none were reported)
 *
 * @throws Exception
 *           whatever getPidsInCgroup throws
 */
private int killChildProcesses(String containerId, String userId, int signal) throws Exception {
  String[] remainingPids = getPidsInCgroup(containerId);
  if (remainingPids == null || remainingPids.length == 0) {
    return 0;
  }
  agentLogger.info("killChildProcesses", null, "Found " + remainingPids.length
          + " child processes still in container:" + containerId + " - killing all");
  for (int i = 0; i < remainingPids.length; i++) {
    String pid = remainingPids[i];
    try {
      kill(userId, pid, signal);
    } catch (Exception ee) {
      agentLogger.warn("killChildProcesses", null, "Unable to kill child process with PID:"
              + pid + " from cgroup:" + containerId + "\n" + ee);
    }
  }
  return remainingPids.length;
}
/**
 * Returns the subsystem list to use when deleting a cgroup.
 *
 * If cpu and cpuacct are sym linked to the same dir, the cpuacct entry is removed from the
 * list because it causes cgdelete to throw an error. Only the cpuacct entry itself is removed;
 * subsystems listed after it (e.g. "cpu,cpuacct,memory") are kept. Previously everything from
 * ",cpuacct" onwards was cut off, which silently dropped those subsystems.
 *
 * @return - the configured subsystem list, without cpuacct when the adjustment applies
 */
private String adjustSubsystems() {
  // Same trigger as before: symlinked cpu/cpuacct and a cpuacct entry that is not first.
  if (!cpuInfoSymlinked || cgroupSubsystems.indexOf(",cpuacct") < 0) {
    return cgroupSubsystems;
  }
  StringBuilder kept = new StringBuilder();
  boolean first = true;
  for (String subsystem : cgroupSubsystems.split(",")) {
    if ("cpuacct".equals(subsystem.trim())) {
      continue; // drop only the cpuacct entry
    }
    if (!first) {
      kept.append(',');
    }
    kept.append(subsystem);
    first = false;
  }
  return kept.toString();
}
/**
 * Removes the cgroup container with the given id using the 'cgdelete' utility, after killing
 * any processes still listed in it.
 *
 * When signal equals NodeAgent.SIGTERM the remaining processes are first sent SIGTERM and, if
 * any were found, the method waits up to maxTimeToWaitForProcessToStop before continuing. In
 * all cases the remaining processes are then sent SIGKILL before cgdelete runs. Success is
 * decided by checking whether the container directory still exists afterwards, not by the
 * cgdelete return code.
 *
 * @param containerId
 *          - cgroup to remove
 * @param userId
 *          - passed to killChildProcesses
 * @param signal
 *          - NodeAgent.SIGTERM requests the soft kill phase before the hard kill
 *
 * @return - true when the cgroup does not exist or was removed; false when it still exists
 *         after cgdelete or when an exception was caught
 *
 * @throws Exception
 *           declared by the signature; exceptions raised inside the body are logged and
 *           reported as a false result
 */
public boolean destroyContainer(String containerId, String userId, int signal) throws Exception {
  final String location = "destroyContainer";
  try {
    if (!cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId)) {
      return true; // nothing to do, cgroup does not exist
    }

    if (signal == NodeAgent.SIGTERM) {
      agentLogger.info(location, null,
              "Destroying Container " + containerId + " Using signal:" + signal
                      + " to kill child processes if any still exist in cgroups container");
      // A user process may have spawned children that are still running inside the
      // container. Ask them to stop with kill -15 first and give them time to exit;
      // whatever survives is killed hard (kill -9) below.
      int softKilled = killChildProcesses(containerId, userId, NodeAgent.SIGTERM);
      if (softKilled > 0) {
        agentLogger.info(location, null,
                "Killed " + softKilled + "Child Processes with kill -15");
        synchronized (waitForLockObject) {
          try {
            waitForLockObject.wait(maxTimeToWaitForProcessToStop);
          } catch (InterruptedException ie) {
          }
        }
      }
    }

    // Any process remaining in a cgroup will be killed hard
    killChildProcesses(containerId, userId, NodeAgent.SIGKILL);

    String subsystems = adjustSubsystems();
    String[] cgdeleteCommand = { cgroupUtilsDir + "/cgdelete",
        subsystems + ":" + SYSTEM + "/" + containerId };
    int exitCode = launchCommand(cgdeleteCommand);

    // Trust the filesystem rather than the exit code to decide whether removal worked.
    boolean stillPresent = cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId);
    if (!stillPresent) {
      containerIds.remove(containerId);
      return true;
    }
    agentLogger.info(location, null, "Failed to remove Container " + containerId
            + " Using cgdelete command. Exit code:" + exitCode);
    return false;
  } catch (Exception e) {
    agentLogger.info(location, null, e);
    return false;
  }
}
/**
 * Runs "/usr/bin/id -g -n &lt;userName&gt;" and returns the last line of its output, trimmed.
 * stderr is merged into stdout, and every output line is logged and echoed to System.out.
 *
 * @param userName
 *          - user whose group name is looked up
 *
 * @return - the trimmed last output line, or an empty string when nothing was read or an
 *         exception was caught while running the command
 *
 * @throws Exception
 *           if closing the output reader fails
 */
public String getUserGroupName(String userName) throws Exception {
  String[] idCommand = { "/usr/bin/id", "-g", "-n", userName };
  String lastLine = "";
  BufferedReader output = null;
  try {
    StringBuffer commandLine = new StringBuffer();
    for (int i = 0; i < idCommand.length; i++) {
      commandLine.append(idCommand[i]).append(" ");
    }
    agentLogger.info("getuserGroupName", null,
            "Launching Process - Commandline:" + commandLine.toString());

    ProcessBuilder launcher = new ProcessBuilder();
    launcher.command(idCommand);
    launcher.redirectErrorStream(true);
    java.lang.Process idProcess = launcher.start();
    output = new BufferedReader(new InputStreamReader(idProcess.getInputStream()));

    agentLogger.info("getUserGroupName", null, "Consuming Process Streams");
    for (String line = output.readLine(); line != null; line = output.readLine()) {
      agentLogger.info("getUserGroupName", null, ">>>>" + line);
      System.out.println(line);
      lastLine = line.trim();
    }

    agentLogger.info("getUserGroupName", null, "Waiting for Process to Exit");
    int exitCode = idProcess.waitFor();
    agentLogger.info("getUserGroupName", null, "Pocess Exit Code=" + exitCode);
  } catch (Exception e) {
    agentLogger.error("getUserGroupName", null, e);
  } finally {
    if (output != null) {
      output.close();
    }
  }
  return lastLine;
}
/**
 * Launches an external command, logs its output and retries when it fails.
 *
 * The command's stderr is merged into stdout. Each output line is logged and echoed to
 * System.out, except lines containing "Operation not permitted" or "cannot remove group",
 * which are dropped. Return codes 0 and 96 are both treated as success.
 *
 * After a failed attempt the method waits 'delay' millis and tries again for as long as
 * retryCount <= retryMax; the delay grows by delayFactor after every attempt. When retryMax
 * is 0 the first failure ends the loop immediately. Everything after the null check runs
 * inside synchronized (CGroupsManager.class), including the waits between retries.
 *
 * @param command
 *          - command and arguments to execute
 *
 * @return - the process return code (0 or 96) on success; -1 when command is null or no
 *         attempt succeeded
 *
 * @throws Exception
 *           declared by the signature; exceptions raised while starting the process, reading
 *           its output or waiting for it are caught and counted as a failed attempt
 */
private int launchCommand(String[] command/* , String userId */) throws Exception {
int retryCount = 0;
// private monitor used only for the timed wait between retries
Object sleepMonitor = new Object();
if (command == null) {
return -1;
}
synchronized (CGroupsManager.class) {
long delay = delayFactor;// wait before the first retry; increased by delayFactor per attempt
while (retryCount <= retryMax) {
// failure text for this attempt; stays empty unless the attempt fails
String message = "";
InputStreamReader in = null;
BufferedReader reader = null;
// space separated command line, used only in log messages
StringBuffer sb = new StringBuffer();
// NOTE(review): command was already checked for null above, so this test is always true
if (command != null) {
for (int i = 0; i < command.length; i++) {
sb.append(command[i]).append(" ");
}
}
try {
agentLogger.info("launchCommand", null,
"Launching Process - Commandline:" + sb.toString());
ProcessBuilder processLauncher = new ProcessBuilder();
processLauncher.command(command);
// merge stderr into stdout so a single reader sees all output
processLauncher.redirectErrorStream(true);
java.lang.Process process = processLauncher.start();
in = new InputStreamReader(process.getInputStream());
reader = new BufferedReader(in);
String line;
agentLogger.info("launchCommand", null, "Consuming Process Streams");
while ((line = reader.readLine()) != null) {
// Per team discussion 6/23: don't need to log "Operation not permitted",
// which is logged by cgcreate erroneously. The cgroup is actually created
// but cgcreate still dumps this msg to stdout. If we log this, a user
// may get confused. If there is a legitimate problem, a subsequent
// test for existence of the cgroup will catch a missing cgroup and
// report it as an error.
if (line.indexOf("Operation not permitted") > -1) {
continue; // dont log if the above string is in the stdout stream
} else if (line.indexOf("cannot remove group") > -1) {
continue; // could be false positive. Validation will catch if unable to remove
}
agentLogger.info("launchCommand", null, ">>>>" + line);
System.out.println(line);
}
agentLogger.info("launchCommand", null, "Waiting for Process to Exit");
int retCode = process.waitFor();
// Starting with libcgroup v.0.38, the cgcreate fails
// with exit code = 96 even though the cgroup gets
// created! The following code treats such return code
// as success. In case there is an error, subsequent
// cgset or cgexec will fail.
if (retCode == 0 || retCode == 96) {
System.out.println("--------- Returning Code:" + retCode + " Command:" + sb.toString());
return retCode;
} else {
message = ">>>>" + "FAILURE - return code:" + retCode + " Unable To exec command:"
+ sb.toString() + " Retrying in " + delay + " millis - retry#"
+ (retryCount + 1);
}
} catch (Exception e) {
// any failure to start, read from or wait for the process counts as a failed attempt
e.printStackTrace();
message = ">>>>" + "FAILURE - Unable To exec command:" + sb.toString() + " Retrying in "
+ delay + " millis - retry#" + (retryCount + 1);
} finally {
if (reader != null) {
try {
reader.close();
} catch (Exception exx) {
// a failure to close the reader is deliberately ignored
}
}
}
// Only reached when the attempt failed; a successful attempt returned above.
if (retryMax == 0) {
agentLogger.error("launchCommand", null,
">>>>" + "Not configured to retry command:" + sb.toString());
break;
}
agentLogger.error("launchCommand", null, message);
System.out.println(message);
// NOTE(review): this wait (and the "Retrying in" message) also happens after the final
// attempt, when retryCount == retryMax and no further retry will follow.
try {
synchronized (sleepMonitor) {
sleepMonitor.wait(delay);
}
} catch (InterruptedException ie) {
// interruption only cuts the wait short; the retry loop continues
}
retryCount++;
delay += delayFactor;
} // while
}
return -1; // failure
}
/**
 * Adds the name of every sub-directory found under getCGroupLocation(CGDuccMemoryPath) to
 * 'containerIds' and returns that set. Nothing is added when the location does not exist or
 * cannot be listed.
 *
 * @return - the 'containerIds' set, including any ids added by this call
 *
 * @throws Exception
 */
public Set<String> collectExistingContainers() throws Exception {
  File memoryCGroupDir = new File(getCGroupLocation(CGDuccMemoryPath));
  if (!memoryCGroupDir.exists()) {
    return containerIds;
  }
  File[] entries = memoryCGroupDir.listFiles();
  if (entries == null) {
    return containerIds;
  }
  for (int i = 0; i < entries.length; i++) {
    File entry = entries[i];
    // each directory under the ducc memory cgroup location is one container
    if (entry.isDirectory()) {
      containerIds.add(entry.getName());
    }
  }
  return containerIds;
}
/**
 * Returns the current value of 'cgroupBaseDir'.
 *
 * @return - the cgroup base directory
 */
public String getDuccCGroupBaseDir() {
return cgroupBaseDir;
}
/**
 * Returns the configured subsystem list followed by ":", SYSTEM and a trailing "/".
 *
 * UIMA-5405 Include the installed "ducc" id
 *
 * @return - string of the form subsystems:SYSTEM/
 */
public String getSubsystems() {
  String subsystemsWithSeparator = cgroupSubsystems + ":";
  return subsystemsWithSeparator + SYSTEM + "/";
}
/**
 * Tests whether the given cgroup path exists in the filesystem.
 *
 * @param cgroup
 *          - path of the cgroup to check
 *
 * @return - true when the path exists, false otherwise
 *
 * @throws Exception
 */
public boolean cgroupExists(String cgroup) throws Exception {
  return new File(cgroup).exists();
}
/**
 * Runs "ps -Ao user:32,pid,ppid,args,stat --no-heading" and returns one NodeProcessInfo per
 * parsed output line.
 *
 * The 'args' column may itself contain whitespace, so only the first three tokens (user, pid,
 * ppid) have fixed positions; 'stat' is the last column and is therefore taken from the last
 * token of the line. Lines with fewer than five tokens are skipped instead of aborting the
 * whole scan.
 *
 * @return - set of processes found; empty or partial when ps could not be run or read, in
 *         which case the error is logged
 *
 * @throws Exception
 *           if closing the output reader fails
 */
public Set<NodeProcessInfo> getProcessesOnNode() throws Exception {
  String location = "getProcessesOnNode";
  Set<NodeProcessInfo> processList = new HashSet<NodeProcessInfo>();
  BufferedReader reader = null;
  try {
    ProcessBuilder pb = new ProcessBuilder("ps", "-Ao", "user:32,pid,ppid,args,stat",
            "--no-heading");
    pb.redirectErrorStream(true);
    java.lang.Process proc = pb.start();
    // spawn ps command and scrape the output
    InputStream stream = proc.getInputStream();
    reader = new BufferedReader(new InputStreamReader(stream));
    // compiled once instead of on every line
    Pattern whitespace = Pattern.compile("\\s+");
    String line;
    // read the next line from ps output
    while ((line = reader.readLine()) != null) {
      String[] tokens = whitespace.split(line.trim());
      // need user, pid, ppid, at least one args token, and stat
      if (tokens.length < 5) {
        continue;
      }
      String user = tokens[0];
      String pid = tokens[1];
      String ppid = tokens[2];
      // args may span several tokens, so stat is the last one, not tokens[4]
      String stat = tokens[tokens.length - 1];
      processList.add(new NodeProcessInfo(pid, ppid, user, stat));
    }
  } catch (Exception e) {
    if (agentLogger == null) {
      e.printStackTrace();
    } else {
      agentLogger.error(location, null, e);
    }
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
  return processList;
}
/**
 * Snapshot of one process as reported by ps: owning user, pid, parent pid and state string.
 */
public class NodeProcessInfo {
  private final String pid;

  private final String ppid;

  private String userid;

  /** ps 'stat' value; the first character is the process state, the rest are modifiers. */
  private final String stat;

  NodeProcessInfo(String pid, String ppid, String uid, String stat) {
    this.pid = pid;
    this.ppid = ppid;
    this.userid = uid;
    this.stat = stat;
  }

  /**
   * Tells whether the process is a zombie (defunct).
   *
   * The state is compared by value; the previous reference comparison (stat == "Z") could not
   * match a string parsed from ps output. ps may append modifier characters to the state
   * (e.g. "Zs", "Z+"), so only the leading character is checked.
   *
   * @return - true when the stat string starts with 'Z'; false otherwise, including when
   *         stat is null
   */
  public boolean isZombie() {
    return stat != null && stat.startsWith("Z");
  }

  public String getPid() {
    return pid;
  }

  public String getPpid() {
    return ppid;
  }

  public String getUserid() {
    return userid;
  }

  public void setUserid(String userid) {
    this.userid = userid;
  }
}
/**
 * Unchecked exception describing a failed cgroups operation. It can carry the name of the
 * command that failed and a message, both set through the chainable add* methods.
 */
public class CGroupsException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  private String command;

  private String msg;

  public CGroupsException() {
  }

  /**
   * @param e
   *          - underlying cause
   */
  public CGroupsException(Exception e) {
    super(e);
  }

  /**
   * Records the command that failed.
   *
   * @return - this exception, to allow chaining
   */
  public CGroupsException addCommand(String command) {
    this.command = command;
    return this;
  }

  /**
   * Records a message describing the failure.
   *
   * @return - this exception, to allow chaining
   */
  public CGroupsException addMessage(String msg) {
    this.msg = msg;
    return this;
  }

  public String getCommand() {
    return command;
  }

  /**
   * Returns the message set via addMessage(). When none was set, falls back to the standard
   * Throwable message (derived from the cause, if any) instead of hiding it behind null.
   */
  @Override
  public String getMessage() {
    return msg != null ? msg : super.getMessage();
  }
}
/**
 * Exercises cgcreate and cgset against a container and verifies their effect. Each step
 * returns this Validator so the steps can be chained; a failing step throws a
 * CGroupsException carrying the failed command and a message.
 */
public class Validator {
  private CGroupsManager cgmgr = null;

  String containerId;

  String userName;

  String userGroupName;

  boolean useDuccling;

  String cgroupsBaseDir;

  Validator(CGroupsManager instance, String cgroupsBaseDir, String containerId, String uid,
          String usergroup, boolean useDuccling) {
    cgmgr = instance;
    this.containerId = containerId;
    this.userName = uid;
    this.useDuccling = useDuccling;
    this.userGroupName = usergroup;
    this.cgroupsBaseDir = cgroupsBaseDir;
  }

  /**
   * Creates the container and checks that its directory exists afterwards.
   *
   * @return - this Validator
   *
   * @throws CGroupsException
   *           when the container could not be created, is missing after creation, or an
   *           unexpected exception occurred
   */
  public Validator cgcreate() throws CGroupsException {
    String msg1 = "------- CGroups cgcreate failed to create a cgroup - disabling cgroups";
    String msg2 = "------- CGroups cgcreate failed to validate a cgroup - disabling cgroups";
    String msg3 = "------- CGroups cgcreate failed - disabling cgroups";
    // The specific failure is recorded here and thrown after the try block. Throwing it
    // inside the try let the generic handler below re-wrap it, so the caller only ever
    // saw msg3.
    String failure = null;
    try {
      if (!cgmgr.createContainer(containerId, userName, userGroupName, useDuccling)) {
        failure = msg1;
      } else if (!cgmgr.cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId)) {
        failure = msg2;
      }
    } catch (Exception e) {
      throw new CGroupsException(e).addCommand(CGroupCommand.CGCREATE.cmd()).addMessage(msg3);
    }
    if (failure != null) {
      throw new CGroupsException().addCommand(CGroupCommand.CGCREATE.cmd()).addMessage(failure);
    }
    return this;
  }

  /**
   * Sets cpu.shares on the container and verifies it by reading the first line of the
   * container's cpu.shares file.
   *
   * @param cpuShares
   *          - value to set and expect in the cpu.shares file
   *
   * @return - this Validator
   *
   * @throws CGroupsException
   *           when cgset fails, the cpu.shares file is missing, the value read differs from
   *           cpuShares, or an unexpected exception occurred
   */
  public Validator cgset(long cpuShares) throws CGroupsException {
    String msg1 = "------- Check cgconfig.conf CPU control. The cgset failed to set cpu.shares";
    String msg2 = "------- Check cgconfig.conf CPU control. The cgset failed to find cpu.shares file";
    // Report the value that was actually requested rather than a hard-coded 100.
    String msg3 = "------- Check cgconfig.conf CPU control. The cgset failed to write to cpu.shares file. Expected "
            + cpuShares + " shares found ";
    BufferedReader reader = null;
    String shares = "";
    // See cgcreate(): specific failures are thrown after the try so they are not re-wrapped.
    String failure = null;
    try {
      if (!cgmgr.setContainerCpuShares(containerId, userName, useDuccling, cpuShares)) {
        failure = msg1;
      } else {
        // now try to read created file
        File f = new File(getCGroupLocation(CGDuccCpuPath) + containerId + "/cpu.shares");
        reader = new BufferedReader(new FileReader(f));
        // read 1st line. It should be equal to cpuShares
        shares = reader.readLine();
        if (shares != null) {
          shares = shares.trim();
        }
        System.out.println("----- Cgroup cgset verifier - cpu.shares read from file:" + shares);
        if (!String.valueOf(cpuShares).equals(shares)) {
          failure = msg3 + shares;
        }
      }
    } catch (FileNotFoundException e) {
      throw new CGroupsException(e).addCommand(CGroupCommand.CGSET.cmd()).addMessage(msg2);
    } catch (Exception e) {
      throw new CGroupsException(e).addCommand(CGroupCommand.CGSET.cmd())
              .addMessage(msg3 + shares);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (Exception ee) {
          // a failure to close the reader does not affect the validation result
        }
      }
    }
    if (failure != null) {
      throw new CGroupsException().addCommand(CGroupCommand.CGSET.cmd()).addMessage(failure);
    }
    return this;
  }
}
}