[maven-release-plugin] copy for tag uima-ducc-2.2.0
git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/tags/uima-ducc-2.2.0@1779637 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/RELEASE_NOTES.html b/RELEASE_NOTES.html
index d5d0fbd..ca7b154 100755
--- a/RELEASE_NOTES.html
+++ b/RELEASE_NOTES.html
@@ -79,6 +79,12 @@
Details are in the INSTALL document and the DuccBook.
<p>
</p>
+<h2><a name="limitations">4. Limitations</a></h2>
+On some systems cgroups swap accounting is not enabled, in which case duccmon shows N/A for swap. To
+confirm, check the memory.stat file in the &lt;cgroups base dir&gt;/ducc/ directory. If swap accounting is
+enabled, that file contains a "swap" entry. If the entry is missing, add the kernel boot parameter
+swapaccount=1. Details on how to do this can be found <a href="http://unix.stackexchange.com/questions/147158/how-to-enable-swap-accounting-for-memory-cgroup-in-archlinux">here</a>.
+
</body>
</html>
diff --git a/src/main/resources/default.ducc.properties b/src/main/resources/default.ducc.properties
index 5abd17c..973a5ed 100644
--- a/src/main/resources/default.ducc.properties
+++ b/src/main/resources/default.ducc.properties
@@ -892,9 +892,6 @@
# the above will exclude node from using cgroups and/or prevent deployment of APs
ducc.agent.exclusion.file=${DUCC_HOME}/resources/exclusion.nodes
-# Define cgroup control subsystems used to enforce fair share on a node
-ducc.agent.launcher.cgroups.subsystems=memory,cpu
-
# Define script which will collect total swap used by a given process. This
# script is launched by an agent via duccling and running as the owner
# of the process.
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
index 98d929a..48e5124 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
@@ -53,6 +53,7 @@
import org.apache.uima.ducc.agent.launcher.CGroupsManager;
import org.apache.uima.ducc.agent.launcher.Launcher;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
@@ -141,6 +142,8 @@
private volatile boolean stopping = false;
+ private Object stopLock = new Object();
+
private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10);
public volatile boolean useCgroups = false;
@@ -287,6 +290,7 @@
}
}
}
+ // scan /proc/mounts for base cgroup dir
String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
if ( cgUtilsPath == null ) {
@@ -298,19 +302,10 @@
} else {
logger.info("nodeAgent",null,"Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir);
- // get the top level cgroup folder from ducc.properties. If
- // not defined, use /cgroup/ducc as default
- //String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir");
-// if (cgroupsBaseDir == null) {
-// cgroupsBaseDir = "/cgroup/ducc";
-// }
- // get the cgroup subsystems. If not defined, default to the
- // memory and cpu subsystem
- String cgroupsSubsystems = System.getProperty("ducc.agent.launcher.cgroups.subsystems");
- if (cgroupsSubsystems == null) {
- cgroupsSubsystems = "memory,cpu";
- }
- long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+ // if cpuacct is configured in cgroups, the subsystems list will be updated
+ String cgroupsSubsystems = "memory,cpu";
+
+ long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
if (configurationFactory.processStopTimeout != null) {
maxTimeToWaitForProcessToStop = Long
.valueOf(configurationFactory.processStopTimeout);
@@ -1368,35 +1363,17 @@
}
};
class ProcessRunner implements Runnable {
- String pid = "";
- SIGNAL signal;
+ ManagedProcess deployedProcess;
- public ProcessRunner(final String pid, SIGNAL signal ) {
- this.pid = pid;
- this.signal = signal;
+ public ProcessRunner(final ManagedProcess deployedProcess) {//final String pid, SIGNAL signal ) {
+ this.deployedProcess = deployedProcess;
}
public void run() {
- String methodName = "ProcesRunner.run";
- String[] sigTermCmd = {"/bin/kill",signal.get(), pid};
- ProcessBuilder pb = new ProcessBuilder(sigTermCmd);
- try {
- // launch kill SIGTERM
- final Process process = pb.start();
- Thread inputStreamConsumerThread = new Thread( new AgentStreamConsumer(process.getInputStream()) );
- inputStreamConsumerThread.start();
-
- Thread errorStreamConsumerThread = new Thread( new AgentStreamConsumer(process.getErrorStream()) );
- errorStreamConsumerThread.start();
-
- process.waitFor();
- } catch ( Exception e ) {
- e.printStackTrace();
- logger.warn(methodName, null, e);
- }
+ stopProcess(deployedProcess.getDuccProcess());
}
}
- private boolean sendSIGTERM(ManagedProcess process) {
+ private boolean runnable(ManagedProcess process) {
return ( process.getDuccProcess().getProcessState().equals(ProcessState.Initializing) ||
process.getDuccProcess().getProcessState().equals(ProcessState.Starting) ||
process.getDuccProcess().getProcessState().equals(ProcessState.Running) );
@@ -1414,16 +1391,16 @@
try {
for (ManagedProcess deployedProcess : deployedProcesses) {
String pid = deployedProcess.getDuccProcess().getPID();
- if (pid == null || pid.trim().length() == 0 || !sendSIGTERM(deployedProcess) ) {
+ if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
continue;
}
logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+ " PID:" + pid+" Sending SIGTERM Process State:"+deployedProcess.getDuccProcess().getProcessState().toString());
wait = true;
- deployedProcess.setStopping();
+ deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
+ // Stop each child process in its own thread to parallelize SIGTERM requests
ExecutorService executor = Executors.newSingleThreadExecutor();
- executor.execute( new ProcessRunner(pid,SIGNAL.SIGTERM));
-
+ executor.execute( new ProcessRunner(deployedProcess) ); //pid,SIGNAL.SIGTERM));
}
} catch( Exception e) {
@@ -1431,6 +1408,54 @@
}
return wait;
}
+
+    /**
+     * Forcibly terminates child processes that may have survived the SIGTERM
+     * sent by stopChildProcesses(). When cgroups are in use this delegates to
+     * CGroupsManager.cleanup(); otherwise it launches {@code /bin/kill -9 <pid>}
+     * ({@code taskkill /F /PID <pid>} on Windows) for every deployed process
+     * that has a PID and is in the Initializing, Starting or Running state.
+     * Any exception is logged as a warning and not rethrown.
+     */
+    private void killChildProcesses() {
+        String methodName = "killChildProcesses";
+        try {
+            if ( useCgroups ) {
+                logger.info(methodName, null, "CgroupsManager.cleanup() before ");
+                // SIGTERM may not be enough to take down a process; use cgroups to
+                // find any process still standing and do a hard kill.
+                cgroupsManager.cleanup();
+                logger.info(methodName, null, "CgroupsManager.cleanup() after ");
+            } else {
+                for (ManagedProcess deployedProcess : deployedProcesses) {
+                    String pid = deployedProcess.getDuccProcess().getPID();
+                    if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
+                        continue;
+                    }
+                    logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+                            + " PID:" + pid+" Sending SIGKILL Process State:"+deployedProcess.getDuccProcess().getProcessState().toString());
+                    ICommandLine cmdLine;
+                    if (Utils.isWindows()) {
+                        cmdLine = new NonJavaCommandLine("taskkill");
+                        // without /F taskkill only requests a graceful close, which is not a hard kill
+                        cmdLine.addArgument("/F");
+                        cmdLine.addArgument("/PID");
+                    } else {
+                        cmdLine = new NonJavaCommandLine("/bin/kill");
+                        cmdLine.addArgument("-9");
+                    }
+                    cmdLine.addArgument(pid);
+
+                    deployedProcess.setStopping();
+                    deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
+
+                    launcher.launchProcess(this, getIdentity(),deployedProcess.getDuccProcess(), cmdLine, this, deployedProcess);
+                }
+            }
+        } catch( Exception e) {
+            logger.warn(methodName, null, e);
+        }
+    }
/**
* Kills a given process
*
@@ -1782,122 +1801,75 @@
}
public void stop() throws Exception {
- if (stopping) {
- return;
- }
- stopping = true;
- logger.info("stop", null, "Agent stopping managed processes");
- // Stop monitor thread watching for Agent pings
- // if ( nodeMonitor != null ) {
- // nodeMonitor.shutdown();
- // }
- // if ( configurationFactory.getAgentPingDispatcher() != null ) {
- // configurationFactory.getAgentPingDispatcher().stop();
- // }
-
- // Dispatch SIGTERM to all child processes
- boolean wait = stopChildProcesses();
-
-
- /*
- synchronized (monitor) {
- Iterator<ManagedProcess> it = deployedProcesses.iterator();
- while (it.hasNext()) {
- ManagedProcess mp = it.next();
- // mp.kill();
- stopProcess(mp.getDuccProcess());
- }
- }
- logger.info("stop", null, "Agent dispatched STOP to all managed processes");
-
- // wait until all JPs stop
- while (true) {
- try {
- // break if no processes in the inventory
- if (deployedProcesses.size() == 0) {
- break;
+ synchronized(stopLock) {
+ logger.info("stop", null, "Agent stop()");
+ if (stopping) {
+ return;
}
- boolean atLeastOneProcessStillRunning = false;
- synchronized (monitor) {
- // check state of each process. If there is a process that
- // is not dead yet
- // just wait a little while and check the state again.
- Iterator<ManagedProcess> pit = deployedProcesses.iterator();
- // find at least one process that is not dead yet
- while (pit.hasNext()) {
- ManagedProcess mp = pit.next();
- // if the process is not dead,
- if (!mp.getDuccProcess().getProcessState().equals(ProcessState.Stopped)
- && !mp.getDuccProcess().getProcessState().equals(ProcessState.Failed)
- && !mp.getDuccProcess().getProcessState().equals(ProcessState.Killed)) {
- atLeastOneProcessStillRunning = true;
- break;
- }
- }
- }
- // if there are no running processes just break from the
- // 'while(true)' loop
- if (atLeastOneProcessStillRunning == false) {
- break;
- }
- synchronized (this) {
- wait(100);
- }
- } catch (Exception e) {
- }
- }
-*/
-
- // Stop publishing inventory. Once the route is down the agent forces last publication
- // sending an empty process map.
- configurationFactory.stopInventoryRoute();
+ stopping = true;
+
+ // Send an empty process map as the final inventory
+ HashMap<DuccId, IDuccProcess> emptyMap =
+ new HashMap<DuccId, IDuccProcess>();
+ DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity());
+ inventoryDispatcher.dispatch(duccEvent);
+ logger.info("stop", null, "Agent published final inventory");
+
+ configurationFactory.stopRoutes();
+
+ logger.info("stop", null, "Agent stopping managed processes");
+ // Dispatch SIGTERM to all child processes
+ boolean wait = stopChildProcesses();
+
+ // Stop publishing inventory. Once the route is down the agent forces last publication
+ // sending an empty process map.
+ //configurationFactory.stopInventoryRoute();
- if ( wait && deployedProcesses.size() > 0 ) {
- logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
- // wait for awhile
- synchronized (this) {
- long waittime = 60000;
- if (configurationFactory.processStopTimeout != null ) {
- try {
- waittime = Long.parseLong(configurationFactory.processStopTimeout);
- } catch( NumberFormatException e) {
- logger.warn("stop", null, e);
- }
- }
- logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
- wait(waittime);
- }
- }
+ if ( wait && deployedProcesses.size() > 0 ) {
+ logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
+ // wait for awhile
+ synchronized (this) {
+ long waittime = 60000;
+ if (configurationFactory.processStopTimeout != null ) {
+ try {
+ waittime = Long.parseLong(configurationFactory.processStopTimeout);
+ } catch( NumberFormatException e) {
+ logger.warn("stop", null, e);
+ }
+ }
+ logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
+ wait(waittime);
+ }
+ }
- // Send an empty process map as the final inventory
- HashMap<DuccId, IDuccProcess> emptyMap =
- new HashMap<DuccId, IDuccProcess>();
- DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity());
- inventoryDispatcher.dispatch(duccEvent);
- logger.info("stop", null, "Agent published final inventory");
-
- // Since SIGTERM may not be enough to take down a process, use cgroups to find
- // any process still standing and do hard kill
- cgroupsManager.cleanup();
-
- // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry
- // mode trying to recover a connection
- Thread t = new Thread( new Runnable() {
- public void run() {
- try {
- logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) ");
- Thread.sleep(10000);
- } catch(Exception e ) {
- logger.info("stop", null, e);
- } finally{
- logger.info("stop", null, "Agent calling System.exit(1) ... ");
- System.exit(1);
- }
- }
- });
- t.start();
-
- super.stop();
+ // send kill -9 to any child process still running
+ killChildProcesses();
+/*
+ // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry
+ // mode trying to recover a connection
+ Thread t = new Thread( new Runnable() {
+ public void run() {
+ try {
+ logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) ");
+ Thread.sleep(10000);
+ } catch(Exception e ) {
+ logger.info("stop", null, e);
+ } finally{
+ logger.info("stop", null, "Agent calling System.exit(1) ... ");
+ System.exit(1);
+ }
+ }
+ });
+ t.start();
+ t.join(10000);
+ super.stop();
+ logger.info("stop", null, "Reaper thread finished - calling super.stop()");
+ }
+*/
+ stopLock.wait(2000);
+ super.stop();
+ logger.info("stop", null, "Reaper thread finished - calling super.stop()");
+ }
}
public Future<?> getDeployedJPFuture(IDuccId duccId) {
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
index 82406fd..ab303ec 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
@@ -23,10 +23,13 @@
import javax.annotation.PostConstruct;
+import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.xstream.XStreamDataFormat;
import org.apache.camel.impl.DefaultClassResolver;
@@ -471,19 +474,19 @@
camelContext.addRoutes(metricsRouteBuilder);
}
-
+ public void stopRoutes() throws Exception {
+ camelContext.stop();
+ logger.info("AgentConfigureation.stopRoutes", null,"Camel Context stopped");
+
+ }
@Bean
@PostConstruct
-// public NodeMetricsProcessor nodeMetricsProcessor(NodeAgent agent) throws Exception {
public NodeMetricsProcessor nodeMetricsProcessor() throws Exception {
if (Utils.isLinux()) {
-// return new LinuxNodeMetricsProcessor(agent, "/proc/meminfo", "/proc/loadavg");
nodeMetricsProcessor = new LinuxNodeMetricsProcessor();
((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initMemInfo("/proc/meminfo");
((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initLoadAvg("/proc/loadavg");
- //agent, "/proc/meminfo", "/proc/loadavg");
} else {
-// return new DefaultNodeMetricsProcessor(agent);
nodeMetricsProcessor = new DefaultNodeMetricsProcessor();
}
return nodeMetricsProcessor;
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
index 421d7bf..0e8b3ea 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
@@ -25,6 +25,9 @@
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -57,8 +60,6 @@
private static final String CGDuccMemoryPath = "/memory/"+SYSTEM+"/";
private static final String CGDuccCpuPath = "/cpu/"+SYSTEM+"/";
private static final String CGProcsFile = "/cgroup.procs";
-// private static final String CGDuccCpuAcctPath = "/cpu/"+SYSTEM+"/";
-
// legacy means that the cgonfig points to <cgroup location>/ducc
private boolean legacyCgConfig = false;
@@ -81,6 +82,9 @@
// stores cgroup utils location like cgcreate, cgset, etc
private String cgroupUtilsDir=null;
// stores comma separated list of subsystems like cpu,memory
+
+ private boolean cpuInfoSymlinked = true;
+
private String cgroupSubsystems = ""; // comma separated list of subsystems
private long retryMax = 4;
private long delayFactor = 2000; // 2 secs in millis
@@ -88,29 +92,37 @@
private static String fetchCgroupsBaseDir(String mounts) {
String cbaseDir=null;
BufferedReader br = null;
+ List<String> cgroupsEntries = new ArrayList<String>();
try {
- FileInputStream fis = new FileInputStream(mounts);
- //Construct BufferedReader from InputStreamReader
- br = new BufferedReader(new InputStreamReader(fis));
-
- String line = null;
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- if ( line.trim().startsWith("cgroup") ) {
+ List<String> lines = Files.readAllLines(Paths.get(mounts),Charset.defaultCharset());
+ for( String line : lines ) {
+ // trim the list to just cgroups
+ if ( line.trim().startsWith("cgroup") ) {
+ cgroupsEntries.add(line);
+ }
+ }
+ // check if this is legacy cgroups installation looking like
+ // cgroup /cgroup cgroup rw,relatime,memory,cpuacct,cpu 0 0
+ if ( cgroupsEntries.size() == 1) {
+ String[] cgroupsInfo = cgroupsEntries.get(0).split(" ");
+ // return the mount point minus the memory part
+ cbaseDir = cgroupsInfo[1].trim();
+ } else {
+ // check if this is recent cgroups installation looking like
+ // cgroup /cgroup/memory cgroup rw,relatime,memory 0 0
+ // OR
+ // cgroup /sys/fs/cgroup/memory cgroup rw ...
+
+ // return the mount point minus the memory part
+ for(String line : cgroupsEntries) {
String[] cgroupsInfo = line.split(" ");
- if ( cgroupsInfo[1].indexOf("/memory") > -1 ) {
- // return the mount point minus the memory part
+ if ( cgroupsInfo[1].indexOf("/memory") != -1 ) {
cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory") );
- } else if ( cgroupsInfo[1].indexOf("cpu") > -1){
- // return the mount point minus the memory part
- cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/cpu") );
- } else {
- cbaseDir = cgroupsInfo[1].trim();
- }
- break;
- }
- }
-
+ break;
+ }
+ }
+ }
+
} catch( Exception e) {
e.printStackTrace();
} finally {
@@ -126,45 +138,81 @@
* @param args
*/
public static void main(String[] args) {
- try {
+ String cgroupsUtilsDirs = "/usr/bin,/bin"; //System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
+ String cgUtilsPath = null;
+ DuccLogger logger = DuccLogger.getLogger(CGroupsManager.class, "CGroupsManager");
+ //boolean useCgroups = false;
-// String cgBaseDir = "/cgroup"; //"/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");
- String cgBaseDir = "/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");
-
- CGroupsManager cgMgr = new CGroupsManager("/usr/bin",cgBaseDir, "memory",
- null, 10000);
- System.out.println("Cgroups Accounting Enabled:"+cgMgr.isCpuReportingEnabled());
-
- /*
- cgMgr.validator(cgBaseDir, "test2", System.getProperty("user.name"),false)
- .cgcreate();
+ String[] paths = cgroupsUtilsDirs.split(",");
+ for( String path : paths ) {
+ File file = new File(path.trim()+"/cgexec");
+ if ( file.exists() ) {
+ cgUtilsPath = path;
+ break;
+ }
+ }
- System.out.println("Cgroups Installed:"
- + cgMgr.cgroupExists("/cgroup/ducc"));
- Set<String> containers = cgMgr.collectExistingContainers();
- for (String containerId : containers) {
- System.out.println("Existing CGroup Container ID:"
- + containerId);
- }
- cgMgr.createContainer(args[0], args[2], cgMgr.getUserGroupName(args[2]),true);
- cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true,
- Long.parseLong(args[1]));
- synchronized (cgMgr) {
- cgMgr.wait(60000);
- }
- cgMgr.destroyContainer(args[0], args[2], NodeAgent.SIGKILL);
-*/
- } catch (Exception e) {
- e.printStackTrace();
- }
+ String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
+
+ if ( cgUtilsPath == null ) {
+ System.out.println("------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties");
+ } else if ( cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) {
+ System.out.println("------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts");
+
+ } else {
+ System.out.println("Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir);
+
+ String cgroupsSubsystems = "memory,cpu";
+
+ long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+
+ CGroupsManager cgroupsManager =
+ new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);
+ // check if cgroups base directory exists in the filesystem
+ // which means that cgroups
+ // and cgroups convenience package are installed and the
+ // daemon is up and running.
+
+ try {
+ if (cgroupsManager.cgroupExists(cgroupsBaseDir)) {
+ System.out.println("Agent found cgroup base directory in "+cgroupsBaseDir);
+
+ String containerId = "test";
+ // validate cgroups by creating a dummy cgroup. The code checks if cgroup actually got created by
+ // verifying existence of test cgroup file. The second step in verification is to check if
+ // CPU control is working. Configured in cgconfig.conf, the CPU control allows for setting
+ // cpu.shares. The code will attempt to set the shares and subsequently tries to read the
+ // value from cpu.shares file to make sure the values match. Any exception in the above steps
+ // will cause cgroups to be disabled.
+ //
+ cgroupsManager.validator(cgroupsBaseDir, containerId, System.getProperty("user.name"),false)
+ .cgcreate()
+ .cgset(100); // write cpu.shares=100 and validate
+
+ // cleanup dummy cgroup
+ cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"), -9);
+ }
+
+ } catch( Exception ee) {
+ ee.printStackTrace();
+ }
+ }
}
- public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String cgroupSubsystems,
+ public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String subsystems,
DuccLogger agentLogger, long maxTimeToWaitForProcessToStop) {
this.cgroupUtilsDir = cgroupUtilsDir;
this.cgroupBaseDir = cgroupBaseDir;
- this.cgroupSubsystems = cgroupSubsystems;
+ this.cgroupSubsystems = subsystems;
this.agentLogger = agentLogger;
this.maxTimeToWaitForProcessToStop = maxTimeToWaitForProcessToStop;
+
+ // on some systems cpu and cpuacct may be linked to the same directory. In such
+ // cases we need adjust cgdelete command to only include memory,cpu as submodules:
+ //
+
+
+
+
// determine what cgroup base location should be. For legacy cgconfig
// it will be <cgroup folder>/ducc
try {
@@ -183,6 +231,37 @@
// if there is an error here, the new cgconfig is assumed and subject
// to additional testing on agent startup.
}
+ cpuInfoSymlinked = symbolicLinksForCpu();
+ String location = getCGroupLocation("cpuacct").trim();
+ if ( !legacyCgConfig ) {
+ if (!location.endsWith(System.getProperty("file.separator") )) {
+ location += System.getProperty("file.separator");
+ }
+ location += SYSTEM+System.getProperty("file.separator");
+ }
+ System.out.println("------------- Location:"+location);
+ /*
+ if ( !legacyCgConfig ) {
+ location = cgroupBaseDir; //SYSTEM+System.getProperty("file.separator");
+ if ( !location.endsWith(System.getProperty("file.separator"))) {
+ location = location + System.getProperty("file.separator");
+ }
+ location += SYSTEM+System.getProperty("file.separator");
+ } else {
+ location = getCGroupLocation("cpuacct").trim();
+ if ( !location.endsWith(System.getProperty("file.separator"))) {
+ location = location + System.getProperty("file.separator");
+ }
+
+ }
+*/
+
+
+ File cpuacctUsageFile = new File(location+"cpuacct.usage");
+ if ( cpuacctUsageFile.exists()) {
+ System.out.println("Got cpuacct.usage file");
+ this.cgroupSubsystems += ",cpuacct";
+ }
}
/**
@@ -208,7 +287,39 @@
}
return location + subsystem;
}
-
+    /**
+     * Determines whether the cpu and cpuacct controller directories under the
+     * cgroup base directory resolve to the same physical directory, e.g. when
+     * both are symbolic links to a co-mounted "cpu,cpuacct" directory. In that
+     * case cpuacct has to be left out of the subsystem list given to cgdelete.
+     *
+     * @return true if both {@code <base>/cpu} and {@code <base>/cpuacct} exist
+     *         and have the same canonical path; false otherwise or on error
+     */
+    private boolean symbolicLinksForCpu() {
+        final String methodName = "symbolicLinksForCpu";
+        File f = new File(cgroupBaseDir);
+        if ( !f.exists()) {
+            return false;
+        }
+        try {
+            File cpuDir = new File(cgroupBaseDir, "cpu");
+            File cpuacctDir = new File(cgroupBaseDir, "cpuacct");
+            if ( !cpuDir.exists() || !cpuacctDir.exists() ) {
+                return false;
+            }
+            // Resolve the links directly rather than parsing "ls -l" output,
+            // whose column positions are not fixed.
+            String cpuTarget = cpuDir.getCanonicalPath();
+            String cpuacctTarget = cpuacctDir.getCanonicalPath();
+            agentLogger.info(methodName, null, "cpu resolves to "+cpuTarget+", cpuacct resolves to "+cpuacctTarget);
+            return cpuTarget.equals(cpuacctTarget);
+        } catch( Exception e) {
+            agentLogger.error(methodName, null, e);
+        }
+        return false;
+    }
+
public void configure(NodeAgent agent ) {
if ( agent != null ) {
if ( agent.configurationFactory.maxRetryCount != null ) {
@@ -509,6 +656,17 @@
}
return location+id+"cpuacct.usage";
}
+ private String composeMemoryStatFileName(String id) {
+ String location = getCGroupLocation("memory").trim();
+ if ( !location.endsWith(System.getProperty("file.separator"))) {
+ location = location + System.getProperty("file.separator");
+ }
+ if ( !legacyCgConfig ) {
+ location += SYSTEM+System.getProperty("file.separator");
+ }
+ return location+id+"memory.stat";
+ }
+
public boolean isCpuReportingEnabled() {
// String file = getCGroupLocation("cpuacct")+System.getProperty("file.separator")+"cpuacct.usage";
@@ -551,7 +709,63 @@
usage = -1; // cgroups accounting not configured
}
return usage;
+
}
+ public enum CgroupMemoryStat {
+ RSS("rss"),
+ SWAP("swap"),
+ FAULTS("pgpgin");
+
+ String key;
+
+ private CgroupMemoryStat(String aKey) {
+ this.key = aKey;
+ }
+
+ public String getKey() {
+ return this.key;
+ }
+
+ }
+ public long getUsageForMemoryStat(CgroupMemoryStat stat, String containerId ) throws Exception {
+ long usage = -1;
+
+ if (!containerId.endsWith(System.getProperty("file.separator"))) {
+ containerId = containerId + System.getProperty("file.separator");
+ }
+ String file = composeMemoryStatFileName(containerId.trim());
+ agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file);
+ File f = new File(file);
+ if ( f.exists() ) {
+ InputStreamReader isr = new InputStreamReader(new FileInputStream(f));
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ try {
+ while ((line = br.readLine()) != null) {
+ agentLogger.trace("getUsageForMemoryStat", null, "MEMORY.STAT Line:"+line);
+ if ( line.startsWith(stat.getKey())) {
+ usage = Long.parseLong(line.trim().split(" ")[1]);
+ break;
+ }
+ }
+ } catch ( Exception e) {
+ agentLogger.error("getUsageForMemoryStat", null, e);
+ }
+ finally {
+ if (isr != null) {
+ isr.close();
+ }
+ agentLogger.trace("getUsageForMemoryStat", null, "Done Reading memory.stat file:"+file);
+ }
+ } else {
+ agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file+" Not Found - Process RSS Usage is Unavailable");
+
+ usage = -1; // cgroups accounting not configured
+ }
+ return usage;
+ }
+
+
/**
* Sets the max memory use for an existing cgroup container.
*
@@ -700,6 +914,39 @@
}
return childCount;
}
+ private String adjustSubsystems() {
+
+
+ // if cpu and cpuacct are sym linked to the same dir, remove cpuacct part
+ // from the submodule list as it causes the cgdelete to throw an error.
+ if ( cpuInfoSymlinked && cgroupSubsystems.indexOf(",cpuacct") > -1 ) {
+ return cgroupSubsystems.substring(0,cgroupSubsystems.indexOf(",cpuacct") );
+
+
+ /*
+ StringBuffer sb = new StringBuffer();
+ if ( cgroupSubsystems.indexOf("cpuacct") > -1 ) {
+ String[] subsystems = cgroupSubsystems.trim().split(",");
+ //StringUtils.j
+ Iterator<String> it = Arrays.asList(subsystems).iterator();
+ if ( it.hasNext()) {
+ while(it.hasNext() ) {
+ String subsystem = it.next();
+ if ( !"cpuacct".equals(subsystem)) {
+ sb.append(subsystem);
+ if ( it.hasNext()) {
+ sb.append(",");
+ }
+ }
+ }
+ }
+ return sb.toString();
+ }
+ */
+ }
+
+ return cgroupSubsystems;
+ }
/**
* Removes cgroup container with a given id. Cgroups are implemented as a
* virtual file system. All is needed here is just rmdir.
@@ -733,7 +980,10 @@
}
// Any process remaining in a cgroup will be killed hard
killChildProcesses(containerId, userId, NodeAgent.SIGKILL);
- String[] command = new String[] { cgroupUtilsDir+"/cgdelete",cgroupSubsystems + ":"+SYSTEM+"/" + containerId };
+ //String subsystems =cgroupSubsystems.substring(0,cgroupSubsystems.indexOf(",cpuacct") );
+ String subsystems = adjustSubsystems();
+
+ String[] command = new String[] { cgroupUtilsDir+"/cgdelete",subsystems + ":"+SYSTEM+"/" + containerId };
int retCode = launchCommand(command);
if ( cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId)) {
agentLogger.info("destroyContainer", null, "Failed to remove Container "+containerId+" Using cgdelete command. Exit code:"+retCode);
@@ -742,12 +992,6 @@
containerIds.remove(containerId);
return true;
}
-// if (retCode == 0) {
-// containerIds.remove(containerId);
-// return true;
-// } else {
-// return false;
-// }
}
return true; // nothing to do, cgroup does not exist
} catch (Exception e) {
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
index f372b44..fca5bbb 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
@@ -95,12 +95,7 @@
if (cgroupsBaseDir == null) {
cgroupsBaseDir = "/cgroup/ducc";
}
- // get the cgroup subsystems. If not defined, default to the
- // memory and cpu subsystem
- String cgroupsSubsystems = System.getProperty("ducc.agent.launcher.cgroups.subsystems");
- if (cgroupsSubsystems == null) {
- cgroupsSubsystems = "memory,cpu";
- }
+ String cgroupsSubsystems = "memory,cpu";
long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
cgroupsManager =
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
index 48a1f0f..a1d8793 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
@@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.container.FlagsHelper;
import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -44,6 +45,7 @@
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
@@ -303,101 +305,119 @@
}
}
+ private boolean processInRunningOrInitializingState() {
+ return ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+ .equals(ProcessState.Running) ||
+ ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+ .equals(ProcessState.Initializing)
+ );
+ }
private void stopProcess(ICommandLine cmdLine, String[] cmd)
throws Exception {
String methodName = "stopProcess";
-
-
- Future<?> future = ((ManagedProcess) managedProcess).getFuture();
- if (future == null) {
- throw new Exception(
- "Future Object not Found. Unable to Stop Process with PID:"
- + ((ManagedProcess) managedProcess).getPid());
- }
- // for stop to work, PID must be provided
- if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null
- || ((ManagedProcess) managedProcess).getDuccProcess().getPID()
- .trim().length() == 0) {
- throw new Exception(
- "Process Stop Command Failed. PID not provided.");
- }
- long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
- if (super.agent.configurationFactory.processStopTimeout != null) {
- maxTimeToWaitForProcessToStop = Long
- .valueOf(super.agent.configurationFactory.processStopTimeout);
- }
- try {
- // NEW Code
- logger.info(methodName,
- ((ManagedProcess) super.managedProcess).getDuccId(),
- ">>>>>>>>>>>>>>> Stopping Process:"
- + ((ManagedProcess) managedProcess).getPid());
- ICommandLine cmdL;
- if (Utils.isWindows()) {
- cmdL = new NonJavaCommandLine("taskkill");
- cmdL.addArgument("/PID");
- } else {
- cmdL = new NonJavaCommandLine("/bin/kill");
- cmdL.addArgument("-15");
- }
- cmdL.addArgument(((ManagedProcess) managedProcess)
- .getDuccProcess().getPID());
-
- String[] sigTermCmdLine = getDeployableCommandLine(cmdL,
- new HashMap<String, String>());
- doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine,
- true);
-
- try {
- logger.info(methodName,
- ((ManagedProcess) super.managedProcess)
- .getDuccId(),
- "------------ Agent Starting Killer Timer Task For Process with PID:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess().getPID()
- + " Process State: "
- + ((ManagedProcess) managedProcess)
- .getDuccProcess()
- .getProcessState());
- future.get(maxTimeToWaitForProcessToStop,
- TimeUnit.MILLISECONDS);
-
- } catch(TimeoutException te) {
-
- logger.info(
- methodName,
- ((ManagedProcess) super.managedProcess)
- .getDuccId(),
- "------------ Agent Timed-out Waiting for Process with PID:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess().getPID()
- + " to Stop. Process State:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess()
- .getProcessState()
- + " .Process did not stop in allotted time of "
- + maxTimeToWaitForProcessToStop
- + " millis");
+
+ if ( processInRunningOrInitializingState() ) {
+ Future<?> future = ((ManagedProcess) managedProcess).getFuture();
+ if (future == null) {
+ throw new Exception(
+ "Future Object not Found. Unable to Stop Process with PID:"
+ + ((ManagedProcess) managedProcess).getPid());
+ }
+ // for stop to work, PID must be provided
+ if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null
+ || ((ManagedProcess) managedProcess).getDuccProcess().getPID()
+ .trim().length() == 0) {
+ throw new Exception(
+ "Process Stop Command Failed. PID not provided.");
+ }
+ try {
+ // NEW Code
logger.info(methodName,
- ((ManagedProcess) super.managedProcess)
- .getDuccId(),
- ">>>>>>>>>>>>>>> Killing Process:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess().getPID()
- + " .Process State:"
- + ((ManagedProcess) managedProcess)
- .getDuccProcess()
- .getProcessState());
- doExec(new ProcessBuilder(cmd), cmd, true);
+ ((ManagedProcess) super.managedProcess).getDuccId(),
+ ">>>>>>>>>>>>>>> Stopping Process:"
+ + ((ManagedProcess) managedProcess).getPid());
+ ICommandLine cmdL;
+ if (Utils.isWindows()) {
+ cmdL = new NonJavaCommandLine("taskkill");
+ cmdL.addArgument("/PID");
+ } else {
+ cmdL = new NonJavaCommandLine("/bin/kill");
+ cmdL.addArgument("-15");
+ }
+ cmdL.addArgument(((ManagedProcess) managedProcess)
+ .getDuccProcess().getPID());
+ String[] sigTermCmdLine = getDeployableCommandLine(cmdL,
+ new HashMap<String, String>());
+ doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine,
+ true);
+ // if agent receives admin STOP request, all managed processes should
+ // be stopped without each waiting for 60 secs. The agent
+ // blasts SIGTERM in parallel to all running child processes
+ if (!StopPriority.DONT_WAIT.equals(((ManagedProcess) managedProcess).getStopPriority()) ) {
+ long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+ if (super.agent.configurationFactory.processStopTimeout != null) {
+ maxTimeToWaitForProcessToStop = Long
+ .valueOf(super.agent.configurationFactory.processStopTimeout);
+ }
+
+ try {
+ logger.info(methodName,
+ ((ManagedProcess) super.managedProcess)
+ .getDuccId(),
+ "------------ Agent Starting Killer Timer Task For Process with PID:"
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess().getPID()
+ + " Process State: "
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess()
+ .getProcessState());
+ future.get(maxTimeToWaitForProcessToStop,
+ TimeUnit.MILLISECONDS);
+
+ } catch(TimeoutException te) {
+ if ( !((ManagedProcess) managedProcess).
+ getDuccProcess().getProcessState().equals(ProcessState.Stopped) ) {
+ logger.info(
+ methodName,
+ ((ManagedProcess) super.managedProcess)
+ .getDuccId(),
+ "------------ Agent Timed-out Waiting for Process with PID:"
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess().getPID()
+ + " to Stop. Process State:"
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess()
+ .getProcessState()
+ + " .Process did not stop in allotted time of "
+ + maxTimeToWaitForProcessToStop
+ + " millis");
+ logger.info(methodName,
+ ((ManagedProcess) super.managedProcess)
+ .getDuccId(),
+ ">>>>>>>>>>>>>>> Killing Process:"
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess().getPID()
+ + " .Process State:"
+ + ((ManagedProcess) managedProcess)
+ .getDuccProcess()
+ .getProcessState());
+ doExec(new ProcessBuilder(cmd), cmd, true);
+
+ }
+
+ }
+
+
+ }
+ } catch (Exception e) { // InterruptedException, ExecutionException
+ logger.error(methodName,
+ ((ManagedProcess) super.managedProcess).getDuccId(), e,
+ new Object[] {});
}
- } catch (Exception e) { // InterruptedException, ExecutionException
- logger.error(methodName,
- ((ManagedProcess) super.managedProcess).getDuccId(), e,
- new Object[] {});
+
}
-
}
private void startProcess(ICommandLine cmdLine, String[] cmd,
@@ -473,7 +493,7 @@
}
}
- private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd)
+ public void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd)
throws Exception {
String methodName = "doExec";
int exitCode = 0;
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
index 21394e3..fc384f5 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
@@ -60,9 +60,11 @@
};
public static enum StopPriority {
- KILL_9, QUIESCE_AND_STOP
+ NONE,KILL_9, QUIESCE_AND_STOP, DONT_WAIT
}
+ StopPriority stopPriority = StopPriority.NONE;
+
private volatile boolean destroyed = false;
private IDuccProcess duccProcess;
// Used to kill a process after it reports its PID.
@@ -161,7 +163,12 @@
public void setMetricsProcessor(LinuxProcessMetricsProcessor processor) {
metricsProcessor = processor;
}
-
+ public void setStopPriority(StopPriority sp ) {
+ stopPriority = sp;
+ }
+ public StopPriority getStopPriority() {
+ return stopPriority;
+ }
public LinuxProcessMetricsProcessor getMetricsProcessor() {
return metricsProcessor;
}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
index 962c33c..52c5d48 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
@@ -44,45 +44,7 @@
}
private long collect() throws Exception{
-
+ // use cgroups manager to collect cpu usage
return cgm.getCpuUsage(containerId);
-
}
-/*
- private String execTopShell() throws Exception {
- List<String> command = new ArrayList<String>();
- command.add("top");
- command.add("-b");
- command.add("-n");
- command.add("1");
- command.add("-p");
- command.add(pid);
-
- ProcessBuilder builder = new ProcessBuilder(command);
- Process process = builder.start();
- InputStream is = process.getInputStream();
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line;
- int count = 0;
- String cpu = "";
- try {
- while ((line = br.readLine()) != null) {
- if (count == 7) {
- String[] values = line.trim().split("\\s+");
- cpu = values[9];
- process.destroy();
- break;
- }
- count++;
- }
- } finally {
- if (is != null) {
- is.close();
- }
- }
- process.waitFor();
- return cpu;
- }
- */
}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
index fcb02e5..1d462aa 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
@@ -20,26 +20,32 @@
import java.util.concurrent.Callable;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;
import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.utils.DuccLogger;
public class ProcessMajorFaultCollector implements
Callable<ProcessMemoryPageLoadUsage> {
- String pid;
+ private CGroupsManager cgm=null;
+ private String containerId=null;
- public ProcessMajorFaultCollector(DuccLogger logger, String pid ) {
- this.pid = pid;
+ public ProcessMajorFaultCollector(DuccLogger logger, CGroupsManager mgr, String containerId ) {
+ this.cgm = mgr;
+ this.containerId = containerId;
}
public ProcessMemoryPageLoadUsage call() throws Exception {
try {
- //super.parseMetricFile();
- return new DuccProcessMemoryPageLoadUsage(pid);
+ return new DuccProcessMemoryPageLoadUsage(collect());
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
-
+ private long collect() throws Exception{
+ // use cgroups manager to collect rss usage
+ return cgm.getUsageForMemoryStat(CgroupMemoryStat.FAULTS,containerId);
+ }
}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
index fee0ec6..3de1a49 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
@@ -18,23 +18,30 @@
*/
package org.apache.uima.ducc.agent.metrics.collectors;
-import java.io.RandomAccessFile;
import java.util.concurrent.Callable;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;
import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
+import org.apache.uima.ducc.common.utils.DuccLogger;
-public class ProcessResidentMemoryCollector extends AbstractMetricCollector
+public class ProcessResidentMemoryCollector
implements Callable<ProcessResidentMemory> {
- public ProcessResidentMemoryCollector(RandomAccessFile fileHandle,
- int howMany, int offset) {
- super(fileHandle, howMany, offset);
+ private String containerId=null;
+ private CGroupsManager cgm=null;
+
+ public ProcessResidentMemoryCollector(DuccLogger logger, CGroupsManager mgr, String jobId) {
+ this.containerId = jobId;
+ this.cgm = mgr;
}
public ProcessResidentMemory call() throws Exception {
- super.parseMetricFile();
- return new DuccProcessResidentMemory(super.metricFileContents,
- super.metricFieldOffsets, super.metricFieldLengths);
+ return new DuccProcessResidentMemory(collect());
+ }
+ private long collect() throws Exception{
+ // use cgroups manager to collect rss usage
+ return cgm.getUsageForMemoryStat(CgroupMemoryStat.RSS,containerId);
}
}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
index e93ec65..9ebe421 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
@@ -18,65 +18,34 @@
*/
package org.apache.uima.ducc.agent.metrics.collectors;
-import java.io.RandomAccessFile;
import java.util.concurrent.Callable;
-import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;
-import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;
+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.utils.DuccLogger;
public class ProcessSwapUsageCollector implements
- Callable<ProcessMemoryPageLoadUsage> {
- String pid;
-
- public ProcessSwapUsageCollector(DuccLogger logger, String pid,
- RandomAccessFile fileHandle, int howMany, int offset) {
- this.pid = pid;
+ Callable<ProcessSwapSpaceUsage> {
+ private CGroupsManager cgm=null;
+ private String containerId=null;
+
+ public ProcessSwapUsageCollector(DuccLogger logger, CGroupsManager mgr, String jobId ) {
+ this.containerId = jobId;
+ this.cgm = mgr;
}
- public ProcessMemoryPageLoadUsage call() throws Exception {
+ public ProcessSwapSpaceUsage call() throws Exception {
try {
- return new DuccProcessMemoryPageLoadUsage(pid);
+ return new DuccProcessSwapSpaceUsage(collect());
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
-/*
- private String execTopShell() throws Exception {
- List<String> command = new ArrayList<String>();
- command.add("top");
- command.add("-b");
- command.add("-n");
- command.add("1");
- command.add("-p");
- command.add(pid);
-
- ProcessBuilder builder = new ProcessBuilder(command);
- Process process = builder.start();
- InputStream is = process.getInputStream();
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line;
- int count = 0;
- String cpu = "";
- try {
- while ((line = br.readLine()) != null) {
- if (count == 7) {
- String[] values = line.trim().split("\\s+");
- cpu = values[9];
- process.destroy();
- break;
- }
- count++;
- }
- } finally {
- if (is != null) {
- is.close();
- }
- }
- process.waitFor();
- return cpu;
+ private long collect() throws Exception{
+ // use cgroups manager to collect rss usage
+ return cgm.getUsageForMemoryStat(CgroupMemoryStat.SWAP,containerId);
}
- */
}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
index 90af38d..6b4bce7 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
@@ -31,10 +31,13 @@
import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
+import org.apache.uima.ducc.agent.metrics.collectors.ProcessSwapUsageCollector;
import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
+import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
@@ -47,17 +50,12 @@
ProcessMetricsProcessor {
private RandomAccessFile statmFile;
- // private RandomAccessFile nodeStatFile;
private RandomAccessFile processStatFile;
- //private long totalCpuInitUsage = 0;
-
private long previousCPUReadingInMillis = 0;
private long previousSnapshotTime = 0;
- //private boolean initializing = true;
-
private final ExecutorService pool;
private IDuccProcess process;
@@ -76,17 +74,16 @@
private volatile boolean closed = true;
- //private long clockAtStartOfRun = 0;
private long percentCPU = 0;
-
+
+
public LinuxProcessMetricsProcessor(DuccLogger logger,
IDuccProcess process, NodeAgent agent, String statmFilePath,
String nodeStatFilePath, String processStatFilePath,
ManagedProcess managedProcess) throws FileNotFoundException {
this.logger = logger;
statmFile = new RandomAccessFile(statmFilePath, "r");
- // nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r");
processStatFile = new RandomAccessFile(processStatFilePath, "r");
this.managedProcess = managedProcess;
this.agent = agent;
@@ -148,335 +145,269 @@
return true;
}
- public void process(Exchange e) {
- if (closed) { // files closed
+ private long getSwapUsage() throws Exception {
+ long swapUsage = -1;
+ if (agent.useCgroups) {
+
+ String containerId = agent.cgroupsManager
+ .getContainerId(managedProcess);
+
+ ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(
+ logger, agent.cgroupsManager, containerId);
+ logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null,
+ "Fetching Swap Usage PID:" + process.getPID());
+ Future<ProcessSwapSpaceUsage> processFaults = pool
+ .submit(processSwapCollector);
+ swapUsage = processFaults.get().getSwapUsage();
+ logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null,
+ " Process Swap Usage:" + swapUsage);
+ }
+ return swapUsage;
+ }
+
+ private long getFaults() throws Exception {
+ long faults = -1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ ProcessMajorFaultCollector processFaultsCollector =
+ new ProcessMajorFaultCollector(logger, agent.cgroupsManager, containerId);
+ logger.info("LinuxProcessMetricsProcessor.getFaults",null,"Fetching Page Faults PID:"+process.getPID());
+ Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
+ faults = processFaults.get().getMajorFaults();
+ logger.info(
+ "LinuxProcessMetricsProcessor.getFaults",null," Process Faults (pgpgin):"+faults);
+ }
+ return faults;
+ }
+ private long getRss() throws Exception {
+ long rss = -1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ ProcessResidentMemoryCollector processRSSCollector =
+ new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, containerId);
+ logger.info("LinuxProcessMetricsProcessor.getRss",null,"Fetching RSS Usage for PID:"+process.getPID());
+ Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
+ rss = processRss.get().get();
+ logger.info(
+ "LinuxProcessMetricsProcessor.getRss",null," Process RSS:"+rss);
+ }
+ return rss;
+ }
+
+ private long getCpuUsage() throws Exception {
+ long cpuUsage=-1;
+ if (agent.useCgroups) {
+ String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+ Future<ProcessCpuUsage> processCpuUsage = null;
+ ProcessCpuUsageCollector processCpuUsageCollector =
+ new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
+ logger.info("LinuxProcessMetricsProcessor.getCpuUsage",null,"Fetching CPU Usage for PID:"+process.getPID());
+ processCpuUsage = pool
+ .submit(processCpuUsageCollector);
+ long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
+ if ( cpuUsageInNanos >= 0 ) {
+ // cpuUsage comes from cpuacct.usage and is in nanos
+ cpuUsage = Math.round( cpuUsageInNanos / 1000000 ); // normalize into millis
+ }
+ logger.info(
+ "LinuxProcessMetricsProcessor.getCpuUsage",null,
+ "CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+cpuUsage);
+ }
+ return cpuUsage;
+ }
+
+ private long getCpuTime( long totalCpuUsageInMillis) throws Exception {
+ long cp = -1;
+ if (managedProcess.getDuccProcess().getProcessState()
+ .equals(ProcessState.Running) ||
+ managedProcess.getDuccProcess().getProcessState()
+ .equals(ProcessState.Initializing)
+ ) {
+ if (agent.useCgroups && totalCpuUsageInMillis != -1) {
+
+ long timeRunning = 1;
+ if ( process.getTimeWindowInit() != null ) {
+ timeRunning = process.getTimeWindowInit().getElapsedMillis();
+ }
+ if ( process.getTimeWindowRun() != null ) {
+ timeRunning += process.getTimeWindowRun().getElapsedMillis();
+ }
+ // normalize time in running state into seconds
+ percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
+ cp = percentCPU;
+ }
+ } else {
+ cp = percentCPU;
+ }
+ return cp;
+ }
+
+ private long getCurrentCpu(long totalCpuUsageInMillis ) {
+ long currentCpu=-1;
+ // publish current CPU usage by computing a delta from the last time
+ // CPU data was fetched.
+ if ( totalCpuUsageInMillis > 0 ) {
+ double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0;
+ double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0;
+ currentCpu = Math.round(100*(millisCPU/millisRun) ) ;
+ previousCPUReadingInMillis = totalCpuUsageInMillis;
+ previousSnapshotTime = System.currentTimeMillis();
+ } else {
+ if (agent.useCgroups && totalCpuUsageInMillis != -1 ) {
+ currentCpu = 0;
+ }
+ }
+ return currentCpu;
+ }
+
+
+ private void killProcsIfExceedingMemoryThreshold() throws Exception {
+ if ( !agent.useCgroups ) {
return;
}
+ if (process.getSwapUsage() > 0
+ && process.getSwapUsage() > managedProcess
+ .getMaxSwapThreshold()) {
+ } else {
+ String containerId = agent.cgroupsManager
+ .getContainerId(managedProcess);
+ String[] cgroupPids = agent.cgroupsManager
+ .getPidsInCgroup(containerId);
+ logger.info("LinuxProcessMetricsProcessor.process",null,"Container ID:"+containerId+" cgroup pids "+cgroupPids.length);
- // if process is stopping or already dead dont collect metrics. The
- // Camel
- // route has just been stopped.
- if (!collectStats(process.getProcessState())) {
- return;
- }
- if (process.getProcessState().equals(ProcessState.Initializing)
- || process.getProcessState().equals(ProcessState.Running))
- try {
+ // Use Memory Guard only if cgroups are disabled and fudge
+ // factor > -1
- // executes script
- // DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up
- // swap used by
- // a process
- long totalSwapUsage = 0;
- long totalFaults = 0;
- long totalCpuUsageInMillis = 0;
- long totalRss = 0;
- //int currentCpuUsage = 0;
- Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null;
- Future<ProcessCpuUsage> processCpuUsage = null;
- String[] cgroupPids = new String[0];
- try {
- String swapUsageScript = System
- .getProperty("ducc.agent.swap.usage.script");
+ if ( fudgeFactor > -1
+ && managedProcess.getProcessMemoryAssignment()
+ .getMaxMemoryWithFudge() > 0) {
- if (agent.useCgroups) {
- String containerId = agent.cgroupsManager
- .getContainerId(managedProcess);
- cgroupPids = agent.cgroupsManager
- .getPidsInCgroup(containerId);
- for (String pid : cgroupPids) {
- // the swap usage script is defined in
- // ducc.properties
- if (swapUsageScript != null) {
- DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
- pid, managedProcess.getOwner(),
- swapUsageScript, logger);
- totalSwapUsage += processSwapSpaceUsage
- .getSwapUsage();
- }
+ long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
- ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
- logger, pid);
- // if process is stopping or already dead dont
- // collect metrics. The Camel
- // route has just been stopped.
- if (!collectStats(process.getProcessState())) {
- return;
- }
-
- processMajorFaultUsage = pool
- .submit(processMajorFaultUsageCollector);
- totalFaults += processMajorFaultUsage.get()
- .getMajorFaults();
- try {
- if (!collectStats(process.getProcessState())) {
- return;
- }
-
- } catch( Exception ee) {
- logger.warn(
- "LinuxProcessMetricsProcessor.process",
- null,ee);
- }
- RandomAccessFile rStatmFile = null;
- try {
- rStatmFile = new RandomAccessFile("/proc/"
- + pid + "/statm", "r");
- } catch (FileNotFoundException fnfe) {
- logger.info(
- "LinuxProcessMetricsProcessor.process",
- null,
- "Statm File:"
- + "/proc/"
- + pid
- + "/statm *Not Found*. Process must have already exited");
- return;
- }
- ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
- rStatmFile, 2, 0);
- // if process is stopping or already dead dont
- // collect metrics. The Camel
- // route has just been stopped.
- if (!collectStats(process.getProcessState())) {
- return;
- }
-
- Future<ProcessResidentMemory> prm = pool
- .submit(collector);
-
- totalRss += prm.get().get();
-
- rStatmFile.close();
- }
-
- ProcessCpuUsageCollector processCpuUsageCollector =
- new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
- processCpuUsage = pool
- .submit(processCpuUsageCollector);
- long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
- if ( cpuUsageInNanos >= 0 ) {
- // cpuUsage comes from cpuacct.usage and is in nanos
- totalCpuUsageInMillis = Math.round( cpuUsageInNanos / 1000000 ); // normalize into millis
- } else {
- totalCpuUsageInMillis = -1;
- }
- logger.info(
- "LinuxProcessMetricsProcessor.process",null,
- "CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+totalCpuUsageInMillis);
- } else {
- if (swapUsageScript != null) {
- DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
- process.getPID(),
- managedProcess.getOwner(), swapUsageScript,
- logger);
- totalSwapUsage = processSwapSpaceUsage
- .getSwapUsage();
- }
-
- ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
- logger, process.getPID());
-
- // if process is stopping or already dead dont collect
- // metrics. The Camel
- // route has just been stopped.
- if (!collectStats(process.getProcessState())) {
- return;
- }
- processMajorFaultUsage = pool
- .submit(processMajorFaultUsageCollector);
- totalFaults = processMajorFaultUsage.get()
- .getMajorFaults();
- // Cgroups are not available so percent CPU is not available
- totalCpuUsageInMillis = -1; // -1 stands for N/A
- //currentCpuUsage = -1; // -1 stands for N/A
-
- ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
- statmFile, 2, 0);
- // if process is stopping or already dead dont collect
- // metrics. The Camel
- // route has just been stopped.
- if (!collectStats(process.getProcessState())) {
- return;
- }
-
- Future<ProcessResidentMemory> prm = pool
- .submit(collector);
- totalRss = prm.get().get();
- }
-
- } catch (Exception exc) {
- if (!collectStats(process.getProcessState())) {
- return;
- }
- logger.error("LinuxProcessMetricsProcessor.process", null,
- exc);
- }
-
- // report cpu utilization while the process is running
- if (managedProcess.getDuccProcess().getProcessState()
- .equals(ProcessState.Running) ||
- managedProcess.getDuccProcess().getProcessState()
- .equals(ProcessState.Initializing)
- ) {
- if (agent.useCgroups && totalCpuUsageInMillis != -1) {
-
- long timeRunning = 1;
- if ( process.getTimeWindowInit() != null ) {
- timeRunning = process.getTimeWindowInit().getElapsedMillis();
- }
- if ( process.getTimeWindowRun() != null ) {
- timeRunning += process.getTimeWindowRun().getElapsedMillis();
- }
- // normalize time in running state into seconds
- percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
- process.setCpuTime( percentCPU );
- } else {
- process.setCpuTime(-1); // -1 stands for N/A
- percentCPU = -1;
- }
- } else {
- // if process is not dead, report the last known percentCPU
- process.setCpuTime(percentCPU);
- }
- // publish current CPU usage by computing a delta from the last time
- // CPU data was fetched.
- if ( totalCpuUsageInMillis > 0 ) {
- double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0;
- double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0;
- process.setCurrentCPU(Math.round(100*(millisCPU/millisRun) ) );
- previousCPUReadingInMillis = totalCpuUsageInMillis;
- previousSnapshotTime = System.currentTimeMillis();
-
- } else {
- if (agent.useCgroups && totalCpuUsageInMillis != -1 ) {
- process.setCurrentCPU(0);
- } else {
- process.setCurrentCPU(-1); // -1 stands for N/A
- }
- }
- logger.info(
- "process",
- null,
- "----------- PID:" + process.getPID()
- + " Total CPU Time (%):" + process.getCpuTime()
- + " Delta CPU Time (%):" +process.getCurrentCPU() );
- // collects process Major faults (swap in memory)
- process.setMajorFaults(totalFaults);
- // Current Process Swap Usage in bytes
- long st = System.currentTimeMillis();
- long processSwapUsage = totalSwapUsage * 1024;
- // collects swap usage from /proc/<PID>/smaps file via a script
- // DUCC_HOME/admin/collect_process_swap_usage.sh
- process.setSwapUsage(processSwapUsage);
- logger.info(
+ logger.trace(
"process",
null,
- "----------- PID:" + process.getPID()
- + " Major Faults:" + totalFaults
- + " Process Swap Usage:" + processSwapUsage
- + " Max Swap Usage Allowed:"
- + managedProcess.getMaxSwapThreshold()
- + " Time to Collect Swap Usage:"
- + (System.currentTimeMillis() - st));
- if (processSwapUsage > 0
- && processSwapUsage > managedProcess
- .getMaxSwapThreshold()) {
- } else {
- // Use Memory Guard only if cgroups are disabled and fudge
- // factor > -1
-
- if (!agent.useCgroups
- && fudgeFactor > -1
- && managedProcess.getProcessMemoryAssignment()
- .getMaxMemoryWithFudge() > 0) {
- // RSS is in terms of pages(blocks) which size is system
- // dependent. Default 4096 bytes
- long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize
- // RSS
- // into
- // MB
- logger.trace(
- "process",
- null,
- "*** Process with PID:"
- + managedProcess.getPid()
- + " Assigned Memory (MB): "
- + managedProcess
- .getProcessMemoryAssignment()
- + " MBs. Current RSS (MB):" + rss);
- // check if process resident memory exceeds its memory
- // assignment calculate in the PM
- if (rss > managedProcess.getProcessMemoryAssignment()
- .getMaxMemoryWithFudge()) {
- logger.error(
- "process",
- null,
- "\n\n********************************************************\n\tProcess with PID:"
- + managedProcess.getPid()
- + " Exceeded its max memory assignment (including a fudge factor) of "
- + managedProcess
- .getProcessMemoryAssignment()
- .getMaxMemoryWithFudge()
- + " MBs. This Process Resident Memory Size: "
- + rss
- + " MBs .Killing process ...\n********************************************************\n\n");
- try {
- managedProcess.kill(); // mark it for death
- process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
- .toString());
- agent.stopProcess(process);
-
- if (agent.useCgroups) {
- for (String pid : cgroupPids) {
- // skip the main process that was just
- // killed above. Only kill
- // its child processes.
- if (pid.equals(managedProcess
- .getDuccProcess().getPID())) {
- continue;
- }
- killChildProcess(pid, "-15");
- }
- }
- } catch (Exception ee) {
- if (!collectStats(process.getProcessState())) {
- return;
- }
- logger.error("process", null, ee);
- }
- return;
- }
- }
-
- }
- // Publish resident memory
- process.setResidentMemory((totalRss * blockSize));
- // dont collect GC metrics for POPs. May not be java or may not
- // be a jmx enabled java process
- if (!process.getProcessType().equals(ProcessType.Pop)) {
- ProcessGarbageCollectionStats gcStats = gcStatsCollector
- .collect();
- process.setGarbageCollectionStats(gcStats);
- logger.info(
+ "*** Process with PID:"
+ + managedProcess.getPid()
+ + " Assigned Memory (MB): "
+ + managedProcess
+ .getProcessMemoryAssignment()
+ + " MBs. Current RSS (MB):" + rss);
+ // check if process resident memory exceeds its memory
+ // assignment as calculated by the PM
+ if (rss > managedProcess.getProcessMemoryAssignment()
+ .getMaxMemoryWithFudge()) {
+ logger.error(
"process",
null,
- "PID:" + process.getPID()
- + " Total GC Collection Count :"
- + gcStats.getCollectionCount()
- + " Total GC Collection Time :"
- + gcStats.getCollectionTime());
- }
+ "\n\n********************************************************\n\tProcess with PID:"
+ + managedProcess.getPid()
+ + " Exceeded its max memory assignment (including a fudge factor) of "
+ + managedProcess
+ .getProcessMemoryAssignment()
+ .getMaxMemoryWithFudge()
+ + " MBs. This Process Resident Memory Size: "
+ + rss
+ + " MBs .Killing process ...\n********************************************************\n\n");
+ try {
+ managedProcess.kill(); // mark it for death
+ process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
+ .toString());
+ agent.stopProcess(process);
- } catch (Exception ex) {
- // if the child process is not running dont log the exception.
- if (!collectStats(process.getProcessState())) {
+ if (agent.useCgroups) {
+ for (String pid : cgroupPids) {
+ // skip the main process that was just
+ // killed above. Only kill
+ // its child processes.
+ if (pid.equals(managedProcess
+ .getDuccProcess().getPID())) {
+ continue;
+ }
+ killChildProcess(pid, "-15");
+ }
+ }
+ } catch (Exception ee) {
+ if (!collectStats(process.getProcessState())) {
+ return;
+ }
+ logger.error("process", null, ee);
+ }
return;
}
- logger.error("process", null, ex);
- ex.printStackTrace();
}
+ }
+
+ }
+
+ private ProcessGarbageCollectionStats getGCStats() throws Exception {
+ if (!process.getProcessType().equals(ProcessType.Pop)) {
+ ProcessGarbageCollectionStats gcStats = gcStatsCollector
+ .collect();
+ return gcStats;
+ }
+ return new ProcessGarbageCollectionStats();
+ }
+ public boolean processIsActive() {
+ return process.getProcessState().equals(ProcessState.Starting)
+ ||
+ process.getProcessState().equals(ProcessState.Initializing)
+ ||
+ process.getProcessState().equals(ProcessState.Running);
+ }
+ public void process(Exchange e) {
+ // If the process is stopping or already dead, don't collect metrics:
+ // the Camel route has just been stopped.
+ if (closed || !processIsActive()) {
+ logger.info("LinuxProcessMetricsProcessor.process", null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning");
+ return;
+ }
+ try {
+
+ process.setSwapUsage(getSwapUsage());
+ process.setMajorFaults(getFaults());
+
+ long rssInBytes = getRss();
+ process.setResidentMemory(rssInBytes);
+
+ long totalCpuUsageInMillis = getCpuUsage();
+
+ // set CPU time in terms of %
+ process.setCpuTime(getCpuTime(totalCpuUsageInMillis));
+
+ process.setCurrentCPU(getCurrentCpu(totalCpuUsageInMillis));
+
+ ProcessGarbageCollectionStats gcStats = getGCStats();
+ process.setGarbageCollectionStats(gcStats);
+ logger.info(
+ "process",
+ null,
+ "----------- PID:" + process.getPID() + " RSS:"
+ + ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024))+ " MB" : "-1")
+ + " Total CPU Time (%):" + process.getCpuTime()
+ + " Delta CPU Time (%):" + process.getCurrentCPU()
+ + " Major Faults:" + process.getMajorFaults()
+ + " Process Swap Usage:" + process.getSwapUsage()
+ + " Max Swap Usage Allowed:"
+ + managedProcess.getMaxSwapThreshold()
+ + " Total GC Collection Count :"
+ + gcStats.getCollectionCount()
+ + " Total GC Collection Time :"
+ + gcStats.getCollectionTime());
+
+ killProcsIfExceedingMemoryThreshold();
+
+ } catch (Exception exc) {
+ if (!collectStats(process.getProcessState())) {
+ return;
+ }
+ logger.error("LinuxProcessMetricsProcessor.process", null, exc);
+ }
}
private void killChildProcess(final String pid, final String signal) {
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
index 85c4119..2c7ba7d 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
@@ -18,25 +18,17 @@
*/
package org.apache.uima.ducc.common.agent.metrics.memory;
-import org.apache.uima.ducc.common.node.metrics.ByteBufferParser;
-
-public class DuccProcessResidentMemory extends ByteBufferParser implements
+public class DuccProcessResidentMemory implements
ProcessResidentMemory {
private static final long serialVersionUID = 8563460863767404377L;
- private static final int TOTAL = 0;
- private static final int RESIDENT = 1;
-
- public DuccProcessResidentMemory(byte[] memInfoBuffer,
- int[] memInfoFieldOffsets, int[] memInfoFiledLengths) {
- super(memInfoBuffer, memInfoFieldOffsets, memInfoFiledLengths);
+ long rss;
+
+ public DuccProcessResidentMemory(long rss) {
+ this.rss = rss;
}
public long get() {
- return super.getFieldAsLong(RESIDENT);
- }
-
- public long getTotal() {
- return super.getFieldAsLong(TOTAL);
+ return rss;
}
}
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
index 99aed58..ff2723a 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
@@ -18,57 +18,13 @@
*/
package org.apache.uima.ducc.common.agent.metrics.swap;
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class DuccProcessMemoryPageLoadUsage implements
- ProcessMemoryPageLoadUsage {
- String pid;
+public class DuccProcessMemoryPageLoadUsage implements ProcessMemoryPageLoadUsage{
+ long faults;
- public DuccProcessMemoryPageLoadUsage(String pid) {
- this.pid = pid;
+ public DuccProcessMemoryPageLoadUsage(long faults) {
+ this.faults = faults;
}
- public long getMajorFaults() throws Exception {
- return collectProcessMajorFaults();
+ public long getMajorFaults() {
+ return faults;
}
- private long collectProcessMajorFaults() throws Exception {
- String[] command = new String[] {"/bin/ps","-o","maj_flt",pid};
-
- ProcessBuilder builder = new ProcessBuilder(command);
- builder.redirectErrorStream(true);
- Process process = builder.start();
- InputStream is = process.getInputStream();
- if ( is != null ) {
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
- String line;
- int count = 0;
- String faults = null;
- try {
- while ((line = br.readLine()) != null) {
- // skip the header line
- if (count == 1) {
- faults = line.trim();
- }
- count++;
- }
- } finally {
- if (is != null) {
- is.close();
- }
- process.waitFor();
- process.destroy();
- }
- if ( faults != null) {
- return Long.parseLong(faults.trim());
- } else {
- return 0;
- }
-
- }
- return 0;
-
- }
-
}
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
index b4501e5..1be47b5 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
@@ -19,72 +19,13 @@
package org.apache.uima.ducc.common.agent.metrics.swap;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-
-
-import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.common.utils.Utils;
-
public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage {
- String pid=null;
- String execScript=null;
- DuccLogger logger=null;
- String[] command;
+ long swapusage;
- public DuccProcessSwapSpaceUsage( String pid, String owner, String execScript, DuccLogger logger) {
- this.pid = pid;
- this.execScript = execScript;
- this.logger = logger;
- String c_launcher_path =
- Utils.resolvePlaceholderIfExists(
- System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());
- command = new String[] { c_launcher_path,
- "-u", owner, "--", execScript, pid };
+ public DuccProcessSwapSpaceUsage( long swapusage) {
+ this.swapusage = swapusage;
}
public long getSwapUsage() {
- long swapusage=0;
- if ( pid != null && execScript != null ) {
- InputStreamReader in = null;
- try {
- ProcessBuilder pb = new ProcessBuilder();
- //String[] command = {execScript,pid};
- pb.command(command); //command);
-
- //logger.info("------------ getSwapUsage-", null, cmd);
- pb.redirectErrorStream(true);
- Process swapCollectorProcess = pb.start();
- in = new InputStreamReader(swapCollectorProcess.getInputStream());
- BufferedReader reader = new BufferedReader(in);
- String line=null;
- boolean skip = true;
- while ((line = reader.readLine()) != null) {
- try {
- if ( line.startsWith("1001")) {
- skip = false;
- continue;
- }
- if (!skip) {
- swapusage = Long.parseLong(line.trim());
- logger.info("getSwapUsage-",null, "PID:"+pid+" Swap Usage:"+line);
- }
- } catch( NumberFormatException e) {
- logger.error("getSwapUsage", null, line);
- }
- }
- } catch( Exception e) {
- logger.error("getSwapUsage", null, e);
- } finally {
- if ( in != null ) {
- try {
- in.close();
- } catch( Exception e) {
- logger.error("getSwapUsage", null, e);
- }
-
- }
- }
- }
return swapusage;
}
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
index cbc7c0f..e830642 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
@@ -20,5 +20,5 @@
package org.apache.uima.ducc.common.agent.metrics.swap;
public interface ProcessMemoryPageLoadUsage {
- public long getMajorFaults() throws Exception;
+ public long getMajorFaults();
}
diff --git a/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex b/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
index 40367d2..22a6aec 100644
--- a/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
+++ b/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
@@ -106,6 +106,7 @@
\item DUCC run with user {\em ducc} credentials.
\item libcgroup1-0.37+ on SLES, libcgroup-0.37+ on RHEL, and on Ubuntu all of cgroup-bin cgroup-lite libcgroup1
\item along with a customized /etc/cgconfig.conf
+ \item On some flavors of Linux, cgroup swap accounting is not enabled and swap is reported as N/A. To enable swap accounting, add the swapaccount=1 kernel parameter. More information on this step is available here: \url{http://unix.stackexchange.com/questions/147158/how-to-enable-swap-accounting-for-memory-cgroup-in-archlinux}.
\end{itemize}
diff --git a/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java b/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
index 1280b0e..4e40c0e 100644
--- a/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
+++ b/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
@@ -74,7 +74,6 @@
System.out.println("Is this nuts or what, no logger!");
}
- forceCpuUsage();
if ( initComplete ) {
logger.log(Level.INFO, "Init bypassed in PID:TID " + pid + ":" + tid + ", already completed. ");
@@ -366,7 +365,10 @@
@Override
public void process(CAS cas) throws AnalysisEngineProcessException
{
- String data = cas.getSofaDataString();
+
+ forceCpuUsage();
+
+ String data = cas.getSofaDataString();
//
// Parms are in a single 4-token string:
diff --git a/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java b/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
index a0044ea..4655965 100644
--- a/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
+++ b/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
@@ -200,6 +200,9 @@
String methodName = "copyInventoryMajorFaults";
logger.trace(methodName, null, messages.fetch("enter"));
process.setMajorFaults(inventoryProcess.getMajorFaults());
+ DuccId jobid = dw.getDuccId();
+ DuccId processId = process.getDuccId();
+ logger.trace(methodName, jobid, processId, "MajorFaults:"+process.getMajorFaults());
logger.trace(methodName, null, messages.fetch("exit"));
return;
}
@@ -208,6 +211,9 @@
String methodName = "copyInventoryRss";
logger.trace(methodName, null, messages.fetch("enter"));
process.setResidentMemory(inventoryProcess.getResidentMemory());
+ DuccId jobid = dw.getDuccId();
+ DuccId processId = process.getDuccId();
+ logger.trace(methodName, jobid, processId, "Rss:"+process.getResidentMemory());
logger.trace(methodName, null, messages.fetch("exit"));
return;
}
@@ -460,7 +466,6 @@
private void copyInventoryProcessState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
String methodName = "copyInventoryProcessState";
logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
-
if(!compare(inventoryProcess.getProcessState().toString(),process.getProcessState().toString())) {
switch((JobState)job.getStateObject()) {
//case Initializing:
@@ -468,7 +473,6 @@
// break;
default:
process.advanceProcessState(inventoryProcess.getProcessState());
- logger.trace(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
if ( inventoryProcess.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
process.setProcessJmxUrl(inventoryProcess.getProcessJmxUrl());
}
@@ -476,6 +480,7 @@
break;
}
}
+ logger.trace(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
}
diff --git a/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java b/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
index de5dd80..26ce85f 100644
--- a/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
+++ b/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
@@ -86,7 +86,7 @@
// For a registered service, here is my registered id
DuccId id;
- HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
+ // UIMA-5244 HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
String history_key = IStateServices.SvcMetaProps.work_instances.pname();
String implementors_key = IStateServices.SvcMetaProps.implementors.pname();
@@ -136,6 +136,7 @@
long last_runnable = 0;
// The number of instances to maintain live.
+ // Pinger or manual start/stop may make the number of live instances differ from the registered number
int instances = 1;
int registered_instances;
@@ -171,8 +172,9 @@
String[] coOwners = null;
- String archive_key = "true";
- String archive_flag = IStateServices.SvcMetaProps.is_archived.columnName();
+ // Swapped these 2 values UIMA-5244
+ String archive_key = IStateServices.SvcMetaProps.is_archived.columnName();
+ String archive_flag = "true";
//
// Constructor for a registered service
@@ -803,11 +805,13 @@
return;
}
+ /* UIMA-5244 Can remove this as friendly_ids is always empty
String history = meta_props.getStringProperty(history_key, "");
for ( Long id : friendly_ids.keySet() ) {
history = history + " " + id.toString();
}
meta_props.put(history_key, history);
+ */
meta_props.put(archive_key, archive_flag);
try {
@@ -1000,6 +1004,14 @@
inst.update(share_id, host);
}
+ /**
+ *
+ * @param n the new value for the registered instance count
+ *
+ * Called when the registration is updated via the CLI
+ * Does NOT update the desired number of running instances as this can be set by the pinger.
+ * Also modifications to the registration should not affect running instances.
+ */
synchronized void updateRegisteredInstances(int n)
{
meta_props.setProperty(IStateServices.SvcMetaProps.instances.pname(), Integer.toString(n));
@@ -1008,7 +1020,11 @@
/**
* @param n is the target number of instances we want running
- * @param update indicates whether tp match registration to the target
+ *
+ * NOTE(review): the 'update' parameter documented below is no longer part of the signature.
+ * @param update indicates whether to match registration to the target
+ *
+ * Called by doStart & doStop, so the number of running instances may differ from the registered number
*/
synchronized void updateInstances(int n)
{
@@ -1337,8 +1353,12 @@
*/
boolean needNextStart(JobState old, JobState current)
{
- // UIMA-4587
+ // UIMA-4587 & UIMA-5244
String methodName="needNextStart";
+ // Can't do this before we handle the OR publication of "defunct" instances that were running when DUCC was shut down
+ // They should not be counted as errors when the SM restarts.
+ /* if ( isDeregistered() || !enabled() ) {
+ logger.info(methodName, id, "Bypassing instance start because service is unregistered or disabled.");*/
if ( isDeregistered() ) {
logger.info(methodName, id, "Bypassing instance start because service is unregistered.");
return false;
@@ -2066,13 +2086,12 @@
* Save the id for possible reuse.
*
* It's an error, albeit non-fatal, if the instance is already stashed.
-
- * Note: this might be fatal for the instance, or the service, but it's not fatal for the SM
- * so we simply note it in the log but not crash SM
+ * This can happen if an instance dies while it is being explicitly stopped
* UIMA-4258
*/
synchronized void stash_instance_id(int instid)
{
+ /* UIMA-5244 No need to warn, as this is expected.
String methodName = "stash_intance_id";
if ( available_instance_ids.containsKey(instid) ) {
try {
@@ -2083,6 +2102,7 @@
}
return;
}
+ */
available_instance_ids.put(instid, instid);
}
@@ -2096,7 +2116,7 @@
*/
synchronized void conditionally_stash_instance_id(int instid)
{
- if ( available_instance_ids.containsKey(instid) ) {
+ if ( available_instance_ids.containsKey(instid) ) { // Is this necessary? Could simply let the put replace
return;
}
stash_instance_id(instid);
@@ -2106,13 +2126,15 @@
{
String methodName = "start";
- // UIMA-4587
- if ( isDeregistered() ) {
- logger.info(methodName, id, "Bypass start becuase service is unregistered.");
+ // UIMA-4587 & UIMA-5244
+ // Can't do this yet
+/* if ( isDeregistered() || !enabled()) {
+ logger.info(methodName, id, "Bypass start because service is unregistered or disabled.");*/
+ if ( isDeregistered()) {
+ logger.info(methodName, id, "Bypass start because service is unregistered.");
return;
}
-
if ( countImplementors() >= instances ) {
return;
}
@@ -2202,6 +2224,7 @@
synchronized void stopAll()
{
+ instances = 0; // Reduce the target count to 0 to ensure no more are started UIMA-5244
stop(implementors.size());
}
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
index c36a459..29f849f 100644
--- a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
@@ -136,7 +136,7 @@
private static Messages messages = Messages.getInstance();
private static DuccId jobid = null;
- // These keys may have large values and be displayed witg Show/Hide buttons.
+ // These keys may have large values and be displayed with Show/Hide buttons.
// ducc.js must be updated if more than 4 are needed (services may have 4)
private final String[] n = {"classpath", "service_ping_classpath", "process_executable_args", "process_jvm_args", "environment"};
private final Set<String> hideableKeys = new HashSet<String>(Arrays.asList(n));
@@ -145,6 +145,8 @@
private enum AllocationType { JD, MR, SPC, SPU, UIMA };
private enum LogType { POP, UIMA };
+ private String notAvailable = "N/A";
+
private DuccAuthenticator duccAuthenticator = DuccAuthenticator.getInstance();
private String duccVersion = duccContext+"/version";
@@ -782,15 +784,22 @@
}
catch(Exception e) {
}
- double swap = process.getSwapUsageMax();
- if((swap * faults) > 0) {
- sb.append("<span class=\"health_red\""+">");
+ if(faults < 0) {
+ sb.append("<span class=\"health_black\""+">");
+ sb.append(notAvailable);
+ sb.append("</span>");
}
else {
- sb.append("<span class=\"health_black\""+">");
+ double swap = process.getSwapUsageMax();
+ if((swap * faults) > 0) {
+ sb.append("<span class=\"health_red\""+">");
+ }
+ else {
+ sb.append("<span class=\"health_black\""+">");
+ }
+ sb.append(faults);
+ sb.append("</span>");
}
- sb.append(faults);
- sb.append("</span>");
break;
}
}
@@ -807,34 +816,47 @@
default:
if(!process.isActive()) {
double swap = process.getSwapUsageMax();
- swap = swap/Constants.GB;
- String displaySwap = formatter.format(swap);
- if(swap > 0) {
- sb.append("<span class=\"health_red\""+">");
+ if(swap < 0) {
+ sb.append("<span class=\"health_black\""+">");
+ sb.append(notAvailable);
+ sb.append("</span>");
}
else {
- sb.append("<span class=\"health_black\""+">");
+ swap = swap/Constants.GB;
+ String displaySwap = formatter.format(swap);
+ if(swap > 0) {
+ sb.append("<span class=\"health_red\""+">");
+ }
+ else {
+ sb.append("<span class=\"health_black\""+">");
+ }
+ sb.append(displaySwap);
+ sb.append("</span>");
}
- sb.append(displaySwap);
- sb.append("</span>");
}
else {
double swap = process.getSwapUsage();
- swap = swap/Constants.GB;
- String displaySwap = formatter.format(swap);
- double swapMax = process.getSwapUsageMax();
- swapMax = swapMax/Constants.GB;
- String displaySwapMax = formatter.format(swapMax);
- sb.append("<span title=\"max="+displaySwapMax+"\" align=\"right\" "+">");
- if(swap > 0) {
- sb.append("<span class=\"health_red\""+">");
+ if(swap < 0) {
+ sb.append("<span class=\"health_black\""+">");
+ sb.append(notAvailable);
+ sb.append("</span>");
}
else {
- sb.append("<span class=\"health_black\""+">");
+ swap = swap/Constants.GB;
+ String displaySwap = formatter.format(swap);
+ double swapMax = process.getSwapUsageMax();
+ swapMax = swapMax/Constants.GB;
+ String displaySwapMax = formatter.format(swapMax);
+ sb.append("<span title=\"max="+displaySwapMax+"\" align=\"right\" "+">");
+ if(swap > 0) {
+ sb.append("<span class=\"health_red\""+">");
+ }
+ else {
+ sb.append("<span class=\"health_black\""+">");
+ }
+ sb.append(displaySwap);
+ sb.append("</span></span>"); // close both the health span and the enclosing max-title span
}
- sb.append(displaySwap);
- sb.append("</span>");
- sb.append("</span>");
}
break;
}
@@ -956,20 +978,35 @@
if(process != null) {
if(process.isComplete()) {
double rss = process.getResidentMemoryMax();
- rss = rss/Constants.GB;
- String displayRss = formatter.format(rss);
- sb.append(displayRss);
+ if(rss < 0) {
+ sb.append("<span class=\"health_black\""+">");
+ sb.append(notAvailable);
+ sb.append("</span>");
+ }
+ else {
+ rss = rss/Constants.GB;
+ String displayRss = formatter.format(rss);
+ sb.append(displayRss);
+ }
+
}
else {
double rss = process.getResidentMemory();
- rss = rss/Constants.GB;
- String displayRss = formatter.format(rss);
- double rssMax = process.getResidentMemoryMax();
- rssMax = rssMax/Constants.GB;
- String displayRssMax = formatter.format(rssMax);
- sb.append("<span title=\"max="+displayRssMax+"\" align=\"right\" "+">");
- sb.append(displayRss);
- sb.append("</span>");
+ if(rss < 0) {
+ sb.append("<span class=\"health_black\""+">");
+ sb.append(notAvailable);
+ sb.append("</span>");
+ }
+ else {
+ rss = rss/Constants.GB;
+ String displayRss = formatter.format(rss);
+ double rssMax = process.getResidentMemoryMax();
+ rssMax = rssMax/Constants.GB;
+ String displayRssMax = formatter.format(rssMax);
+ sb.append("<span title=\"max="+displayRssMax+"\" align=\"right\" "+">");
+ sb.append(displayRss);
+ sb.append("</span>");
+ }
}
}
return sb.toString();