// blob: 6f43928266df31328dc54c2b50ad8e1e432fa0bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.metrics.collectors;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.uima.ducc.agent.Agent;
import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo.NodeProcess;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.common.utils.id.IDuccId;
/**
 * Spawns a "ps -Ao user,pid,ppid,uid,args" command and scrapes its output to collect
 * the user processes running on this node.
 *
 * Detects and filters out Ducc daemon processes and the AMQ broker.
 *
 * Detects rogue processes, which are processes that are not associated with any job
 * and do not belong to a user node reservation.
 */
public class NodeUsersCollector implements CallableNodeUsersCollector {
  /** System property overriding {@link #gidMax}. */
  private static final String SYS_GID_MAX_PROPERTY = "ducc.agent.node.metrics.sys.gid.max";
  /** System property holding a comma separated list of user ids excluded from rogue detection. */
  private static final String USER_EXCLUSION_PROPERTY =
      "ducc.agent.rogue.process.user.exclusion.filter";
  /** System property holding a comma separated list of process names excluded from rogue detection. */
  private static final String PROCESS_EXCLUSION_PROPERTY =
      "ducc.agent.rogue.process.exclusion.filter";
  /** Number of columns requested from ps: user, pid, ppid, uid and (the first token of) args. */
  private static final int MIN_PS_COLUMNS = 5;

  /** Optional logger. When null, log output is dropped (the final summary goes to stdout). */
  DuccLogger logger;
  /** Optional owning agent. When null, processes are scanned but nothing is recorded. */
  Agent agent;
  /** Processes whose numeric uid is below this value are treated as system processes. */
  int gidMax = 500;

  /**
   * Creates a collector and reads the optional {@value #SYS_GID_MAX_PROPERTY} system property.
   * An unparsable property value is reported and the default threshold is kept.
   *
   * @param agent  agent used to look up reservations and report rogue processes; may be null
   * @param logger logger for diagnostics; may be null
   */
  public NodeUsersCollector(Agent agent, DuccLogger logger) {
    this.agent = agent;
    this.logger = logger;
    String tmp = System.getProperty(SYS_GID_MAX_PROPERTY);
    if (tmp != null) {
      try {
        gidMax = Integer.parseInt(tmp.trim());
      } catch (NumberFormatException e) {
        // Bad configuration must not prevent the collector from starting: keep the default.
        if (logger == null) {
          e.printStackTrace();
        } else {
          logger.error("NodeUsersCollector", null, e);
        }
      }
    }
  }

  /**
   * Returns true if a given userId belongs to the exclusion list defined by the
   * {@value #USER_EXCLUSION_PROPERTY} system property. This list contains user IDs
   * the Agent should exclude while determining rogue processes. System owned
   * (root, nfs, etc) processes should not be reported as rogue.
   *
   * @param userId user id to look up
   * @return boolean - true if a given user should be excluded. False otherwise
   */
  public boolean excludeUser(String userId) {
    return listContains(System.getProperty(USER_EXCLUSION_PROPERTY), userId);
  }

  /**
   * Returns true if a given process belongs to the exclusion list defined by the
   * {@value #PROCESS_EXCLUSION_PROPERTY} system property. This list contains process
   * names the Agent should exclude while determining rogue processes.
   *
   * @param process process name to look up
   * @return boolean - true if a given process should be excluded. False otherwise
   */
  public boolean excludeProcess(String process) {
    return listContains(System.getProperty(PROCESS_EXCLUSION_PROPERTY), process);
  }

  /**
   * Checks whether a value is one of the entries of a comma separated list. Whitespace
   * around each entry is ignored so that "a, b" matches both "a" and "b".
   *
   * @param commaSeparated the list, or null when the property is not set
   * @param value          the value to look for
   * @return true if the list is set and one of its entries equals the value
   */
  private static boolean listContains(String commaSeparated, String value) {
    if (commaSeparated == null || value == null) {
      return false;
    }
    for (String entry : commaSeparated.split(",")) {
      if (entry.trim().equals(value)) {
        return true;
      }
    }
    return false;
  }

  /** Logs at info level when a logger is configured; otherwise does nothing. */
  private void logInfo(String location, String message) {
    if (logger != null) {
      logger.info(location, null, message);
    }
  }

  /** Logs at trace level when a logger is configured; otherwise does nothing. */
  private void logTrace(String location, String message) {
    if (logger != null) {
      logger.trace(location, null, message);
    }
  }

