blob: 9507fef7c7450c7f4e432ed4709fb6d07eedaf84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
import org.apache.uima.ducc.common.utils.DuccLogger;
/**
 * Tracks processes on a node that have been reported as rogue and, when a reaper script is
 * configured, kills them.
 *
 * <p>A PID must be reported through {@link #submitRogueProcessForKill} several times before it is
 * treated as rogue. Once it is rogue, and only if the {@code ducc.agent.rogue.process.reaper.script}
 * system property names a script, that script is run asynchronously with the PID as its only
 * argument.
 *
 * <p>NOTE(review): the rogue-process map is a plain {@link TreeMap} and is not synchronized; the
 * public methods appear to be intended for a single caller thread - confirm. Only
 * {@code processMap}, which the reaper threads also touch, is guarded (by {@code this}).
 */
public class RogueProcessReaper {
  /** System property naming the script used to kill a rogue process. */
  private static final String REAPER_SCRIPT_PROPERTY = "ducc.agent.rogue.process.reaper.script";

  /** System property overriding {@link #maxSecondsBeforeEntryExpires}. */
  private static final String PURGE_DELAY_PROPERTY = "ducc.agent.rogue.process.purge.delay";

  /** Boolean system property whose value is recorded in {@link #doKillRogueProcess}. */
  private static final String KILL_PROPERTY = "ducc.agent.rogue.process.kill";

  /** Default purge delay in seconds (2 minutes). */
  private static final int DEFAULT_PURGE_DELAY_SECONDS = 120;

  /** Number of reports required before a PID is considered rogue. */
  private static final int REPORTS_BEFORE_ROGUE = 3;

  /** Log location used by {@link #kill} and the reaper threads it starts. */
  private static final String KILL_LOCATION = "RogueProcessReaper.kill.run()";

  /** Tracked entries keyed by PID (sorted). */
  private Map<String, RogueProcessEntry> userRogueProcessMap =
      new TreeMap<String, RogueProcessEntry>();

  /** Reaper script processes still running, keyed by target PID. Guarded by {@code this}. */
  private Map<String, Process> processMap = new HashMap<String, Process>();

  /** Reports (after becoming rogue) needed before a kill; never changed from 1 in this class. */
  private int counterValue = 1;

  /** Reports (after a kill) needed before the entry is dropped from the map. */
  private int cleanupCounterValue = 5;

  /**
   * Number of seconds a process entry is meant to be kept in the rogue process map before it is
   * removed. Default: 2 minutes; overridable via {@code ducc.agent.rogue.process.purge.delay}.
   * NOTE(review): the value is passed to each entry but is not consulted anywhere in this class.
   */
  int maxSecondsBeforeEntryExpires = DEFAULT_PURGE_DELAY_SECONDS;

  /** May be null, in which case messages go to stdout and exceptions to stderr. */
  private DuccLogger logger;

  /** Value of the {@code ducc.agent.rogue.process.kill} property; only logged in this class. */
  boolean doKillRogueProcess = false;

  /** Script invoked as {@code <script> <pid>}; null means rogue processes are never killed. */
  private String reaperScript;

  /**
   * Creates a reaper configured from system properties.
   *
   * @param logger logger to use, or null to log to stdout/stderr
   * @param counterValue NOTE(review): not stored (the assignment was disabled in the original
   *     code), so the kill counter stays at 1. It is only used to derive the cleanup counter when
   *     {@code cleanupCounterValue <= 0}.
   * @param cleanupCounterValue reports after a kill before the entry is dropped; when not positive,
   *     {@code counterValue + 5} is used instead
   */
  public RogueProcessReaper(DuccLogger logger, int counterValue, int cleanupCounterValue) {
    // Assign first so the logging helpers work for the rest of the constructor.
    this.logger = logger;
    if (cleanupCounterValue > 0) {
      this.cleanupCounterValue = cleanupCounterValue;
    } else {
      this.cleanupCounterValue = counterValue + 5;
    }
    reaperScript = System.getProperty(REAPER_SCRIPT_PROPERTY);
    // Check whether the purge delay is defined in ducc.properties.
    String purgeDelay = System.getProperty(PURGE_DELAY_PROPERTY);
    if (purgeDelay != null) {
      try {
        maxSecondsBeforeEntryExpires = Integer.parseInt(purgeDelay.trim());
      } catch (NumberFormatException e) {
        logError("RogueProcessReaper.ctor", e);
        maxSecondsBeforeEntryExpires = DEFAULT_PURGE_DELAY_SECONDS;
      }
    }
    doKillRogueProcess = Boolean.getBoolean(KILL_PROPERTY);
    logInfo("RogueProcessReaper.ctor", "ducc.agent.rogue.process.kill=" + doKillRogueProcess);
  }

