blob: 1c96d9b6d2e784f4fa398f4505969dc6101f73d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.launcher;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
/**
* Manages cgroup container on a node
*
* Supported operations: - cgcreate - creates cgroup container - cgset - sets
* max memory limit for an existing container
*
*
*/
public class CGroupsManager {
private DuccLogger agentLogger = null;
private Set<String> containerIds = new LinkedHashSet<String>();
private String cgroupBaseDir = "";
private String cgroupUtilsDir=null;
private String cgroupSubsystems = ""; // comma separated list of subsystems
// eg. memory,cpu
/**
* @param args
*/
public static void main(String[] args) {
try {
CGroupsManager cgMgr = new CGroupsManager("/usr/bin","/cgroup/ducc", "memory",
null);
System.out.println("Cgroups Installed:"
+ cgMgr.cgroupExists("/cgroup/ducc"));
Set<String> containers = cgMgr.collectExistingContainers();
for (String containerId : containers) {
System.out.println("Existing CGroup Container ID:"
+ containerId);
}
cgMgr.createContainer(args[0], args[2], true);
cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true,
Long.parseLong(args[1]));
synchronized (cgMgr) {
cgMgr.wait(60000);
}
cgMgr.destroyContainer(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public String getCGroupsUtilsDir( ){
return cgroupUtilsDir;
}
public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String cgroupSubsystems,
DuccLogger agentLogger) {
this.cgroupUtilsDir = cgroupUtilsDir;
this.cgroupBaseDir = cgroupBaseDir;
this.cgroupSubsystems = cgroupSubsystems;
this.agentLogger = agentLogger;
}
public String[] getPidsInCgroup(String cgroupName) throws Exception {
File f = new File(cgroupBaseDir + "/" + cgroupName + "/cgroup.procs");
// collect all pids
return readPids(f);
}
private String[] readPids(File f) throws Exception {
List<String> pids = new ArrayList<String>();
BufferedReader br = new BufferedReader(new FileReader(f));
String line;
while ((line = br.readLine()) != null) {
pids.add(line.trim());
}
br.close();
return pids.toArray(new String[pids.size()]);
}
/**
* Finds all stale CGroups in /cgroup/ducc folder and cleans them
* up. The code only cleans up cgroups folders with names that follow
* ducc's cgroup naming convention: <id>.<id>.<id>.
* First, each cgroup is checked for still running processes in the
* cgroup by looking at /cgroup/ducc/<id>/cgroup.proc file which
* includes PIDs of processes associated with the cgroups. If
* processes are found, each one is killed via -9 and the cgroup
* is removed.
*
* @throws Exception
*/
public void cleanupOnStartup() throws Exception {
Set<NodeProcessInfo> processes = getProcessesOnNode();
// Match any folder under /cgroup/ducc that has syntax
// <number>.<number>.<number>
// This syntax is assigned by ducc to each cgroup
Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
File cgroupsFolder = new File(cgroupBaseDir);
String[] files = cgroupsFolder.list();
for (String cgroupFolder : files) {
Matcher m = p.matcher(cgroupFolder);
// only look at ducc's cgroups
if (m.find()) {
try {
// open proc file which may include PIDs if processes are
// still running
File f = new File(cgroupBaseDir + "/" + cgroupFolder
+ "/cgroup.procs");
// collect all pids
String[] pids = readPids(f);
// kill each runnig process via -9
if (pids != null && pids.length > 0) {
for (String pid : pids) {
// Got cgroup processes still running. Kill them
for (NodeProcessInfo proc : processes) {
if (proc.getPid().equals(pid)) {
kill( proc.getUserid(), proc.getPid());
}
}
}
// it may take some time for the cgroups to udate accounting. Just cycle until
// the procs file becomes empty under a given cgroup
while( true ) {
pids = readPids(f);
if ( pids == null || pids.length == 0) {
break;
} else {
try {
synchronized(this) {
agentLogger.info("cleanupOnStartup", null,
"--- CGroup:" + cgroupFolder+ " procs file still showing processes running. Wait until CGroups updates acccounting");
wait(200);
}
} catch( InterruptedException ee) {}
}
}
}
destroyContainer(cgroupFolder);
agentLogger.info("cleanupOnStartup", null,
"--- Agent Removed Empty CGroup:" + cgroupFolder);
} catch (Exception e) {
agentLogger.error("cleanupOnStartup", null, e);
}
}
}
}
public boolean isPidInCGroup(String pid) throws Exception {
String[] pids = getAllCGroupPids();
for( String p : pids ) {
if ( p.equals(pid)) {
return true;
}
}
return false;
}
/**
* Returns an array of PIDs managed by cgroups.
*
* @return - String array of PIDs
* @throws Exception
*/
public String[] getAllCGroupPids() throws Exception {
List<String> cgroupPids = new ArrayList<String>();
// Match any folder under /cgroup/ducc that has syntax
// <number>.<number>.<number>
// This syntax is assigned by ducc to each cgroup
Pattern p = Pattern.compile("((\\d+)\\.(\\d+)\\.(\\d+))");
File cgroupsFolder = new File(cgroupBaseDir);
String[] files = cgroupsFolder.list();
for (String cgroupFolder : files) {
Matcher m = p.matcher(cgroupFolder);
// only look at ducc's cgroups
if (m.find()) {
try {
// open proc file which may include PIDs if processes are
// still running
File f = new File(cgroupBaseDir + "/" + cgroupFolder
+ "/cgroup.procs");
// collect all pids
String[] pids = readPids(f);
for( String pid : pids ) {
cgroupPids.add(pid);
}
} catch (Exception e) {
agentLogger.error("getAllCGroupPids", null, e);
throw e;
}
}
}
String[] pids = new String[cgroupPids.size()];
return cgroupPids.toArray(pids);
}
public void kill(final String user, final String pid) {
final String methodName = "kill";
InputStream is = null;
BufferedReader reader = null;
try {
String c_launcher_path = Utils.resolvePlaceholderIfExists(
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
System.getProperties());
String cmdLine;
String arg;
boolean useDuccling = false;
if (Utils.isWindows()) {
cmdLine = "taskkill";
arg = "/PID";
} else {
String useSpawn = System
.getProperty("ducc.agent.launcher.use.ducc_spawn");
if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
useDuccling = true;
}
cmdLine = "/bin/kill";
arg = "-9";
}
String[] duccling_nolog;
if (useDuccling) {
duccling_nolog = new String[] { c_launcher_path, "-u", user,
"--", cmdLine, arg, pid };
} else {
duccling_nolog = new String[] { cmdLine, arg, pid };
}
// if (kill != null && Boolean.parseBoolean(kill) == true) {
ProcessBuilder pb = new ProcessBuilder(duccling_nolog);
pb.redirectErrorStream(true);
java.lang.Process killedProcess = pb.start();
is = killedProcess.getInputStream();
reader = new BufferedReader(
new InputStreamReader(is));
// String line = null;
// read the next line from kill command
while (reader.readLine() != null) {
// dont care about the output, just drain the buffers
}
is.close();
StringBuffer sb = new StringBuffer();
for (String part : duccling_nolog) {
sb.append(part).append(" ");
}
if (agentLogger == null) {
System.out.println("--------- Killed Process:" + pid
+ " Owned by:" + user + " Command:" + sb.toString());
} else {
agentLogger.info(methodName, null,
"--------- Killed CGroup Process:" + pid + " Owned by:" + user
+ " Command:" + sb.toString());
}
} catch (Exception e) {
agentLogger.error(methodName, null,e );
} finally {
if ( reader != null ) {
try {
reader.close();
} catch( Exception e) {}
}
}
}
public String getContainerId(ManagedProcess managedProcess) {
String containerId;
if ( managedProcess.getDuccProcess().getProcessType().equals(ProcessType.Service)) {
containerId = String.valueOf(managedProcess.getDuccProcess().getCGroup().getId());
} else {
containerId = managedProcess.getWorkDuccId().getFriendly()+"."+managedProcess.getDuccProcess().getCGroup().getId();
}
return containerId;
}
/**
* Creates cgroup container with a given id and owner.
*
* @param containerId
* - new cgroup container id
* @param userId
* - owner of the cgroup container
* @param useDuccSpawn
* - use duccling to run 'cgcreate' command
*
* @return - true on success, false otherwise
*
* @throws Exception
*/
public boolean createContainer(String containerId, String userId,
boolean useDuccSpawn) throws Exception {
try {
String[] command = new String[] { cgroupUtilsDir+"/cgcreate", "-t",
"ducc", "-a", "ducc", "-g",
cgroupSubsystems + ":ducc/" + containerId };
int retCode = launchCommand(command, useDuccSpawn, "ducc",
containerId);
if (retCode == 0) {
containerIds.add(containerId);
agentLogger.info("createContainer", null, ">>>>"
+ "SUCCESS - Created CGroup Container:" + containerId);
return true;
} else {
agentLogger.info("createContainer", null, ">>>>"
+ "FAILURE - Unable To Create CGroup Container:"
+ containerId);
return false;
}
} catch (Exception e) {
agentLogger.error("createContainer", null, ">>>>"
+ "FAILURE - Unable To Create CGroup Container:"
+ containerId, e);
return false;
}
}
/**
* Sets the max memory use for an existing cgroup container.
*
* @param containerId
* - existing container id for which limit will be set
* @param userId
* - container owner
* @param useDuccSpawn
* - run 'cgset' command as a user
* @param containerMaxSize
* - max memory limit
*
* @return - true on success, false otherwise
*
* @throws Exception
*/
public boolean setContainerMaxMemoryLimit(String containerId,
String userId, boolean useDuccSpawn, long containerMaxSize)
throws Exception {
try {
///usr/bin
String[] command = new String[] { cgroupUtilsDir+"/cgset", "-r",
"memory.limit_in_bytes=" + containerMaxSize,
"ducc/" + containerId };
int retCode = launchCommand(command, useDuccSpawn, "ducc",
containerId);
if (retCode == 0) {
agentLogger.info("setContainerMaxMemoryLimit", null, ">>>>"
+ "SUCCESS - Created CGroup Limit on Container:"
+ containerId);
return true;
} else {
agentLogger.info("setContainerMaxMemoryLimit", null, ">>>>"
+ "FAILURE - Unable To Create CGroup Container:"
+ containerId);
return false;
}
} catch (Exception e) {
agentLogger.error("setContainerMaxMemoryLimit", null, ">>>>"
+ "FAILURE - Unable To Set Limit On CGroup Container:"
+ containerId, e);
return false;
}
}
/**
* Removes cgroup container with a given id. Cgroups are implemented as a
* virtual file system. All is needed here is just rmdir.
*
* @param containerId
* - cgroup to remove
* @return - true on success, false otherwise
*
* @throws Exception
*/
public boolean destroyContainer(String containerId) throws Exception {
try {
if (cgroupExists(cgroupBaseDir + "/" + containerId)) {
String[] command = new String[] { "/bin/rmdir",
cgroupBaseDir + "/" + containerId };
int retCode = launchCommand(command, false, "ducc", containerId);
if (retCode == 0) {
containerIds.remove(containerId);
return true;
} else {
return false;
}
}
return true; // nothing to do, cgroup does not exist
} catch (Exception e) {
return false;
}
}
private int launchCommand(String[] command, boolean useDuccSpawn,
String userId, String containerId) throws Exception {
String[] commandLine = null;
InputStreamReader in = null;
BufferedReader reader = null;
try {
//
// Use ducc_ling (c code) as a launcher for the actual process. The
// ducc_ling
// allows the process to run as a specified user in order to write
// out logs in
// user's space as oppose to ducc space.
String c_launcher_path = Utils.resolvePlaceholderIfExists(
System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
System.getProperties());
if (useDuccSpawn && c_launcher_path != null) {
commandLine = new String[4 + command.length];
commandLine[0] = c_launcher_path;
commandLine[1] = "-u";
commandLine[2] = userId;
commandLine[3] = "--";
int j = 0;
for (int i = 4; i < commandLine.length; i++) {
commandLine[i] = command[j++];
}
} else {
commandLine = command;
}
ProcessBuilder processLauncher = new ProcessBuilder();
processLauncher.command(commandLine);
processLauncher.redirectErrorStream();
java.lang.Process process = processLauncher.start();
in = new InputStreamReader(
process.getInputStream());
reader = new BufferedReader(in);
String line;
while ((line = reader.readLine()) != null) {
agentLogger.info("launchCommand", null, ">>>>" + line);
}
int retCode = process.waitFor();
return retCode;
} catch (Exception e) {
StringBuffer sb = new StringBuffer();
if (commandLine != null) {
for (String cmdPart : commandLine) {
sb.append(cmdPart).append(" ");
}
}
if (agentLogger != null) {
agentLogger.error("launchCommand", null,
"Unable to Launch Command:" + sb.toString(), e);
} else {
System.out
.println("CGroupsManager.launchCommand()- Unable to Launch Command:"
+ sb.toString());
e.printStackTrace();
}
} finally {
if ( reader != null ) {
try {
reader.close();
} catch( Exception exx) {}
}
}
return -1; // failure
}
/**
* Return a Set of existing cgroup Ids found in the filesystem identified by
* 'cgroupBaseDir'.
*
* @return - set of cgroup ids
*
* @throws Exception
*/
public Set<String> collectExistingContainers() throws Exception {
File duccCGroupBaseDir = new File(cgroupBaseDir);
if (duccCGroupBaseDir.exists()) {
File[] existingCGroups = duccCGroupBaseDir.listFiles();
for (File cgroup : existingCGroups) {
if (cgroup.isDirectory()) {
containerIds.add(cgroup.getName());
}
}
}
return containerIds;
}
public String getDuccCGroupBaseDir() {
return cgroupBaseDir;
}
public String getSubsystems() {
return cgroupSubsystems;
}
public boolean cgroupExists(String cgroup) throws Exception {
File duccCGroupBaseDir = new File(cgroup);
return duccCGroupBaseDir.exists();
}
public Set<NodeProcessInfo> getProcessesOnNode() throws Exception {
String location = "getProcessesOnNode";
Set<NodeProcessInfo> processList = new HashSet<NodeProcessInfo>();
InputStream stream = null;
BufferedReader reader = null;
try {
ProcessBuilder pb = new ProcessBuilder("ps", "-Ao",
"user:12,pid,ppid,args", "--no-heading");
pb.redirectErrorStream(true);
java.lang.Process proc = pb.start();
// spawn ps command and scrape the output
stream = proc.getInputStream();
reader = new BufferedReader(new InputStreamReader(
stream));
String line;
String regex = "\\s+";
// read the next line from ps output
while ((line = reader.readLine()) != null) {
String tokens[] = line.split(regex);
String user = tokens[0];
String pid = tokens[1];
String ppid = tokens[2];
if (tokens.length > 0) {
processList.add(new NodeProcessInfo(pid, ppid, user));
}
}
} catch (Exception e) {
if (agentLogger == null) {
e.printStackTrace();
} else {
agentLogger.error(location, null, e);
}
} finally {
if ( reader != null ) {
reader.close();
}
}
return processList;
}
public class NodeProcessInfo {
private String pid;
private String ppid;
private String userid;
NodeProcessInfo(String pid, String ppid, String uid) {
this.pid = pid;
this.ppid = ppid;
userid = uid;
}
public String getPid() {
return pid;
}
public String getPpid() {
return ppid;
}
public String getUserid() {
return userid;
}
public void setUserid(String userid) {
this.userid = userid;
}
}
}