  /**
   * Places a process into the process hierarchy. If its parent is a top level entry of
   * processList, or one of that entry's children, the process is added to that top level
   * entry's (flat) child set and inherits the rogue flag of its parent. Otherwise the
   * process becomes a new top level entry.
   *
   * @param processList top level processes collected so far
   * @param cpi         the process to place
   */
  private void aggregate(Set<NodeUsersCollector.ProcessInfo> processList,
      NodeUsersCollector.ProcessInfo cpi) {
    ProcessInfo owner = null;        // top level entry that will hold cpi
    boolean parentIsRogue = false;
    for (ProcessInfo pi : processList) {
      // PIDs are ints so it is safe to use ==
      if (pi.getPid() == cpi.getPPid()) {
        owner = pi;
        parentIsRogue = pi.isRogue();
        break;
      }
      for (ProcessInfo childpi : pi.getChildren()) {
        if (childpi.getPid() == cpi.getPPid()) {
          // The child set is only modified after this loop, so iterating it here is safe.
          owner = pi;
          parentIsRogue = childpi.isRogue();
          break;
        }
      }
      if (owner != null) {
        break;
      }
    }
    if (owner != null) {
      if (parentIsRogue) {
        cpi.setRogue(true); // if parent is rogue, a child is rogue as well
      }
      owner.getChildren().add(cpi);
      logInfo("aggregate", "********* Adding Child Process With PID:" + cpi.getPid()
          + " As Child of Process:" + cpi.getPPid());
    } else {
      processList.add(cpi);
      logInfo("aggregate", "********* Adding Process With PID:" + cpi.getPid() + " NO PARENT");
    }
  }

  /**
   * Decides from the tokens of a ps output line whether the process is a Ducc daemon or
   * an ActiveMQ broker. A process whose -Dducc.deploy.components value starts with
   * "uima-as" is not treated as a daemon so that it still goes through rogue detection.
   *
   * @param tokens whitespace separated tokens of one ps line; tokens[1] is the pid
   * @return true if the process should be skipped as a daemon or broker
   */
  private boolean duccDaemon(String[] tokens) {
    String location = "duccDaemon";
    for (String token : tokens) {
      if (token.startsWith("-Dducc.deploy.components")) {
        int pos = token.indexOf("=");
        if (pos > -1 && token.substring(pos + 1).trim().startsWith("uima-as")) {
          return false; // uima-as service: caller checks whether it is rogue
        }
        logTrace(location, "********** Process with PID:" + tokens[1] + " Is a Ducc Daemon:"
            + token + ". Skipping....");
        return true;
      } else if (token.startsWith("-Dactivemq.base")) {
        logTrace(location, "********** Process with PID:" + tokens[1]
            + " Is an ActiveMQ Broker:" + token + ". Skipping....");
        return true;
      }
    }
    return false;
  }

  /**
   * Walks up the process tree, starting with the process whose pid is given, and reports
   * whether that process or one of its ancestors is owned by user "ducc". The walk stops
   * when a pid is not in the list or has already been visited, so a self-parented entry
   * or a pid cycle in the snapshot cannot loop forever.
   *
   * @param ppid pid of the process at which the walk starts
   * @param list all processes reported by ps
   * @return true if the starting process or any ancestor is owned by ducc
   */
  private boolean processAncestorIsOwnedByDucc(String ppid, Set<RunningProcess> list) {
    Set<String> visited = new HashSet<String>();
    String current = ppid;
    while (current != null && visited.add(current)) {
      RunningProcess match = null;
      for (RunningProcess candidate : list) {
        if (candidate.getPid().equals(current)) {
          match = candidate;
          break;
        }
      }
      if (match == null) {
        return false; // reached the top of the known tree
      }
      if (match.getOwner().equalsIgnoreCase("ducc")) {
        return true;
      }
      current = match.getPpid();
    }
    return false;
  }