  /**
   * Records one report of a rogue process and, once it has been reported often enough and a
   * reaper script is configured, kills it.
   *
   * <p>The PID only becomes rogue on its third report. After that, each report counts down the
   * entry's kill counter. When the counter reaches zero, the reaper script is launched once. After
   * the kill, further reports count down the cleanup counter until the entry is dropped.
   *
   * @param user owner of the process
   * @param pid process id (map key; must not be null)
   * @param ppid parent process id, recorded on first report only
   * @param isJava whether the process is a Java process, recorded on first report only
   */
  public void submitRogueProcessForKill(String user, String pid, String ppid, boolean isJava) {
    final String methodName = "RogueProcessReaper.submitRogueProcessForKill";
    RogueProcessEntry entry = userRogueProcessMap.get(pid);
    if (entry == null) {
      // Keep the cleanup counter strictly above the kill counter.
      if (cleanupCounterValue <= counterValue) {
        cleanupCounterValue += counterValue;
      }
      entry = new RogueProcessEntry(counterValue, cleanupCounterValue, user,
          maxSecondsBeforeEntryExpires, isJava, ppid);
      userRogueProcessMap.put(pid, entry);
    }
    entry.markAsRogue(REPORTS_BEFORE_ROGUE);
    if (!entry.isRogue()) {
      logInfo("submitRogueProcessForKill", "PID:" + pid + " Not Rogue Yet - It takes "
          + REPORTS_BEFORE_ROGUE + " iterations to make it Rogue");
      return;
    }
    if (reaperScript == null) {
      logInfo(methodName, "Ducc Not Configured to Kill Rogue Process (PID:" + pid + ") Owner:"
          + user
          + ". Change (or define) ducc.agent.rogue.process.reaper.script property in ducc.properties if you want rogue processes to be cleaned up.");
    } else {
      try {
        countDownAndMaybeKill(methodName, entry, user, pid);
      } catch (Exception e) {
        logError(methodName, e);
      }
    }
    logInfo(methodName, "UserRogueProcessMap size:" + userRogueProcessMap.size());
  }

  /**
   * Counts down the entry's kill counter, launches the reaper script when it hits zero (once per
   * entry), and drops the entry after a kill once its cleanup counter reaches zero.
   */
  private void countDownAndMaybeKill(String methodName, RogueProcessEntry entry, String user,
      String pid) throws Exception {
    // Don't kill the process immediately; kill only after "counterValue" reports.
    if (logger != null) {
      logger.info(methodName, null,
          "Decrementing Counter - Current Value:" + entry.counter.getCount());
    }
    long remaining = 0;
    if (entry.counter.getCount() > 0) {
      remaining = entry.countDown();
    }
    if (remaining <= 0 && !entry.isKilled()) {
      logInfo(methodName, "Process Scheduled for Kill PID:" + pid + " Owner:" + user + " ");
      entry.resetCounter(counterValue);
      kill(user, pid);
      entry.killed();
    } else {
      logInfo(methodName, "Process ***NOT*** Scheduled for Kill PID:" + pid + " Owner:" + user);
    }
    // The cleanup counter is only decremented once the process has been killed.
    if (entry.isKilled() && entry.countDownCleanupCounter() == 0) {
      logInfo(methodName,
          "Removing Entry From RogueProcessMap for PID:" + pid + " Owner:" + user);
      userRogueProcessMap.remove(pid);
    }
  }