  /**
   * Runs ps, classifies every reported process and returns per-user information.
   * <p>
   * The returned map is seeded from the agent with known reservations and previously
   * detected rogue processes. Each ps row is then skipped if it is malformed, has a uid
   * below {@link #gidMax}, has itself or an ancestor owned by ducc, is a Ducc daemon or
   * ActiveMQ broker, matches one of the exclusion lists, or its pid equals
   * Utils.getPID(). For the remaining rows, when an agent is present, the process is
   * either accounted to the user's reservation or, if the agent reports it as rogue,
   * submitted to the rogue process reaper.
   * <p>
   * Failures while running or parsing ps are logged and do not propagate; the map built
   * so far is still returned.
   *
   * @return map of user id to the information collected for that user
   * @throws Exception declared by the interface; failures inside the scan are logged instead
   */
  public TreeMap<String,NodeUsersInfo> call() throws Exception {
    String location = "call";
    TreeMap<String,NodeUsersInfo> map = new TreeMap<String,NodeUsersInfo>();
    List<String> currentPids = new ArrayList<String>();
    Process proc = null;
    InputStream stream = null;
    BufferedReader reader = null;
    try {
      ProcessBuilder pb;
      if (Utils.isMac()) {
        pb = new ProcessBuilder("ps","-Ao","user=,pid=,ppid=,uid=,args=");
      } else {
        pb = new ProcessBuilder("ps","-Ao","user:12,pid,ppid,uid,args", "--no-heading");
      }
      pb.redirectErrorStream(true);
      proc = pb.start();
      // spawn ps command and scrape the output
      stream = proc.getInputStream();
      reader = new BufferedReader(new InputStreamReader(stream));
      String line;
      String regex = "\\s+";

      if (agent != null) {
        // copy all known reservations reported by the OR
        agent.copyAllUserReservations(map);
      }
      logInfo(location,
          "********** User Process Map Size After copyAllUserReservations:" + map.size());
      if (agent != null) {
        // copy all known rogue processes detected previously
        agent.getRogueProcessReaper().copyAllUserRogueProcesses(map);
      }
      logInfo(location,
          "********** User Process Map Size After copyAllUserRougeProcesses:" + map.size());

      // Hierarchy of candidate processes. Used to determine if a process has a parent
      // which is a rogue process.
      Set<NodeUsersCollector.ProcessInfo> processList =
          new HashSet<NodeUsersCollector.ProcessInfo>();
      // Every process reported by ps. Used to look up each process' ancestors.
      Set<RunningProcess> tempProcessList = new HashSet<RunningProcess>();

      // To detect rogues there are two scans through the process list:
      // #1 - fills tempProcessList which is used to check if a process or one of its
      //      ancestors is owned by ducc.
      // #2 - the actual rogue process detection loop
      List<String> procList = new ArrayList<String>();
      while ((line = reader.readLine()) != null) {
        String trimmed = line.trim();
        String[] tokens = trimmed.split(regex);
        if (tokens.length < MIN_PS_COLUMNS) {
          // Not a complete ps row (blank line, or ps diagnostics merged into the
          // stream by redirectErrorStream). Indexing it would abort the whole scan.
          continue;
        }
        // save line for subsequent processing in the loop below
        procList.add(trimmed);
        tempProcessList.add(new RunningProcess(tokens[1], tokens[2], tokens[0]));
      }

      // the above loop filled tempProcessList, so now detect rogue processes.
      for (String procInfo : procList) {
        String[] tokens = procInfo.split(regex);
        String user = tokens[0];
        String pid = tokens[1];
        String ppid = tokens[2];
        String uid = tokens[3];
        String cmd = tokens[4];

        NodeUsersCollector.ProcessInfo pi;
        try {
          pi = new NodeUsersCollector.ProcessInfo(Integer.parseInt(pid), Integer.parseInt(ppid));
        } catch (NumberFormatException nfe) {
          // Skip just this row instead of letting the exception end the scan.
          logTrace(location, "Skipping ps line with non-numeric pid/ppid: " + procInfo);
          continue;
        }
        try {
          // by convention processes owned by uid < gidMax are system processes thus not rogue
          if (Integer.parseInt(uid) < gidMax) {
            continue;
          }
        } catch (NumberFormatException ignored) {
          // uid is not numeric, so the process cannot be classified as a system
          // process here. Fall through to the remaining checks.
        }
        // walk up the tree of ancestor processes to check if any is owned by ducc. If so,
        // this process is not rogue.
        if (processAncestorIsOwnedByDucc(pid, tempProcessList)) {
          continue;
        }
        // Detect and skip all ducc daemons except uima-as service
        if (duccDaemon(tokens)) {
          continue;
        }
        logTrace(location, procInfo);
        // Check if current process is owned by a user that should be excluded
        // from rogue process detection. A list of excluded users is in ducc.properties
        // Dont include root, nfs, and other system owned processes. Also exclude
        // processes that are defined in the process exclusion list in ducc.properties
        if (excludeUser(user) || excludeProcess(cmd) || Utils.getPID().equals(pid)) {
          continue;
        }
        if (agent == null) {
          continue; // nothing to record without an agent
        }

        // Check if user record is already in the map. May have been done above in
        // copyAllUserReservations().
        NodeUsersInfo nui = map.get(user);
        if (nui == null) {
          nui = new NodeUsersInfo(user);
          map.put(user, nui);
        }
        logInfo(location, "User:" + user + " Reservations:" + nui.getReservations().size()
            + " Rogue Processes:" + nui.getRogueProcesses().size());
        // add a process to a list of processes currently running on the node. The list
        // will be used to remove stale rogue processes at the end of this method
        currentPids.add(pid);
        logTrace(location, "Current Process (Before Calling aggregate() - PID:" + pid
            + " PPID:" + ppid + " Process List Size:" + processList.size());
        // add the process to the list of processes. If this process has a parent, it will
        // be added as a child. Compose hierarchy of processes so that we can use it later
        // to determine if any given process has a parent that is rogue
        aggregate(processList, pi);

        // if user has reservations on the node, any process found is not a rogue process
        List<IDuccId> userReservations = nui.getReservations();
        if (userReservations.size() > 0) {
          // skip processes that have previously been marked as rogue
          if (!inRogueList(nui.getRogueProcesses(), pid)
              && !agent.isManagedProcess(processList, pi)) {
            // code keeps count of java and non-java processes separately, so pass the
            // type of process (java or not) to allow distinct accounting
            nui.addPid(pid, ppid, cmd.endsWith("java"));
          }
          // all we know is that the user has a reservation and there is a process running.
          // If there are reservations, we cant determine which user process is rogue
          continue;
        }
        // detect if this is a rogue process and add it to the rogue process list. First
        // check if the current process has a parent and if so, check if the parent is
        // rogue. Second, if parent is not rogue (or no parent) check if the process is in
        // agent's inventory. If its not, we have a rogue process.
        if (agent.isRogueProcess(user, processList, pi)) {
          if (nui.getRogueProcesses().size() == 0
              || !inRogueList(nui.getRogueProcesses(), pid)) {
            pi.setRogue(true);
          }
          agent.getRogueProcessReaper().submitRogueProcessForKill(user, pid, ppid,
              cmd.endsWith("java"));
        }
      }
    } catch (Exception e) {
      if (logger == null) {
        e.printStackTrace();
      } else {
        logger.error(location, null, e);
      }
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (Exception ignored) {
          // best effort: nothing useful can be done if close fails
        }
      }
      if (proc != null) {
        // release the child process and its pipes even if reading failed part way
        proc.destroy();
      }
    }

    // if no processes found, clear rogue process list and list of processes associated
    // with a reserve
    if (currentPids.isEmpty()) {
      for (Map.Entry<String,NodeUsersInfo> entry : map.entrySet()) {
        entry.getValue().getReserveProcesses().clear();
        entry.getValue().getRogueProcesses().clear();
      }
    }
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String,NodeUsersInfo> entry : map.entrySet()) {
      sb.append(entry.getValue().toString()).append("\n");
    }
    if (logger == null) {
      System.out.println(sb.toString());
      System.out.println(
          "***************************************************************************************");
    } else {
      logger.info(location, null, sb.toString());
      logger.info(location, null, "******************************************************************************");
    }
    if (agent != null) {
      // remove any rogue processes that are not in the list of current processes just collected
      agent.getRogueProcessReaper().removeDeadRogueProcesses(currentPids);
    }
    return map;
  }

  /**
   * Checks whether a pid is already present in a list of rogue processes.
   *
   * @param rogueProcesses rogue processes recorded for a user
   * @param pid            pid to look for
   * @return true if one of the rogue processes has the given pid
   */
  private boolean inRogueList(List<NodeProcess> rogueProcesses, String pid) {
    for (NodeProcess rogue : rogueProcesses) {
      if (rogue.getPid().equals(pid)) {
        return true;
      }
    }
    return false;
  }

  /**
   * A node of the process hierarchy built by aggregate(): a pid, its parent pid, a rogue
   * flag and the set of processes attached below it.
   */
  public class ProcessInfo {
    private int pid;
    private int ppid;
    boolean rogue;
    Set<NodeUsersCollector.ProcessInfo> childProcesses =
        new HashSet<NodeUsersCollector.ProcessInfo>();

    ProcessInfo(int pid, int ppid) {
      this.pid = pid;
      this.ppid = ppid;
    }

    /** @return pid of this process */
    public int getPid() {
      return pid;
    }

    /** @return pid of this process' parent */
    public int getPPid() {
      return ppid;
    }

    /** @return true if this process has been marked as rogue */
    public boolean isRogue() {
      return rogue;
    }

    public void setRogue(boolean rogue) {
      this.rogue = rogue;
    }

    /** @return the live, mutable set of processes attached below this one */
    public Set<NodeUsersCollector.ProcessInfo> getChildren() {
      return childProcesses;
    }
  }

  /**
   * One row of ps output reduced to the fields needed to walk up the process tree:
   * pid, parent pid and owning user, all kept as the strings printed by ps.
   */
  public class RunningProcess {
    String pid;
    String ppid;
    String owner;

    public RunningProcess(String pid, String ppid, String owner) {
      this.pid = pid;
      this.ppid = ppid;
      this.owner = owner;
    }

    public String getPid() {
      return pid;
    }

    public String getPpid() {
      return ppid;
    }

    public String getOwner() {
      return owner;
    }
  }

  /**
   * Intentionally empty. The previous body was a commented-out manual test that no
   * longer matched this class' constructor.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
  }
}