  /**
   * Returns the PIDs of tracked processes owned by {@code user} that have been marked rogue.
   *
   * @param user owner to match
   * @return a new, possibly empty, list of PIDs in ascending key order
   */
  public List<String> getUserRogueProcesses(String user) {
    List<String> rogues = new ArrayList<String>();
    for (Map.Entry<String, RogueProcessEntry> e : userRogueProcessMap.entrySet()) {
      RogueProcessEntry value = e.getValue();
      if (value.getUser().equals(user) && value.isRogue()) {
        rogues.add(e.getKey());
      }
    }
    return rogues;
  }

  /**
   * Stops tracking {@code pid}.
   *
   * @param pid process id to forget
   * @return true if the PID was being tracked
   */
  public boolean removeRogueProcess(String pid) {
    // Entries are never stored as null, so a non-null result means the key was present.
    return userRogueProcessMap.remove(pid) != null;
  }

  /**
   * Stops tracking every PID that is not in {@code currentPids}.
   *
   * @param currentPids PIDs of processes currently alive on the node
   */
  public void removeDeadRogueProcesses(List<String> currentPids) {
    // Copy into a set so each liveness check is O(1) instead of a linear list scan.
    final Set<String> alive = new HashSet<String>(currentPids);
    userRogueProcessMap.keySet().removeIf(pid -> !alive.contains(pid));
  }

  /**
   * Adds every tracked rogue process to the per-user summary in {@code map}, creating a
   * {@link NodeUsersInfo} for users not yet present.
   *
   * <p>NOTE(review): despite earlier scaffolding for it, no entries are expired or removed here.
   *
   * @param map per-user summaries keyed by user name; modified in place
   */
  public void copyAllUserRogueProcesses(TreeMap<String, NodeUsersInfo> map) {
    for (Map.Entry<String, RogueProcessEntry> e : userRogueProcessMap.entrySet()) {
      RogueProcessEntry value = e.getValue();
      if (!value.isRogue()) {
        continue;
      }
      NodeUsersInfo nui = map.get(value.getUser());
      if (nui == null) {
        nui = new NodeUsersInfo(value.getUser());
        map.put(value.getUser(), nui);
      }
      nui.addRogueProcess(e.getKey(), value.getPpid(), value.isJava());
    }
  }

  /**
   * Runs the configured reaper script against {@code pid} on a new thread.
   *
   * <p>If a reaper script previously started for the same PID is still running, it is destroyed
   * first. The script's combined stdout/stderr is logged line by line. This method does not check
   * whether a reaper script is configured; callers must ensure it is.
   *
   * @param user process owner (used for logging)
   * @param pid process id passed as the script's only argument
   * @throws Exception declared for compatibility; failures are logged rather than thrown
   */
  public void kill(final String user, final String pid) throws Exception {
    try {
      // If the previously started shell script is still alive, kill it before starting a new one.
      synchronized (this) {
        Process previous = processMap.get(pid);
        if (previous != null) {
          previous.destroy();
        }
      }
    } catch (Exception e) {
      logError(KILL_LOCATION, e);
    }
    Thread reaperThread =
        new Thread(() -> runReaperScript(user, pid), "RogueProcessReaper-" + pid);
    reaperThread.start();
  }

  /**
   * Body of the reaper thread: starts the script, registers it in {@code processMap}, and relays
   * its output until it closes.
   */
  private void runReaperScript(String user, String pid) {
    final String[] reaperScriptCommand = new String[] { reaperScript, pid };
    Process shellProcess = null;
    try {
      ProcessBuilder pb = new ProcessBuilder(reaperScriptCommand);
      pb.redirectErrorStream(true);
      shellProcess = pb.start();
      // Lock the reaper itself, not the thread's Runnable, so kill() sees a consistent map.
      synchronized (this) {
        processMap.put(pid, shellProcess);
      }
      StringBuilder command = new StringBuilder();
      for (String part : reaperScriptCommand) {
        command.append(part).append(" ");
      }
      logInfo(KILL_LOCATION, "--------- Started Rogue Process Reaper Script For Pid:" + pid
          + " Owned by:" + user + " Command:" + command);
      InputStream is = shellProcess.getInputStream();
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
        String scriptOutput;
        // stderr is merged into stdout by redirectErrorStream(true).
        while ((scriptOutput = reader.readLine()) != null) {
          if (logger != null) {
            logger.info(KILL_LOCATION, null, scriptOutput);
          } else {
            System.out.println(">>>>" + scriptOutput);
          }
        }
      }
      logInfo(KILL_LOCATION, "--------- Rogue Process Reaper (for PID:" + pid + ") Terminated");
    } catch (Exception e) {
      logError(KILL_LOCATION, e);
    } finally {
      if (shellProcess != null) {
        synchronized (this) {
          // Remove only our own entry: a newer script for the same PID may have replaced it.
          processMap.remove(pid, shellProcess);
        }
      }
    }
  }

  /** Logs an informational message, falling back to stdout when no logger was supplied. */
  private void logInfo(String location, String message) {
    if (logger == null) {
      System.out.println(message);
    } else {
      logger.info(location, null, message);
    }
  }

  /** Logs an exception, falling back to a stderr stack trace when no logger was supplied. */
  private void logError(String location, Throwable t) {
    if (logger == null) {
      t.printStackTrace();
    } else {
      logger.error(location, null, t);
    }
  }

  /** Per-PID bookkeeping for a process that has been reported as rogue. */
  private static class RogueProcessEntry {
    /** Reports remaining (after becoming rogue) before the process is killed. */
    CountDownLatch counter;
    /** Reports remaining (after the kill) before the entry is dropped. */
    CountDownLatch cleanupCounter;
    final String user;
    boolean killed;
    final boolean java;
    final String ppid;
    /** Reports seen so far; starts at 1 for the report that created the entry. */
    final AtomicInteger pendingCounter = new AtomicInteger(1);
    boolean rogue;

    // NOTE(review): maxSecondsBeforeEntryExpires is accepted but not used.
    public RogueProcessEntry(int counterValue, int cleanupCounterValue, String user,
        int maxSecondsBeforeEntryExpires, boolean isJava, String ppid) {
      counter = new CountDownLatch(counterValue);
      cleanupCounter = new CountDownLatch(cleanupCounterValue);
      this.user = user;
      this.java = isJava;
      this.ppid = ppid;
    }

    public String getPpid() {
      return ppid;
    }

    public boolean isRogue() {
      return rogue;
    }

    public void killed() {
      killed = true;
    }

    public boolean isKilled() {
      return killed;
    }

    public String getUser() {
      return user;
    }

    /** Decrements the kill counter and returns its new value. */
    public long countDown() {
      counter.countDown();
      return counter.getCount();
    }

    /** Replaces the kill counter with a fresh one starting at {@code counterValue}. */
    public void resetCounter(int counterValue) {
      counter = new CountDownLatch(counterValue);
    }

    /** Decrements the cleanup counter and returns its new value. */
    public long countDownCleanupCounter() {
      cleanupCounter.countDown();
      return cleanupCounter.getCount();
    }

    /**
     * Counts one more report; marks the entry rogue on the call that finds the report count
     * already at {@code ceiling}.
     */
    public void markAsRogue(int ceiling) {
      if (pendingCounter.get() < ceiling) {
        pendingCounter.incrementAndGet();
      } else {
        rogue = true;
      }
    }

    public boolean isJava() {
      return java;
    }
  }

  /** No-op entry point, retained for compatibility. */
  public static void main(String[] args) {
  }
}