[maven-release-plugin]  copy for tag uima-ducc-2.2.0

git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/tags/uima-ducc-2.2.0@1779637 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/RELEASE_NOTES.html b/RELEASE_NOTES.html
index d5d0fbd..ca7b154 100755
--- a/RELEASE_NOTES.html
+++ b/RELEASE_NOTES.html
@@ -79,6 +79,12 @@
 Details are in the INSTALL document and the DuccBook.
 <p>
 </p>
+<h2><a name="limitations">4. Limitations</a></h2>
+On some systems cgroups swap accounting is not enabled, and duccmon will show N/A for swap. To 
+confirm, check the memory.stat file in the &lt;cgroups base dir&gt;/ducc/ folder. If swap accounting is 
+enabled, a "swap" property should be defined there. If it is missing, you need to add the kernel parameter 
+swapaccount=1. Details of how to do this can be found <a href="http://unix.stackexchange.com/questions/147158/how-to-enable-swap-accounting-for-memory-cgroup-in-archlinux">here</a>.
+
 
 </body>
 </html>
diff --git a/src/main/resources/default.ducc.properties b/src/main/resources/default.ducc.properties
index 5abd17c..973a5ed 100644
--- a/src/main/resources/default.ducc.properties
+++ b/src/main/resources/default.ducc.properties
@@ -892,9 +892,6 @@
 # the above will exclude node from using cgroups and/or prevent deployment of APs
 ducc.agent.exclusion.file=${DUCC_HOME}/resources/exclusion.nodes
 
-# Define cgroup control subsystems used to enforce fair share on a node
-ducc.agent.launcher.cgroups.subsystems=memory,cpu
-
 # Define script which will collect total swap used by a given process. This
 # script is launched by an agent via duccling and running as the owner
 # of the process.  
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
index 98d929a..48e5124 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
@@ -53,6 +53,7 @@
 import org.apache.uima.ducc.agent.launcher.CGroupsManager;
 import org.apache.uima.ducc.agent.launcher.Launcher;
 import org.apache.uima.ducc.agent.launcher.ManagedProcess;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
 import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
 import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
@@ -141,6 +142,8 @@
 
   private volatile boolean stopping = false;
 
+  private Object stopLock = new Object();
+  
   private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10);
 
   public volatile boolean useCgroups = false;
@@ -287,6 +290,7 @@
             		}
             	}
             }
+            // scan /proc/mounts for base cgroup dir
             String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
             
             if ( cgUtilsPath == null ) {
@@ -298,19 +302,10 @@
             	
             } else {
             	logger.info("nodeAgent",null,"Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir);
-                // get the top level cgroup folder from ducc.properties. If
-                // not defined, use /cgroup/ducc as default
-            	//String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir");
-//                if (cgroupsBaseDir == null) {
-//                  cgroupsBaseDir = "/cgroup/ducc";
-//                }
-                // get the cgroup subsystems. If not defined, default to the
-                // memory and cpu subsystem
-                String cgroupsSubsystems = System.getProperty("ducc.agent.launcher.cgroups.subsystems");
-                if (cgroupsSubsystems == null) {
-                  cgroupsSubsystems = "memory,cpu";
-                }
-        		long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+            	// if cpuacct is configured in cgroups, the subsystems list will be updated
+            	String cgroupsSubsystems = "memory,cpu";
+
+                long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
         		if (configurationFactory.processStopTimeout != null) {
         			maxTimeToWaitForProcessToStop = Long
         					.valueOf(configurationFactory.processStopTimeout);
@@ -1368,35 +1363,17 @@
 	}
   };
   class ProcessRunner implements Runnable {
-	  String pid = "";
-	  SIGNAL signal;
+	  ManagedProcess deployedProcess;
 	  
-	  public ProcessRunner(final String pid, SIGNAL signal ) {
-		  this.pid = pid;
-		  this.signal = signal;
+	  public ProcessRunner(final ManagedProcess deployedProcess) {//final String pid, SIGNAL signal ) {
+		  this.deployedProcess = deployedProcess;
 	  }
 	  public void run() {
-		  String methodName = "ProcesRunner.run";
-            String[] sigTermCmd = {"/bin/kill",signal.get(), pid};
-            ProcessBuilder pb = new ProcessBuilder(sigTermCmd);
-            try {
-            	// launch kill SIGTERM
-            	final Process process = pb.start();
-                Thread inputStreamConsumerThread = new Thread( new AgentStreamConsumer(process.getInputStream()) );
-                inputStreamConsumerThread.start();
-            	
-                Thread errorStreamConsumerThread = new Thread( new AgentStreamConsumer(process.getErrorStream()) );
-                errorStreamConsumerThread.start();
-                
-                process.waitFor();
-            } catch ( Exception e ) {
-            	e.printStackTrace();
-        		logger.warn(methodName, null, e);  
-            }
+		  stopProcess(deployedProcess.getDuccProcess());
  	  }
   }
   
-  private boolean sendSIGTERM(ManagedProcess process) {
+  private boolean runnable(ManagedProcess process) {
 	  return ( process.getDuccProcess().getProcessState().equals(ProcessState.Initializing) ||
 			   process.getDuccProcess().getProcessState().equals(ProcessState.Starting) ||
 			   process.getDuccProcess().getProcessState().equals(ProcessState.Running) );
@@ -1414,16 +1391,16 @@
 	  try {
 	      for (ManagedProcess deployedProcess : deployedProcesses) {
             String pid = deployedProcess.getDuccProcess().getPID();
-            if (pid == null || pid.trim().length() == 0 || !sendSIGTERM(deployedProcess) ) {
+            if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
             	continue;
             }
             logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
 	                    + " PID:" + pid+" Sending SIGTERM Process State:"+deployedProcess.getDuccProcess().getProcessState().toString());
 			wait = true;
-            deployedProcess.setStopping();
+            deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
+            // Stop each child process in its own thread to parallelize SIGTERM requests
             ExecutorService executor = Executors.newSingleThreadExecutor();
-            executor.execute( new ProcessRunner(pid,SIGNAL.SIGTERM));
-	      
+            executor.execute( new ProcessRunner(deployedProcess) ); //pid,SIGNAL.SIGTERM));
 	      }
 	      
 	  } catch( Exception e) {
@@ -1431,6 +1408,48 @@
 	  }
 	  return wait;
   }
+      
+      private void killChildProcesses() {
+    	  String methodName = "killChildProcesses";
+    	  
+
+    	  try {
+      	    if ( useCgroups ) {
+    	        logger.info("stop", null, "CgroupsManager.cleanup() before ");
+    	        // Since SIGTERM may not be enough to take down a process, use cgroups to find
+    	        // any process still standing and do hard kill 
+    	        cgroupsManager.cleanup();
+    	        logger.info("stop", null, "CgroupsManager.cleanup() after ");
+    	    } else {
+      	      for (ManagedProcess deployedProcess : deployedProcesses) {
+                  String pid = deployedProcess.getDuccProcess().getPID();
+                  if (pid == null || pid.trim().length() == 0 || !runnable(deployedProcess) ) {
+                  	continue;
+                  }
+                  logger.info(methodName, null, "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+      	                    + " PID:" + pid+" Sending SIGKILL Process State:"+deployedProcess.getDuccProcess().getProcessState().toString());
+      	          ICommandLine cmdLine;
+      	            if (Utils.isWindows()) {
+      	              cmdLine = new NonJavaCommandLine("taskkill");
+      	              cmdLine.addArgument("/PID");
+      	            } else {
+      	              cmdLine = new NonJavaCommandLine("/bin/kill");
+      	              cmdLine.addArgument("-9");
+      	            }
+      	            cmdLine.addArgument(pid);
+
+      	        deployedProcess.setStopping();
+      			deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
+      			
+                  launcher.launchProcess(this, getIdentity(),deployedProcess.getDuccProcess(), cmdLine, this, deployedProcess);
+      	      }
+
+    	    }
+    	  } catch( Exception e) {
+    		logger.warn(methodName, null, e);  
+    	  }
+    	          	  
+      }
   /**
    * Kills a given process
    * 
@@ -1782,122 +1801,75 @@
   }
 
   public void stop() throws Exception {
-    if (stopping) {
-      return;
-    }
-    stopping = true;
-    logger.info("stop", null, "Agent stopping managed processes");
-    // Stop monitor thread watching for Agent pings
-    // if ( nodeMonitor != null ) {
-    // nodeMonitor.shutdown();
-    // }
-    // if ( configurationFactory.getAgentPingDispatcher() != null ) {
-    // configurationFactory.getAgentPingDispatcher().stop();
-    // }
-    
-    // Dispatch SIGTERM to all child processes
-    boolean wait = stopChildProcesses();
-    
-    
-    /*    
-    synchronized (monitor) {
-      Iterator<ManagedProcess> it = deployedProcesses.iterator();
-      while (it.hasNext()) {
-        ManagedProcess mp = it.next();
-        // mp.kill();
-        stopProcess(mp.getDuccProcess());
-      }
-    }
-    logger.info("stop", null, "Agent dispatched STOP to all managed processes");
-
-    // wait until all JPs stop
-    while (true) {
-      try {
-        // break if no processes in the inventory
-        if (deployedProcesses.size() == 0) {
-          break;
+	  synchronized(stopLock) {
+	    logger.info("stop", null, "Agent stop()");
+	    if (stopping) {
+	        return;
         }
-        boolean atLeastOneProcessStillRunning = false;
-        synchronized (monitor) {
-          // check state of each process. If there is a process that
-          // is not dead yet
-          // just wait a little while and check the state again.
-          Iterator<ManagedProcess> pit = deployedProcesses.iterator();
-          // find at least one process that is not dead yet
-          while (pit.hasNext()) {
-            ManagedProcess mp = pit.next();
-            // if the process is not dead,
-            if (!mp.getDuccProcess().getProcessState().equals(ProcessState.Stopped)
-                    && !mp.getDuccProcess().getProcessState().equals(ProcessState.Failed)
-                    && !mp.getDuccProcess().getProcessState().equals(ProcessState.Killed)) {
-              atLeastOneProcessStillRunning = true;
-              break;
-            }
-          }
-        }
-        // if there are no running processes just break from the
-        // 'while(true)' loop
-        if (atLeastOneProcessStillRunning == false) {
-          break;
-        }
-        synchronized (this) {
-          wait(100);
-        }
-      } catch (Exception e) {
-      }
-    }
-*/
-    
-    // Stop publishing inventory. Once the route is down the agent forces last publication
-    // sending an empty process map.
-    configurationFactory.stopInventoryRoute();
+		stopping = true;
+		
+	    // Send an empty process map as the final inventory 
+	    HashMap<DuccId, IDuccProcess> emptyMap = 
+	    		new HashMap<DuccId, IDuccProcess>();
+	    DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity());
+	    inventoryDispatcher.dispatch(duccEvent);
+	    logger.info("stop", null, "Agent published final inventory");
+	    
+	    configurationFactory.stopRoutes();
+	    
+	    logger.info("stop", null, "Agent stopping managed processes");
+	    // Dispatch SIGTERM to all child processes
+	    boolean wait = stopChildProcesses();
+	    
+	    // Stop publishing inventory. Once the route is down the agent forces last publication
+	    // sending an empty process map.
+	    //configurationFactory.stopInventoryRoute();
 
-    if ( wait && deployedProcesses.size() > 0 ) {
-        logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
-    	// wait for awhile 
-      synchronized (this) {
-    	  long waittime = 60000;
-    	  if (configurationFactory.processStopTimeout != null ) {
-  	         try {
-  	    		 waittime = Long.parseLong(configurationFactory.processStopTimeout);
-  	         } catch( NumberFormatException e) {
-  	        	 logger.warn("stop", null, e);
-  	         }
-    	  }
-         logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
-         wait(waittime);
-      }
-    }
+	    if ( wait && deployedProcesses.size() > 0 ) {
+	        logger.info("stop", null, "Agent Sent SIGTERM to ALL Child Processes - Number of Deployed Processes:"+deployedProcesses.size());
+	    	// wait for awhile 
+	      synchronized (this) {
+	    	  long waittime = 60000;
+	    	  if (configurationFactory.processStopTimeout != null ) {
+	  	         try {
+	  	    		 waittime = Long.parseLong(configurationFactory.processStopTimeout);
+	  	         } catch( NumberFormatException e) {
+	  	        	 logger.warn("stop", null, e);
+	  	         }
+	    	  }
+	         logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory.");
+	         wait(waittime);
+	      }
+	    }
 
-    // Send an empty process map as the final inventory 
-    HashMap<DuccId, IDuccProcess> emptyMap = 
-    		new HashMap<DuccId, IDuccProcess>();
-    DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap,getLastORSequence(), getIdentity());
-    inventoryDispatcher.dispatch(duccEvent);
-    logger.info("stop", null, "Agent published final inventory");
-
-    // Since SIGTERM may not be enough to take down a process, use cgroups to find
-    // any process still standing and do hard kill 
-    cgroupsManager.cleanup();
-   
-    // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry
-    // mode trying to recover a connection
-    Thread t = new Thread( new Runnable() {
-    	public void run() {
-    		try {
-    		    logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) ");
-    			Thread.sleep(10000);
-    		} catch(Exception e ) {
-    		    logger.info("stop", null, e);
-    		} finally{
-    		    logger.info("stop", null, "Agent calling System.exit(1) ... ");
-    			System.exit(1);
-    		}
-    	}
-    });
-    t.start();
-    
-    super.stop();
+	    // send kill -9 to any child process still running
+	    killChildProcesses();
+/*
+	    // Self destruct thread in case we loose AMQ broker and AMQ listener gets into retry
+	    // mode trying to recover a connection
+	    Thread t = new Thread( new Runnable() {
+	    	public void run() {
+	    		try {
+	    		    logger.info("stop", null, "Agent waiting for additional 10 seconds to allow for a clean shutdown before terminating itself via System.exit(1) ");
+	    			Thread.sleep(10000);
+	    		} catch(Exception e ) {
+	    		    logger.info("stop", null, e);
+	    		} finally{
+	    		    logger.info("stop", null, "Agent calling System.exit(1) ... ");
+	    			System.exit(1);
+	    		}
+	    	}
+	    });
+	    t.start();
+	    t.join(10000);
+	    super.stop();
+	    logger.info("stop", null, "Reaper thread finished - calling super.stop()");
+	  }
+*/	
+	    stopLock.wait(2000);
+	    super.stop();
+	    logger.info("stop", null, "Reaper thread finished - calling super.stop()");
+	  }
   }
 
   public Future<?> getDeployedJPFuture(IDuccId duccId) {
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
index 82406fd..ab303ec 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/config/AgentConfiguration.java
@@ -23,10 +23,13 @@
 
 import javax.annotation.PostConstruct;
 
+import org.apache.activemq.camel.component.ActiveMQComponent;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.dataformat.xstream.XStreamDataFormat;
 import org.apache.camel.impl.DefaultClassResolver;
@@ -471,19 +474,19 @@
       camelContext.addRoutes(metricsRouteBuilder);
   
   }
-  
+  // Stops the Camel context and logs once the stop call has returned.
+  public void stopRoutes() throws Exception {
+	  camelContext.stop();
+	  logger.info("AgentConfiguration.stopRoutes", null,"Camel Context stopped");
+  }
   @Bean
   @PostConstruct
-//  public NodeMetricsProcessor nodeMetricsProcessor(NodeAgent agent) throws Exception {
   public NodeMetricsProcessor nodeMetricsProcessor() throws Exception {
     if (Utils.isLinux()) {
-//      return new LinuxNodeMetricsProcessor(agent, "/proc/meminfo", "/proc/loadavg");
    	  nodeMetricsProcessor = new LinuxNodeMetricsProcessor();
 	  ((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initMemInfo("/proc/meminfo");
 	  ((LinuxNodeMetricsProcessor)nodeMetricsProcessor).initLoadAvg("/proc/loadavg");
-  	  //agent, "/proc/meminfo", "/proc/loadavg");
     } else {
-//        return new DefaultNodeMetricsProcessor(agent);
     	nodeMetricsProcessor = new DefaultNodeMetricsProcessor();
     }
     return nodeMetricsProcessor;
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
index 421d7bf..0e8b3ea 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsManager.java
@@ -25,6 +25,9 @@
 import java.io.FileReader;

 import java.io.InputStream;

 import java.io.InputStreamReader;

+import java.nio.charset.Charset;

+import java.nio.file.Files;

+import java.nio.file.Paths;

 import java.util.ArrayList;

 import java.util.HashSet;

 import java.util.LinkedHashSet;

@@ -57,8 +60,6 @@
 	private static final String CGDuccMemoryPath = "/memory/"+SYSTEM+"/";

 	private static final String CGDuccCpuPath = "/cpu/"+SYSTEM+"/";

 	private static final String CGProcsFile = "/cgroup.procs";

-//	private static final String CGDuccCpuAcctPath = "/cpu/"+SYSTEM+"/";

-	

 	// legacy means that the cgonfig points to <cgroup location>/ducc

 	private boolean legacyCgConfig = false;

 	

@@ -81,6 +82,9 @@
 	// stores cgroup utils location like cgcreate, cgset, etc

 	private String cgroupUtilsDir=null;

 	// stores comma separated list of subsystems like cpu,memory

+	

+	private boolean cpuInfoSymlinked = true;

+

 	private String cgroupSubsystems = ""; // comma separated list of subsystems

 	private long retryMax = 4;

 	private long delayFactor = 2000;  // 2 secs in millis

@@ -88,29 +92,37 @@
     private static  String fetchCgroupsBaseDir(String mounts) {

   	  String cbaseDir=null;

   	  BufferedReader br = null;

+  	  List<String> cgroupsEntries = new ArrayList<String>();

   	  try {

-  		  FileInputStream fis = new FileInputStream(mounts);

-  			//Construct BufferedReader from InputStreamReader

-  		  br = new BufferedReader(new InputStreamReader(fis));

-  		 

-  		String line = null;

-  		while ((line = br.readLine()) != null) {

-  			System.out.println(line);

-  			if ( line.trim().startsWith("cgroup") ) { 

+  	  	  List<String> lines = Files.readAllLines(Paths.get(mounts),Charset.defaultCharset());

+  	  	  for( String line : lines ) {

+  	  		// trim the list to just cgroups  

+  	  		if ( line.trim().startsWith("cgroup") ) {

+  	  			cgroupsEntries.add(line);

+  	  		}

+  	  	  }

+  	      // check if this is legacy cgroups installation looking like

+		  // cgroup /cgroup cgroup rw,relatime,memory,cpuacct,cpu 0 0

+  	  	  if ( cgroupsEntries.size() == 1) {

+				String[] cgroupsInfo = cgroupsEntries.get(0).split(" ");

+				// legacy layout: the mount point itself is the cgroups base dir

+				cbaseDir = cgroupsInfo[1].trim();

+  	  	  } else {

+  	  	      // check if this is recent cgroups installation looking like

+			  // cgroup /cgroup/memory cgroup rw,relatime,memory 0 0

+              //     OR

+			  // cgroup /sys/fs/cgroup/memory cgroup rw ...

+				

+			  // return the mount point minus the memory part

+  	  		  for(String line : cgroupsEntries) {

   				String[] cgroupsInfo = line.split(" ");

-  				if ( cgroupsInfo[1].indexOf("/memory") > -1 ) {

-  					// return the mount point minus the memory part

+  			    if ( cgroupsInfo[1].indexOf("/memory") != -1 ) {

   					cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory") );

-  				} else if ( cgroupsInfo[1].indexOf("cpu") > -1){

-  					// return the mount point minus the memory part

-  					cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/cpu") );

-  				} else {

-  					cbaseDir = cgroupsInfo[1].trim();

-  				}

-  			    break;

-  			}

-  		}

-  		 

+  			    	break;

+  			    }

+  	  		  }

+  	  	  }

+		 

   	  } catch( Exception e) {

   		  e.printStackTrace();

   	  } finally {

@@ -126,45 +138,81 @@
 	 * @param args

 	 */

 	public static void main(String[] args) {

-		try {

+    	String cgroupsUtilsDirs = "/usr/bin,/bin"; //System.getProperty("ducc.agent.launcher.cgroups.utils.dir");

+    	String cgUtilsPath = null;

+    	DuccLogger logger = DuccLogger.getLogger(CGroupsManager.class, "CGroupsManager");

+    	//boolean useCgroups = false;

 

-//			String cgBaseDir = "/cgroup"; //"/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");

-			String cgBaseDir = "/sys/fs/cgroup";//fetchCgroupsBaseDir("/proc/mounts");

-			

-			CGroupsManager cgMgr = new CGroupsManager("/usr/bin",cgBaseDir, "memory",

-					null, 10000);

-			System.out.println("Cgroups Accounting Enabled:"+cgMgr.isCpuReportingEnabled());

-			

-			/*

-			cgMgr.validator(cgBaseDir, "test2", System.getProperty("user.name"),false)

-          .cgcreate();

+    	String[] paths = cgroupsUtilsDirs.split(",");

+    	for( String path : paths ) {

+    		File file = new File(path.trim()+"/cgexec");

+    		if ( file.exists() ) {

+    			cgUtilsPath = path;

+    			break;

+    		}

+    	}

 

-			System.out.println("Cgroups Installed:"

-					+ cgMgr.cgroupExists("/cgroup/ducc"));

-			Set<String> containers = cgMgr.collectExistingContainers();

-			for (String containerId : containers) {

-				System.out.println("Existing CGroup Container ID:"

-						+ containerId);

-			}

-			cgMgr.createContainer(args[0], args[2], cgMgr.getUserGroupName(args[2]),true);

-			cgMgr.setContainerMaxMemoryLimit(args[0], args[2], true,

-					Long.parseLong(args[1]));

-			synchronized (cgMgr) {

-				cgMgr.wait(60000);

-			}

-			cgMgr.destroyContainer(args[0], args[2], NodeAgent.SIGKILL);

-*/

-		} catch (Exception e) {

-			e.printStackTrace();

-		}

+    	String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");

+        

+        if ( cgUtilsPath == null ) {

+            System.out.println("------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties");

+        } else if ( cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) {

+            System.out.println("------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts");

+        	

+        } else {

+        	System.out.println("Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir);

+

+        	String cgroupsSubsystems = "memory,cpu";

+

+            long maxTimeToWaitForProcessToStop = 60000; // default 1 minute

+

+            CGroupsManager cgroupsManager = 

+            		new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop);

+            // check if cgroups base directory exists in the filesystem

+            // which means that cgroups

+            // and cgroups convenience package are installed and the

+            // daemon is up and running.

+              

+            try {

+               if (cgroupsManager.cgroupExists(cgroupsBaseDir)) {

+                    	System.out.println("Agent found cgroup base directory in "+cgroupsBaseDir);

+

+                   String containerId = "test";

+            	  // validate cgroups by creating a dummy cgroup. The code checks if cgroup actually got created by

+            	  // verifying existence of test cgroup file. The second step in verification is to check if 

+            	  // CPU control is working. Configured in cgconfig.conf, the CPU control allows for setting 

+            	  // cpu.shares. The code will attempt to set the shares and subsequently tries to read the

+            	  // value from cpu.shares file to make sure the values match. Any exception in the above steps

+            	  // will cause cgroups to be disabled.

+            	  //

+            	  cgroupsManager.validator(cgroupsBaseDir, containerId, System.getProperty("user.name"),false)

+            	              .cgcreate()

+            	              .cgset(100);   // write cpu.shares=100 and validate

+            	  

+            	  // cleanup dummy cgroup

+            	  cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"), -9);

+               }

+                    

+              } catch( Exception ee) {

+            	  ee.printStackTrace();

+              }

+        }

 	}

-	public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String cgroupSubsystems,

+	public CGroupsManager(String cgroupUtilsDir, String cgroupBaseDir, String subsystems,

 			DuccLogger agentLogger, long maxTimeToWaitForProcessToStop) {

 		this.cgroupUtilsDir = cgroupUtilsDir;

 		this.cgroupBaseDir = cgroupBaseDir;

-		this.cgroupSubsystems = cgroupSubsystems;

+		this.cgroupSubsystems = subsystems;

 		this.agentLogger = agentLogger;

 		this.maxTimeToWaitForProcessToStop = maxTimeToWaitForProcessToStop;

+

+		// on some systems cpu and cpuacct may be linked to the same directory. In such

+		// cases we need to adjust the cgdelete command to only include memory,cpu as submodules:

+		// 

+		

+		

+		

+		

 		// determine what cgroup base location should be. For legacy cgconfig

 		// it will be <cgroup folder>/ducc

 		try {

@@ -183,6 +231,37 @@
 			// if there is an error here, the new cgconfig is assumed and subject

 			// to additional testing on agent startup.

 		}

+		cpuInfoSymlinked = symbolicLinksForCpu();

+		String location = getCGroupLocation("cpuacct").trim();

+		if ( !legacyCgConfig ) {

+			if (!location.endsWith(System.getProperty("file.separator") )) {

+				location += System.getProperty("file.separator");

+			} 

+			location += SYSTEM+System.getProperty("file.separator");

+		}

+		System.out.println("------------- Location:"+location);

+		/*

+		if ( !legacyCgConfig ) {

+			location = cgroupBaseDir; //SYSTEM+System.getProperty("file.separator");

+			if ( !location.endsWith(System.getProperty("file.separator"))) {

+				location = location + System.getProperty("file.separator");

+			}

+			location += SYSTEM+System.getProperty("file.separator");

+		} else {

+			location = getCGroupLocation("cpuacct").trim();

+			if ( !location.endsWith(System.getProperty("file.separator"))) {

+				location = location + System.getProperty("file.separator");

+			}

+			

+		}

+*/

+		

+		

+		File cpuacctUsageFile = new File(location+"cpuacct.usage");

+		if ( cpuacctUsageFile.exists()) {

+			System.out.println("Got cpuacct.usage file");

+			this.cgroupSubsystems += ",cpuacct";

+		} 

 	}

 

 	/**

@@ -208,7 +287,75 @@
 		} 

 		return location + subsystem;

 	}

-	

+	private boolean isCpuSubmodule(String[] parts, String submoduleName) {

+		return ( parts.length > 10 && parts[10].equals(submoduleName) );

+	}

+	private boolean symbolicLinksForCpu() {

+		// Returns true only when the cpu and cpuacct entries under cgroupBaseDir are symbolic links to the same target.

+	   	File f = new File(cgroupBaseDir);

+	   	if ( !f.exists()) {

+	   		return false;

+	   	}

+    	InputStreamReader isr = null;

+    	BufferedReader reader = null;

+    	try {

+    		// long-format listing of the base dir; ls -l marks symbolic links with a leading 'l'

+    		String cmd[] = {"/bin/ls","-l",cgroupBaseDir};

+		    StringBuffer sb = new StringBuffer();

+		    for (String s : cmd) {

+			   sb.append(s).append(" ");

+			}

+			agentLogger.info("symbolicLinksForCpu", null, "Launching Process - Commandline:"+sb.toString());

+

+    		ProcessBuilder processLauncher = new ProcessBuilder();

+			processLauncher.command(cmd);

+			processLauncher.redirectErrorStream(true);

+			java.lang.Process process = processLauncher.start();

+			isr = new InputStreamReader(process.getInputStream());

+			reader = new BufferedReader(isr);

+			String line;

+			String cpuLinkDir = "";

+			String cpuacctLinkDir = "";

+			

+			agentLogger.info("symbolicLinksForCpu", null, "Consuming Process Streams");

+			while ((line = reader.readLine()) != null) {

+				agentLogger.info("symbolicLinksForCpu", null, ">>>>" + line);

+				// the last token of a link line is taken as the link target

+				String parts[] = line.split(" ");

+				if ( parts.length > 0 && parts[0].startsWith("l")) {  // link; startsWith is safe on an empty line

+					if ( isCpuSubmodule(parts,"cpu") ) {

+						cpuLinkDir = parts[parts.length-1];

+					} else if (isCpuSubmodule(parts,"cpuacct")	) {

+					    cpuacctLinkDir = parts[parts.length-1];

+					}

+					// check if we got what we were looking for. If so, no need to iterate more

+					if ( cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0 ) {

+						break;

+					}

+			    }

+			}

+			

+			agentLogger.info("symbolicLinksForCpu", null, "Waiting for Process to Exit");

+			int retCode = process.waitFor();

+			agentLogger.info("symbolicLinksForCpu", null, "Pocess Exit Code="+retCode);

+			if ( cpuLinkDir.length() > 0 && cpuacctLinkDir.length() > 0) {

+				if ( cpuLinkDir.trim().equals(cpuacctLinkDir.trim())) {

+					return true;  // both cpu and cpuacct link to the same dir

+				}

+			}

+    	    		

+    	} catch( Exception e) {

+			agentLogger.error("symbolicLinksForCpu", null, e);

+    	

+    	} finally {

+    		if ( reader != null ) {

+    			try {

+        			reader.close();

+    			} catch(Exception e) {}

+    		}

+    	}

+    	return false;

+	}

 	public void configure(NodeAgent agent ) {

 		if ( agent != null ) {

 			if ( agent.configurationFactory.maxRetryCount != null ) {

@@ -509,6 +656,17 @@
 		}

 		return location+id+"cpuacct.usage";

 	}

+	private String composeMemoryStatFileName(String id) {

+		String location = getCGroupLocation("memory").trim();

+		if ( !location.endsWith(System.getProperty("file.separator"))) {

+			location = location + System.getProperty("file.separator");

+		}

+		if ( !legacyCgConfig ) {

+			location += SYSTEM+System.getProperty("file.separator");

+		}

+		return location+id+"memory.stat";

+	}

+

 	public boolean isCpuReportingEnabled() {

 //		String file = getCGroupLocation("cpuacct")+System.getProperty("file.separator")+"cpuacct.usage";

 	

@@ -551,7 +709,63 @@
 			usage = -1;  // cgroups accounting not configured

 		}

 		return usage;

+

 	}

+	public enum CgroupMemoryStat {

+		RSS("rss"),

+		SWAP("swap"),

+		FAULTS("pgpgin");

+		// key string given to each constant; never reassigned, hence final

+		final String key;

+		

+		private CgroupMemoryStat(String aKey) {

+			this.key = aKey;

+		}

+		/** @return the key string this constant was created with */

+		public String getKey() {

+			return this.key;

+		}

+		

+	}

+	/** Reads the counter named by stat.getKey() from the container's memory.stat file.

+	 *  Returns -1 if the file or the counter is missing, or the value cannot be parsed. */

+	public long getUsageForMemoryStat(CgroupMemoryStat stat, String containerId ) throws Exception {

+		long usage = -1;

+		if (!containerId.endsWith(System.getProperty("file.separator"))) {

+			containerId = containerId + System.getProperty("file.separator");

+		}

+		String file = composeMemoryStatFileName(containerId.trim());

+		agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file);

+		File f = new File(file);

+		if ( f.exists() ) {

+			InputStreamReader isr = new InputStreamReader(new FileInputStream(f));

+			BufferedReader br = new BufferedReader(isr);

+			String line;

+			try {

+				while ((line = br.readLine()) != null) {

+					agentLogger.trace("getUsageForMemoryStat", null, "MEMORY.STAT Line:"+line);

+					// compare the whole first token so that "rss" cannot match "rss_huge"

+					String[] fields = line.trim().split("\\s+");

+					if ( fields.length > 1 && fields[0].equals(stat.getKey())) {

+						usage = Long.parseLong(fields[1]);

+						break;

+					}

+				}

+			} catch ( Exception e) {

+				agentLogger.error("getUsageForMemoryStat", null, e);

+			}

+			finally {

+				br.close();  // also closes the wrapped reader and file stream

+				agentLogger.trace("getUsageForMemoryStat", null, "Done Reading memory.stat file:"+file);

+			}

+		} else {

+			// usage stays -1: cgroups accounting not configured

+			agentLogger.info("getUsageForMemoryStat", null, "MEMORY.STAT file:"+file+" Not Found - Process "+stat+" Usage is Unavailable");

+		}

+		return usage;

+	}

+

+	

 	/**

 	 * Sets the max memory use for an existing cgroup container.

 	 * 

@@ -700,6 +914,39 @@
 		}

 		return childCount;

 	}

+	/**

+	 * Returns the subsystem list to hand to the cgroup command line tools.

+	 * <p>

+	 * If cpu and cpuacct are sym linked to the same dir, cpuacct is left out

+	 * of the list as naming it causes the cgdelete to throw an error. Only the

+	 * cpuacct entry is dropped: every other subsystem is kept, in its

+	 * configured order, wherever cpuacct appears in the list.

+	 *

+	 * @return comma separated subsystem names

+	 */

+	private String adjustSubsystems() {

+		// nothing to adjust unless cpu and cpuacct share a directory

+		if ( !cpuInfoSymlinked ) {

+			return cgroupSubsystems;

+		}

+

+		StringBuilder kept = new StringBuilder();

+		boolean dropped = false;

+		for ( String subsystem : cgroupSubsystems.split(",") ) {

+			if ( "cpuacct".equals(subsystem.trim()) ) {

+				dropped = true;

+				continue;

+			}

+			// separator only between entries, never leading or trailing

+			if ( kept.length() > 0 ) {

+				kept.append(",");

+			}

+			kept.append(subsystem);

+		}

+

+		// hand back the configured string untouched when it had no cpuacct

+		return dropped ? kept.toString() : cgroupSubsystems;

+	}

 	/**

 	 * Removes cgroup container with a given id. Cgroups are implemented as a

 	 * virtual file system. All is needed here is just rmdir.

@@ -733,7 +980,10 @@
 				}

 				// Any process remaining in a cgroup will be killed hard

 				killChildProcesses(containerId, userId, NodeAgent.SIGKILL);

-				String[] command = new String[] { cgroupUtilsDir+"/cgdelete",cgroupSubsystems + ":"+SYSTEM+"/" + containerId };

+				//String subsystems =cgroupSubsystems.substring(0,cgroupSubsystems.indexOf(",cpuacct") );

+				String subsystems = adjustSubsystems();

+

+				String[] command = new String[] { cgroupUtilsDir+"/cgdelete",subsystems + ":"+SYSTEM+"/" + containerId };

 				int retCode = launchCommand(command);

 				if ( cgroupExists(getCGroupLocation(CGDuccMemoryPath) + containerId)) {

 					agentLogger.info("destroyContainer", null, "Failed to remove Container "+containerId+" Using cgdelete command. Exit code:"+retCode);

@@ -742,12 +992,6 @@
 					containerIds.remove(containerId);

 					return true;

 				}

-//				if (retCode == 0) {

-//					containerIds.remove(containerId);

-//					return true;

-//				} else {

-//					return false;

-//				}

 			}

 			return true; // nothing to do, cgroup does not exist

 		} catch (Exception e) {

diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
index f372b44..fca5bbb 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/CGroupsTest.java
@@ -95,12 +95,7 @@
         if (cgroupsBaseDir == null) {
           cgroupsBaseDir = "/cgroup/ducc";
         }
-        // get the cgroup subsystems. If not defined, default to the
-        // memory and cpu subsystem
-        String cgroupsSubsystems = System.getProperty("ducc.agent.launcher.cgroups.subsystems");
-        if (cgroupsSubsystems == null) {
-          cgroupsSubsystems = "memory,cpu";
-        }
+        String cgroupsSubsystems = "memory,cpu";
 		long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
 
 		cgroupsManager = 
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
index 48a1f0f..a1d8793 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/DuccCommandExecutor.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.uima.ducc.agent.NodeAgent;
+import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
 import org.apache.uima.ducc.common.IDuccUser;
 import org.apache.uima.ducc.common.container.FlagsHelper;
 import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -44,6 +45,7 @@
 import org.apache.uima.ducc.transport.event.common.IDuccProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
 import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IProcessState;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 import org.apache.uima.ducc.transport.event.common.ITimeWindow;
 import org.apache.uima.ducc.transport.event.common.TimeWindow;
@@ -303,101 +305,119 @@
 		}
 	}
 
+	private boolean processInRunningOrInitializingState() {
+		// Read the state once: if it is updated concurrently, two separate reads
+		// could straddle an Initializing -> Running change and both checks would miss.
+		ProcessState state =
+				((ManagedProcess) managedProcess).getDuccProcess().getProcessState();
+		return state.equals(ProcessState.Running) || state.equals(ProcessState.Initializing);
+	}
 	private void stopProcess(ICommandLine cmdLine, String[] cmd)
 			throws Exception {
 		String methodName = "stopProcess";
-
-
-		Future<?> future = ((ManagedProcess) managedProcess).getFuture();
-		if (future == null) {
-		    throw new Exception(
-					"Future Object not Found. Unable to Stop Process with PID:"
-					+ ((ManagedProcess) managedProcess).getPid());
-		}
-		// for stop to work, PID must be provided
-		if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null
-		    || ((ManagedProcess) managedProcess).getDuccProcess().getPID()
-		    .trim().length() == 0) {
-		    throw new Exception(
-					"Process Stop Command Failed. PID not provided.");
-		}
-		long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
-		if (super.agent.configurationFactory.processStopTimeout != null) {
-		    maxTimeToWaitForProcessToStop = Long
-			.valueOf(super.agent.configurationFactory.processStopTimeout);
-		}
-		try {
-		    // NEW Code
-		    logger.info(methodName,
-				((ManagedProcess) super.managedProcess).getDuccId(),
-				">>>>>>>>>>>>>>> Stopping Process:"
-				+ ((ManagedProcess) managedProcess).getPid());
-		    ICommandLine cmdL;
-		    if (Utils.isWindows()) {
-			cmdL = new NonJavaCommandLine("taskkill");
-			cmdL.addArgument("/PID");
-		    } else {
-			cmdL = new NonJavaCommandLine("/bin/kill");
-			cmdL.addArgument("-15");
-		    }
-		    cmdL.addArgument(((ManagedProcess) managedProcess)
-				     .getDuccProcess().getPID());
-
-		    String[] sigTermCmdLine = getDeployableCommandLine(cmdL,
-								       new HashMap<String, String>());
-		    doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine,
-			   true);
-
-		    try {
-			logger.info(methodName,
-				    ((ManagedProcess) super.managedProcess)
-				    .getDuccId(),
-				    "------------ Agent Starting Killer Timer Task For Process with PID:"
-				    + ((ManagedProcess) managedProcess)
-				    .getDuccProcess().getPID()
-				    + " Process State: "
-				    + ((ManagedProcess) managedProcess)
-				    .getDuccProcess()
-				    .getProcessState());
-			future.get(maxTimeToWaitForProcessToStop,
-				   TimeUnit.MILLISECONDS);
-
-		    } catch(TimeoutException te) {
-			    
-			    logger.info(
-					methodName,
-					((ManagedProcess) super.managedProcess)
-					.getDuccId(),
-					"------------ Agent Timed-out Waiting for Process with PID:"
-					+ ((ManagedProcess) managedProcess)
-					.getDuccProcess().getPID()
-					+ " to Stop. Process State:"
-					+ ((ManagedProcess) managedProcess)
-					.getDuccProcess()
-					.getProcessState()
-					+ " .Process did not stop in allotted time of "
-					+ maxTimeToWaitForProcessToStop
-					+ " millis");
+		
+		if ( processInRunningOrInitializingState() ) {
+			Future<?> future = ((ManagedProcess) managedProcess).getFuture();
+			if (future == null) {
+			    throw new Exception(
+						"Future Object not Found. Unable to Stop Process with PID:"
+						+ ((ManagedProcess) managedProcess).getPid());
+			}
+			// for stop to work, PID must be provided
+			if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null
+			    || ((ManagedProcess) managedProcess).getDuccProcess().getPID()
+			    .trim().length() == 0) {
+			    throw new Exception(
+						"Process Stop Command Failed. PID not provided.");
+			}
+			try {
+			    // NEW Code
 			    logger.info(methodName,
-					((ManagedProcess) super.managedProcess)
-					.getDuccId(),
-					">>>>>>>>>>>>>>> Killing Process:"
-					+ ((ManagedProcess) managedProcess)
-					.getDuccProcess().getPID()
-					+ " .Process State:"
-					+ ((ManagedProcess) managedProcess)
-					.getDuccProcess()
-					.getProcessState());
-			    doExec(new ProcessBuilder(cmd), cmd, true);
+					((ManagedProcess) super.managedProcess).getDuccId(),
+					">>>>>>>>>>>>>>> Stopping Process:"
+					+ ((ManagedProcess) managedProcess).getPid());
+			    ICommandLine cmdL;
+			    if (Utils.isWindows()) {
+				cmdL = new NonJavaCommandLine("taskkill");
+				cmdL.addArgument("/PID");
+			    } else {
+				cmdL = new NonJavaCommandLine("/bin/kill");
+				cmdL.addArgument("-15");
+			    }
+			    cmdL.addArgument(((ManagedProcess) managedProcess)
+					     .getDuccProcess().getPID());
 
+			    String[] sigTermCmdLine = getDeployableCommandLine(cmdL,
+									       new HashMap<String, String>());
+			    doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine,
+				   true);
+                // if agent receives admin STOP request, all managed processes should
+			    // be stopped without each waiting for 60 secs. The agent
+			    // blasts SIGTERM in parallel to all running child processes
+			    if (!StopPriority.DONT_WAIT.equals(((ManagedProcess) managedProcess).getStopPriority()) ) {
+					long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
+					if (super.agent.configurationFactory.processStopTimeout != null) {
+					    maxTimeToWaitForProcessToStop = Long
+						.valueOf(super.agent.configurationFactory.processStopTimeout);
+					}
+
+				    try {
+					logger.info(methodName,
+						    ((ManagedProcess) super.managedProcess)
+						    .getDuccId(),
+						    "------------ Agent Starting Killer Timer Task For Process with PID:"
+						    + ((ManagedProcess) managedProcess)
+						    .getDuccProcess().getPID()
+						    + " Process State: "
+						    + ((ManagedProcess) managedProcess)
+						    .getDuccProcess()
+						    .getProcessState());
+					future.get(maxTimeToWaitForProcessToStop,
+						   TimeUnit.MILLISECONDS);
+
+				    } catch(TimeoutException te) {
+				    	if ( !((ManagedProcess) managedProcess).
+				    			getDuccProcess().getProcessState().equals(ProcessState.Stopped) ) {
+						    logger.info(
+									methodName,
+									((ManagedProcess) super.managedProcess)
+									.getDuccId(),
+									"------------ Agent Timed-out Waiting for Process with PID:"
+									+ ((ManagedProcess) managedProcess)
+									.getDuccProcess().getPID()
+									+ " to Stop. Process State:"
+									+ ((ManagedProcess) managedProcess)
+									.getDuccProcess()
+									.getProcessState()
+									+ " .Process did not stop in allotted time of "
+									+ maxTimeToWaitForProcessToStop
+									+ " millis");
+							    logger.info(methodName,
+									((ManagedProcess) super.managedProcess)
+									.getDuccId(),
+									">>>>>>>>>>>>>>> Killing Process:"
+									+ ((ManagedProcess) managedProcess)
+									.getDuccProcess().getPID()
+									+ " .Process State:"
+									+ ((ManagedProcess) managedProcess)
+									.getDuccProcess()
+									.getProcessState());
+							    doExec(new ProcessBuilder(cmd), cmd, true);
+				    		
+				    	}
+
+					}
+
+					
+				}
+			} catch (Exception e) { // InterruptedException, ExecutionException
+			    logger.error(methodName,
+					 ((ManagedProcess) super.managedProcess).getDuccId(), e,
+					 new Object[] {});
 			}
 
-		} catch (Exception e) { // InterruptedException, ExecutionException
-		    logger.error(methodName,
-				 ((ManagedProcess) super.managedProcess).getDuccId(), e,
-				 new Object[] {});
+			
 		}
-
 	}
 
 	private void startProcess(ICommandLine cmdLine, String[] cmd,
@@ -473,7 +493,7 @@
 		}
 	}
 
-	private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd)
+	public void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd)
 			throws Exception {
 		String methodName = "doExec";
 		int exitCode = 0;
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
index 21394e3..fc384f5 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/launcher/ManagedProcess.java
@@ -60,9 +60,11 @@
 	};
 
 	public static enum StopPriority {
-		KILL_9, QUIESCE_AND_STOP
+		NONE,KILL_9, QUIESCE_AND_STOP, DONT_WAIT
 	}
 
+	StopPriority stopPriority = StopPriority.NONE;
+	
 	private volatile boolean destroyed = false;
 	private IDuccProcess duccProcess;
 	// Used to kill a process after it reports its PID.
@@ -161,7 +163,12 @@
 	public void setMetricsProcessor(LinuxProcessMetricsProcessor processor) {
 		metricsProcessor = processor;
 	}
-
+	public void setStopPriority(StopPriority sp ) {
+		stopPriority = sp;
+	}
+	public StopPriority getStopPriority() {
+		return stopPriority;
+	}
 	public LinuxProcessMetricsProcessor getMetricsProcessor() {
 		return metricsProcessor;
 	}
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
index 962c33c..52c5d48 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessCpuUsageCollector.java
@@ -44,45 +44,7 @@
 	}
 	
 	private long collect() throws Exception{
-			
+		// use cgroups manager to collect cpu usage
 		return cgm.getCpuUsage(containerId);
-	
 	}
-/*
-	private String execTopShell() throws Exception {
-		List<String> command = new ArrayList<String>();
-		command.add("top");
-		command.add("-b");
-		command.add("-n");
-		command.add("1");
-		command.add("-p");
-		command.add(pid);
-
-		ProcessBuilder builder = new ProcessBuilder(command);
-		Process process = builder.start();
-		InputStream is = process.getInputStream();
-		InputStreamReader isr = new InputStreamReader(is);
-		BufferedReader br = new BufferedReader(isr);
-		String line;
-		int count = 0;
-		String cpu = "";
-		try {
-			while ((line = br.readLine()) != null) {
-				if (count == 7) {
-					String[] values = line.trim().split("\\s+");
-					cpu = values[9];
-					process.destroy();
-					break;
-				}
-				count++;
-			}
-		} finally {
-			if (is != null) {
-				is.close();
-			}
-		}
-		process.waitFor();
-		return cpu;
-	}
-	*/
 }
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
index fcb02e5..1d462aa 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessMajorFaultCollector.java
@@ -20,26 +20,32 @@
 

 import java.util.concurrent.Callable;

 

+import org.apache.uima.ducc.agent.launcher.CGroupsManager;

+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;

 import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;

 import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;

 import org.apache.uima.ducc.common.utils.DuccLogger;

 

 public class ProcessMajorFaultCollector implements

 		Callable<ProcessMemoryPageLoadUsage> {

-	String pid;

+	private CGroupsManager cgm=null;

+	private String containerId=null;

 	

-	public ProcessMajorFaultCollector(DuccLogger logger, String pid ) {

-		this.pid = pid;

+	public ProcessMajorFaultCollector(DuccLogger logger, CGroupsManager mgr, String containerId ) {

+		this.cgm = mgr;

+		this.containerId = containerId;

 	}

 

 	public ProcessMemoryPageLoadUsage call() throws Exception {

 		try {

-			//super.parseMetricFile();

-			return new DuccProcessMemoryPageLoadUsage(pid);

+			return new DuccProcessMemoryPageLoadUsage(collect());

 		} catch (Exception e) {

 			e.printStackTrace();

 			throw e;

 		}

 	}

-

+	private long collect() throws Exception{

+		// use cgroups manager to collect the page fault count (FAULTS stat)

+		return cgm.getUsageForMemoryStat(CgroupMemoryStat.FAULTS,containerId);

+	}

 }

diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
index fee0ec6..3de1a49 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessResidentMemoryCollector.java
@@ -18,23 +18,30 @@
 */
 package org.apache.uima.ducc.agent.metrics.collectors;
 
-import java.io.RandomAccessFile;
 import java.util.concurrent.Callable;
 
+import org.apache.uima.ducc.agent.launcher.CGroupsManager;
+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;
 import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory;
 import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
+import org.apache.uima.ducc.common.utils.DuccLogger;
 
 
-public class ProcessResidentMemoryCollector extends AbstractMetricCollector
+public class ProcessResidentMemoryCollector 
 		implements Callable<ProcessResidentMemory> {
-	public ProcessResidentMemoryCollector(RandomAccessFile fileHandle,
-			int howMany, int offset) {
-		super(fileHandle, howMany, offset);
+	private String containerId=null;
+	private CGroupsManager cgm=null;
+
+	public ProcessResidentMemoryCollector(DuccLogger logger, CGroupsManager mgr, String jobId) {
+		this.containerId = jobId;
+		this.cgm = mgr;
 	}
 
 	public ProcessResidentMemory call() throws Exception {
-		super.parseMetricFile();
-		return new DuccProcessResidentMemory(super.metricFileContents,
-				super.metricFieldOffsets, super.metricFieldLengths);
+		return new DuccProcessResidentMemory(collect());
+	}
+	private long collect() throws Exception{
+		// use cgroups manager to collect rss usage
+		return cgm.getUsageForMemoryStat(CgroupMemoryStat.RSS,containerId);
 	}
 }
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
index e93ec65..9ebe421 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/metrics/collectors/ProcessSwapUsageCollector.java
@@ -18,65 +18,34 @@
 */

 package org.apache.uima.ducc.agent.metrics.collectors;

 

-import java.io.RandomAccessFile;

 import java.util.concurrent.Callable;

 

-import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessMemoryPageLoadUsage;

-import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;

+import org.apache.uima.ducc.agent.launcher.CGroupsManager;

+import org.apache.uima.ducc.agent.launcher.CGroupsManager.CgroupMemoryStat;

+import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;

+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage;

 import org.apache.uima.ducc.common.utils.DuccLogger;

 

 public class ProcessSwapUsageCollector implements

-		Callable<ProcessMemoryPageLoadUsage> {

-	String pid;

-	

-	public ProcessSwapUsageCollector(DuccLogger logger, String pid,

-			RandomAccessFile fileHandle, int howMany, int offset) {

-		this.pid = pid;

+		Callable<ProcessSwapSpaceUsage> {

+	private CGroupsManager cgm=null;

+	private String containerId=null;

+

+	public ProcessSwapUsageCollector(DuccLogger logger, CGroupsManager mgr, String jobId ) {

+		this.containerId = jobId;

+		this.cgm = mgr;

 	}

 

-	public ProcessMemoryPageLoadUsage call() throws Exception {

+	public ProcessSwapSpaceUsage call() throws Exception {

 		try {

-			return new DuccProcessMemoryPageLoadUsage(pid);

+			return new DuccProcessSwapSpaceUsage(collect());

 		} catch (Exception e) {

 			e.printStackTrace();

 			throw e;

 		}

 	}

-/*

-	private String execTopShell() throws Exception {

-		List<String> command = new ArrayList<String>();

-		command.add("top");

-		command.add("-b");

-		command.add("-n");

-		command.add("1");

-		command.add("-p");

-		command.add(pid);

-

-		ProcessBuilder builder = new ProcessBuilder(command);

-		Process process = builder.start();

-		InputStream is = process.getInputStream();

-		InputStreamReader isr = new InputStreamReader(is);

-		BufferedReader br = new BufferedReader(isr);

-		String line;

-		int count = 0;

-		String cpu = "";

-		try {

-			while ((line = br.readLine()) != null) {

-				if (count == 7) {

-					String[] values = line.trim().split("\\s+");

-					cpu = values[9];

-					process.destroy();

-					break;

-				}

-				count++;

-			}

-		} finally {

-			if (is != null) {

-				is.close();

-			}

-		}

-		process.waitFor();

-		return cpu;

+	private long collect() throws Exception{

+		// use cgroups manager to collect swap usage

+		return cgm.getUsageForMemoryStat(CgroupMemoryStat.SWAP,containerId);

 	}

-	*/

 }

diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
index 90af38d..6b4bce7 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
@@ -31,10 +31,13 @@
 import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
 import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
 import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
+import org.apache.uima.ducc.agent.metrics.collectors.ProcessSwapUsageCollector;
 import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
+import org.apache.uima.ducc.common.agent.metrics.memory.DuccProcessResidentMemory;
 import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
 import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
 import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
+import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage;
 import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.Utils;
@@ -47,17 +50,12 @@
 		ProcessMetricsProcessor {
 	private RandomAccessFile statmFile;
 
-	// private RandomAccessFile nodeStatFile;
 	private RandomAccessFile processStatFile;
 
-	//private long totalCpuInitUsage = 0;
-	
 	private long previousCPUReadingInMillis = 0;
 	
 	private long previousSnapshotTime = 0;
 
-	//private boolean initializing = true;
-
 	private final ExecutorService pool;
 
 	private IDuccProcess process;
@@ -76,17 +74,16 @@
 
 	private volatile boolean closed = true;
 
-	//private long clockAtStartOfRun = 0;
 
 	private long percentCPU = 0;
-
+	
+	
 	public LinuxProcessMetricsProcessor(DuccLogger logger,
 			IDuccProcess process, NodeAgent agent, String statmFilePath,
 			String nodeStatFilePath, String processStatFilePath,
 			ManagedProcess managedProcess) throws FileNotFoundException {
 		this.logger = logger;
 		statmFile = new RandomAccessFile(statmFilePath, "r");
-		// nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r");
 		processStatFile = new RandomAccessFile(processStatFilePath, "r");
 		this.managedProcess = managedProcess;
 		this.agent = agent;
@@ -148,335 +145,269 @@
 		return true;
 	}
 
-	public void process(Exchange e) {
-		if (closed) { // files closed
+	private long getSwapUsage() throws Exception {
+		long swapUsage = -1;
+		if (agent.useCgroups) {
+
+			String containerId = agent.cgroupsManager
+					.getContainerId(managedProcess);
+
+			ProcessSwapUsageCollector processSwapCollector = new ProcessSwapUsageCollector(
+					logger, agent.cgroupsManager, containerId);
+			logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null,
+					"Fetching Swap Usage PID:" + process.getPID());
+			Future<ProcessSwapSpaceUsage> processFaults = pool
+					.submit(processSwapCollector);
+			swapUsage = processFaults.get().getSwapUsage();
+			logger.info("LinuxProcessMetricsProcessor.getSwapUsage", null,
+					" Process Swap Usage:" + swapUsage);
+		}
+		return swapUsage;
+	}
+	
+	private long getFaults() throws Exception {
+		long faults = -1;
+		if (agent.useCgroups) {
+			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+			ProcessMajorFaultCollector processFaultsCollector = 
+					new ProcessMajorFaultCollector(logger, agent.cgroupsManager, containerId);
+	        logger.info("LinuxProcessMetricsProcessor.getFaults",null,"Fetching Page Faults PID:"+process.getPID());
+	        Future<ProcessMemoryPageLoadUsage> processFaults = pool.submit(processFaultsCollector);
+		    faults = processFaults.get().getMajorFaults();
+			logger.info(
+					"LinuxProcessMetricsProcessor.getFaults",null," Process Faults (pgpgin):"+faults);
+		}
+		return faults;
+	}
+	private long getRss() throws Exception {
+		long rss = -1;
+		if (agent.useCgroups) {
+			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+			ProcessResidentMemoryCollector processRSSCollector = 
+					new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, containerId);
+	        logger.info("LinuxProcessMetricsProcessor.getRss",null,"Fetching RSS Usage for PID:"+process.getPID());
+	        Future<ProcessResidentMemory> processRss = pool.submit(processRSSCollector);
+		    rss = processRss.get().get();
+			logger.info(
+					"LinuxProcessMetricsProcessor.getRss",null," Process RSS:"+rss);
+		}
+		return rss;
+	}
+	
+	private long getCpuUsage() throws Exception {
+		long cpuUsage=-1;
+		if (agent.useCgroups) {
+			String containerId = agent.cgroupsManager.getContainerId(managedProcess);
+
+			Future<ProcessCpuUsage> processCpuUsage = null;
+			ProcessCpuUsageCollector processCpuUsageCollector = 
+					new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
+	        logger.info("LinuxProcessMetricsProcessor.getCpuUsage",null,"Fetching CPU Usage for PID:"+process.getPID());
+			processCpuUsage = pool
+					.submit(processCpuUsageCollector);
+			long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
+			if ( cpuUsageInNanos >= 0 ) {
+				// cpuUsage comes from cpuacct.usage and is in nanos
+				cpuUsage = Math.round( cpuUsageInNanos / 1000000 );  // normalize into millis
+			} 
+			logger.info(
+					"LinuxProcessMetricsProcessor.getCpuUsage",null,
+					"CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+cpuUsage);
+		}
+		return cpuUsage;
+	}
+	
+	private long getCpuTime( long totalCpuUsageInMillis) throws Exception {
+		long cp = -1;
+		if (managedProcess.getDuccProcess().getProcessState()
+				.equals(ProcessState.Running) ||
+				managedProcess.getDuccProcess().getProcessState()
+				.equals(ProcessState.Initializing)	
+				) {
+			if (agent.useCgroups && totalCpuUsageInMillis != -1) {
+				
+				long timeRunning = 1;
+				if ( process.getTimeWindowInit() != null ) {
+					timeRunning = process.getTimeWindowInit().getElapsedMillis();
+				}
+				if ( process.getTimeWindowRun() != null ) {
+					timeRunning += process.getTimeWindowRun().getElapsedMillis();
+				}
+				// percent CPU = 100 * total CPU millis / elapsed (init + run) millis
+				percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
+				cp = percentCPU;
+			}
+		} else {
+			cp  = percentCPU;
+		}
+		return cp;
+	}
+
+	private long getCurrentCpu(long totalCpuUsageInMillis ) {
+		long currentCpu=-1;
+		// publish current CPU usage by computing a delta from the last time
+		// CPU data was fetched.
+		if ( totalCpuUsageInMillis > 0 ) {
+			double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0;
+			double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0;
+			currentCpu = Math.round(100*(millisCPU/millisRun) ) ;
+			previousCPUReadingInMillis = totalCpuUsageInMillis;
+			previousSnapshotTime = System.currentTimeMillis();
+		} else {
+			if (agent.useCgroups && totalCpuUsageInMillis != -1 ) {
+				currentCpu = 0;
+			}
+		}
+		return currentCpu;
+	}
+	
+
+	private void killProcsIfExceedingMemoryThreshold() throws Exception {
+		if ( !agent.useCgroups ) {
 			return;
 		}
+		if (process.getSwapUsage() > 0
+				&& process.getSwapUsage() > managedProcess
+						.getMaxSwapThreshold()) {
+		} else {
+			String containerId = agent.cgroupsManager
+					.getContainerId(managedProcess);
 
+			String[] cgroupPids = agent.cgroupsManager
+			        .getPidsInCgroup(containerId);
+            logger.info("LinuxProcessMetricsProcessor.process",null,"Container ID:"+containerId+" cgroup pids "+cgroupPids.length);
 
-		// if process is stopping or already dead dont collect metrics. The
-		// Camel
-		// route has just been stopped.
-		if (!collectStats(process.getProcessState())) {
-			return;
-		}
-		if (process.getProcessState().equals(ProcessState.Initializing)
-				|| process.getProcessState().equals(ProcessState.Running))
-			try {
+			// Use Memory Guard only if cgroups are enabled and fudge
+			// factor > -1
 
-				// executes script
-				// DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up
-				// swap used by
-				// a process
-				long totalSwapUsage = 0;
-				long totalFaults = 0;
-				long totalCpuUsageInMillis = 0;
-				long totalRss = 0;
-				//int currentCpuUsage = 0;
-				Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null;
-				Future<ProcessCpuUsage> processCpuUsage = null;
-				String[] cgroupPids = new String[0];
-				try {
-					String swapUsageScript = System
-							.getProperty("ducc.agent.swap.usage.script");
+			if ( fudgeFactor > -1
+				 && managedProcess.getProcessMemoryAssignment()
+							.getMaxMemoryWithFudge() > 0) {
 
-					if (agent.useCgroups) {
-						String containerId = agent.cgroupsManager
-								.getContainerId(managedProcess);
-						cgroupPids = agent.cgroupsManager
-								.getPidsInCgroup(containerId);
-						for (String pid : cgroupPids) {
-							// the swap usage script is defined in
-							// ducc.properties
-							if (swapUsageScript != null) {
-								DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
-										pid, managedProcess.getOwner(),
-										swapUsageScript, logger);
-								totalSwapUsage += processSwapSpaceUsage
-										.getSwapUsage();
-							}
+				long rss = (process.getResidentMemory() / 1024) / 1024; // normalize RSS into MB
 
-							ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
-									logger, pid);
-							// if process is stopping or already dead dont
-							// collect metrics. The Camel
-							// route has just been stopped.
-							if (!collectStats(process.getProcessState())) {
-								return;
-							}
-
-							processMajorFaultUsage = pool
-									.submit(processMajorFaultUsageCollector);
-							totalFaults += processMajorFaultUsage.get()
-									.getMajorFaults();
-							try {
-								if (!collectStats(process.getProcessState())) {
-									return;
-								}
-								
-							} catch( Exception ee) {
-								logger.warn(
-										"LinuxProcessMetricsProcessor.process",
-										null,ee);
-							} 
-							RandomAccessFile rStatmFile = null;
-							try {
-								rStatmFile = new RandomAccessFile("/proc/"
-										+ pid + "/statm", "r");
-							} catch (FileNotFoundException fnfe) {
-								logger.info(
-										"LinuxProcessMetricsProcessor.process",
-										null,
-										"Statm File:"
-												+ "/proc/"
-												+ pid
-												+ "/statm *Not Found*. Process must have already exited");
-								return;
-							}
-							ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
-									rStatmFile, 2, 0);
-							// if process is stopping or already dead dont
-							// collect metrics. The Camel
-							// route has just been stopped.
-							if (!collectStats(process.getProcessState())) {
-								return;
-							}
-
-							Future<ProcessResidentMemory> prm = pool
-									.submit(collector);
-
-							totalRss += prm.get().get();
-
-							rStatmFile.close();
-						}
-
-						ProcessCpuUsageCollector processCpuUsageCollector = 
-								new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
-						processCpuUsage = pool
-								.submit(processCpuUsageCollector);
-						long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
-						if ( cpuUsageInNanos >= 0 ) {
-							// cpuUsage comes from cpuacct.usage and is in nanos
-							totalCpuUsageInMillis = Math.round( cpuUsageInNanos / 1000000 );  // normalize into millis
-						} else {
-							totalCpuUsageInMillis = -1;
-						}
-						logger.info(
-								"LinuxProcessMetricsProcessor.process",null,
-								"CPU USAGE:"+cpuUsageInNanos+ " CLOCK RATE:"+agent.cpuClockRate+" Total CPU USAGE:"+totalCpuUsageInMillis);
-					} else {
-						if (swapUsageScript != null) {
-							DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
-									process.getPID(),
-									managedProcess.getOwner(), swapUsageScript,        
-									logger);
-							totalSwapUsage = processSwapSpaceUsage
-									.getSwapUsage();
-						}
-
-						ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
-								logger, process.getPID());
-
-						// if process is stopping or already dead dont collect
-						// metrics. The Camel
-						// route has just been stopped.
-						if (!collectStats(process.getProcessState())) {
-						    return;
-						}
-						processMajorFaultUsage = pool
-								.submit(processMajorFaultUsageCollector);
-						totalFaults = processMajorFaultUsage.get()
-								.getMajorFaults();
-						// Cgroups are not available so percent CPU is not available
-						totalCpuUsageInMillis = -1;   // -1 stands for N/A
-						//currentCpuUsage = -1; // -1 stands for N/A
-
-						ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
-								statmFile, 2, 0);
-						// if process is stopping or already dead dont collect
-						// metrics. The Camel
-						// route has just been stopped.
-						if (!collectStats(process.getProcessState())) {
-							return;
-						}
-
-						Future<ProcessResidentMemory> prm = pool
-								.submit(collector);
-						totalRss = prm.get().get();
-					}
-
-				} catch (Exception exc) {
-					if (!collectStats(process.getProcessState())) {
-						return;
-					}
-					logger.error("LinuxProcessMetricsProcessor.process", null,
-							exc);
-				}
-
-				// report cpu utilization while the process is running
-				if (managedProcess.getDuccProcess().getProcessState()
-						.equals(ProcessState.Running) ||
-						managedProcess.getDuccProcess().getProcessState()
-						.equals(ProcessState.Initializing)	
-						) {
-					if (agent.useCgroups && totalCpuUsageInMillis != -1) {
-						
-						long timeRunning = 1;
-						if ( process.getTimeWindowInit() != null ) {
-							timeRunning = process.getTimeWindowInit().getElapsedMillis();
-						}
-						if ( process.getTimeWindowRun() != null ) {
-							timeRunning += process.getTimeWindowRun().getElapsedMillis();
-						}
-						// normalize time in running state into seconds
-						percentCPU = Math.round(100*( (totalCpuUsageInMillis*1.0)/ (timeRunning*1.0)));
-						process.setCpuTime( percentCPU );
-					} else {
-						process.setCpuTime(-1);   // -1 stands for N/A
-						percentCPU = -1;
-					}
-				} else {
-					// if process is not dead, report the last known percentCPU
-					process.setCpuTime(percentCPU);
-				}
-				// publish current CPU usage by computing a delta from the last time
-				// CPU data was fetched.
-				if ( totalCpuUsageInMillis > 0 ) {
-					double millisCPU = ( totalCpuUsageInMillis - previousCPUReadingInMillis )*1.0;
-					double millisRun = ( System.currentTimeMillis() - previousSnapshotTime )*1.0;
-					process.setCurrentCPU(Math.round(100*(millisCPU/millisRun) ) );
-					previousCPUReadingInMillis = totalCpuUsageInMillis;
-					previousSnapshotTime = System.currentTimeMillis();
-					
-				} else {
-					if (agent.useCgroups && totalCpuUsageInMillis != -1 ) {
-						process.setCurrentCPU(0);
-					} else {
-						process.setCurrentCPU(-1);  // -1 stands for N/A
-					}
-				}
-				logger.info(
-					"process",
-					null,
-					"----------- PID:" + process.getPID()
-					+ " Total CPU Time (%):" + process.getCpuTime()
-					+ " Delta CPU Time (%):" +process.getCurrentCPU() );
-				// collects process Major faults (swap in memory)
-				process.setMajorFaults(totalFaults);
-				// Current Process Swap Usage in bytes
-				long st = System.currentTimeMillis();
-				long processSwapUsage = totalSwapUsage * 1024;
-				// collects swap usage from /proc/<PID>/smaps file via a script
-				// DUCC_HOME/admin/collect_process_swap_usage.sh
-				process.setSwapUsage(processSwapUsage);
-				logger.info(
+				logger.trace(
 						"process",
 						null,
-						"----------- PID:" + process.getPID()
-								+ " Major Faults:" + totalFaults
-								+ " Process Swap Usage:" + processSwapUsage
-								+ " Max Swap Usage Allowed:"
-								+ managedProcess.getMaxSwapThreshold()
-								+ " Time to Collect Swap Usage:"
-								+ (System.currentTimeMillis() - st));
-				if (processSwapUsage > 0
-						&& processSwapUsage > managedProcess
-								.getMaxSwapThreshold()) {
-				} else {
-					// Use Memory Guard only if cgroups are disabled and fudge
-					// factor > -1
-
-					if (!agent.useCgroups
-							&& fudgeFactor > -1
-							&& managedProcess.getProcessMemoryAssignment()
-									.getMaxMemoryWithFudge() > 0) {
-						// RSS is in terms of pages(blocks) which size is system
-						// dependent. Default 4096 bytes
-						long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize
-																			// RSS
-																			// into
-																			// MB
-						logger.trace(
-								"process",
-								null,
-								"*** Process with PID:"
-										+ managedProcess.getPid()
-										+ " Assigned Memory (MB): "
-										+ managedProcess
-												.getProcessMemoryAssignment()
-										+ " MBs. Current RSS (MB):" + rss);
-						// check if process resident memory exceeds its memory
-						// assignment calculate in the PM
-						if (rss > managedProcess.getProcessMemoryAssignment()
-								.getMaxMemoryWithFudge()) {
-							logger.error(
-									"process",
-									null,
-									"\n\n********************************************************\n\tProcess with PID:"
-											+ managedProcess.getPid()
-											+ " Exceeded its max memory assignment (including a fudge factor) of "
-											+ managedProcess
-													.getProcessMemoryAssignment()
-													.getMaxMemoryWithFudge()
-											+ " MBs. This Process Resident Memory Size: "
-											+ rss
-											+ " MBs .Killing process ...\n********************************************************\n\n");
-							try {
-								managedProcess.kill(); // mark it for death
-								process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
-										.toString());
-								agent.stopProcess(process);
-
-								if (agent.useCgroups) {
-									for (String pid : cgroupPids) {
-										// skip the main process that was just
-										// killed above. Only kill
-										// its child processes.
-										if (pid.equals(managedProcess
-												.getDuccProcess().getPID())) {
-											continue;
-										}
-										killChildProcess(pid, "-15");
-									}
-								}
-							} catch (Exception ee) {
-								if (!collectStats(process.getProcessState())) {
-									return;
-								}
-								logger.error("process", null, ee);
-							}
-							return;
-						}
-					}
-
-				}
-				// Publish resident memory
-				process.setResidentMemory((totalRss * blockSize));
-				// dont collect GC metrics for POPs. May not be java or may not
-				// be a jmx enabled java process
-				if (!process.getProcessType().equals(ProcessType.Pop)) {
-					ProcessGarbageCollectionStats gcStats = gcStatsCollector
-							.collect();
-					process.setGarbageCollectionStats(gcStats);
-					logger.info(
+						"*** Process with PID:"
+								+ managedProcess.getPid()
+								+ " Assigned Memory (MB): "
+								+ managedProcess
+										.getProcessMemoryAssignment()
+								+ " MBs. Current RSS (MB):" + rss);
+				// check if process resident memory exceeds its memory
+				// assignment calculated in the PM
+				if (rss > managedProcess.getProcessMemoryAssignment()
+						.getMaxMemoryWithFudge()) {
+					logger.error(
 							"process",
 							null,
-							"PID:" + process.getPID()
-									+ " Total GC Collection Count :"
-									+ gcStats.getCollectionCount()
-									+ " Total GC Collection Time :"
-									+ gcStats.getCollectionTime());
-				}
+							"\n\n********************************************************\n\tProcess with PID:"
+									+ managedProcess.getPid()
+									+ " Exceeded its max memory assignment (including a fudge factor) of "
+									+ managedProcess
+											.getProcessMemoryAssignment()
+											.getMaxMemoryWithFudge()
+									+ " MBs. This Process Resident Memory Size: "
+									+ rss
+									+ " MBs .Killing process ...\n********************************************************\n\n");
+					try {
+						managedProcess.kill(); // mark it for death
+						process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
+								.toString());
+						agent.stopProcess(process);
 
-			} catch (Exception ex) {
-				// if the child process is not running dont log the exception.
-				if (!collectStats(process.getProcessState())) {
+						if (agent.useCgroups) {
+							for (String pid : cgroupPids) {
+								// skip the main process that was just
+								// killed above. Only kill
+								// its child processes.
+								if (pid.equals(managedProcess
+										.getDuccProcess().getPID())) {
+									continue;
+								}
+								killChildProcess(pid, "-15");
+							}
+						}
+					} catch (Exception ee) {
+						if (!collectStats(process.getProcessState())) {
+							return;
+						}
+						logger.error("process", null, ee);
+					}
 					return;
 				}
-				logger.error("process", null, ex);
-				ex.printStackTrace();
 			}
 
+		}
+
+	}
+
+	private ProcessGarbageCollectionStats getGCStats() throws Exception {
+		if (!process.getProcessType().equals(ProcessType.Pop)) {
+			ProcessGarbageCollectionStats gcStats = gcStatsCollector
+					.collect();
+		   return gcStats;
+		}
+		return new ProcessGarbageCollectionStats();
+	}
+	public boolean processIsActive() {
+		return process.getProcessState().equals(ProcessState.Starting)
+               ||
+			   process.getProcessState().equals(ProcessState.Initializing)
+			   || 
+			   process.getProcessState().equals(ProcessState.Running);
+	}
+	public void process(Exchange e) {
+		// if process is stopping or already dead dont collect metrics. The
+		// Camel route has just been stopped.
+		if (closed || !processIsActive()) {
+ 		    logger.info("LinuxProcessMetricsProcessor.process",	null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning");	
+ 		    return;
+		}
+		try {
+			
+			process.setSwapUsage(getSwapUsage());
+			process.setMajorFaults(getFaults());
+
+			long rssInBytes = getRss();
+			process.setResidentMemory(rssInBytes);
+
+			long totalCpuUsageInMillis = getCpuUsage();
+
+			// set CPU time in terms of %
+			process.setCpuTime(getCpuTime(totalCpuUsageInMillis));
+
+			process.setCurrentCPU(getCurrentCpu(totalCpuUsageInMillis));
+
+			ProcessGarbageCollectionStats gcStats = getGCStats();
+			process.setGarbageCollectionStats(gcStats);
+			logger.info(
+					"process",
+					null,
+					"----------- PID:" + process.getPID() + " RSS:" 
+							+ ((rssInBytes > -1) ? (rssInBytes / (1024 * 1024))+ " MB" : "-1")
+							+ " Total CPU Time (%):" + process.getCpuTime()
+							+ " Delta CPU Time (%):" + process.getCurrentCPU()
+							+ " Major Faults:" + process.getMajorFaults()
+							+ " Process Swap Usage:" + process.getSwapUsage()
+							+ " Max Swap Usage Allowed:"
+							+ managedProcess.getMaxSwapThreshold()
+							+ " Total GC Collection Count :"
+							+ gcStats.getCollectionCount()
+							+ " Total GC Collection Time :"
+							+ gcStats.getCollectionTime());
+
+			killProcsIfExceedingMemoryThreshold();
+
+		} catch (Exception exc) {
+			if (!collectStats(process.getProcessState())) {
+				return;
+			}
+			logger.error("LinuxProcessMetricsProcessor.process", null, exc);
+		}
 	}
 
 	private void killChildProcess(final String pid, final String signal) {
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
index 85c4119..2c7ba7d 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/memory/DuccProcessResidentMemory.java
@@ -18,25 +18,17 @@
 */
 package org.apache.uima.ducc.common.agent.metrics.memory;
 
-import org.apache.uima.ducc.common.node.metrics.ByteBufferParser;
-
-public class DuccProcessResidentMemory extends ByteBufferParser implements
+public class DuccProcessResidentMemory implements
 		ProcessResidentMemory {
 
 	private static final long serialVersionUID = 8563460863767404377L;
-	private static final int TOTAL = 0;
-	private static final int RESIDENT = 1;
-
-	public DuccProcessResidentMemory(byte[] memInfoBuffer,
-			int[] memInfoFieldOffsets, int[] memInfoFiledLengths) {
-		super(memInfoBuffer, memInfoFieldOffsets, memInfoFiledLengths);
+	long rss;
+	
+	public DuccProcessResidentMemory(long rss) {
+		this.rss = rss;
 	}
 
 	public long get() {
-		return super.getFieldAsLong(RESIDENT);
-	}
-	
-	public long getTotal() {
-		return super.getFieldAsLong(TOTAL);
+		return rss;
 	}
 }
diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
index 99aed58..ff2723a 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessMemoryPageLoadUsage.java
@@ -18,57 +18,13 @@
 */

 package org.apache.uima.ducc.common.agent.metrics.swap;

 

-import java.io.BufferedReader;

-import java.io.InputStream;

-import java.io.InputStreamReader;

-

-public class DuccProcessMemoryPageLoadUsage implements

-		ProcessMemoryPageLoadUsage {

-	String pid;

+public class DuccProcessMemoryPageLoadUsage implements ProcessMemoryPageLoadUsage{

+	long faults;

 	

-	public DuccProcessMemoryPageLoadUsage(String pid) {

-		this.pid = pid;

+	public DuccProcessMemoryPageLoadUsage(long faults) {

+		this.faults = faults;

 	}	

-	public long getMajorFaults() throws Exception {

-		return collectProcessMajorFaults();

+	public long getMajorFaults() {

+		return faults;

 	}

-	private long collectProcessMajorFaults() throws Exception {

-		String[] command = new String[] {"/bin/ps","-o","maj_flt",pid};

-

-		ProcessBuilder builder = new ProcessBuilder(command);

-		builder.redirectErrorStream(true);

-		Process process = builder.start();

-		InputStream is = process.getInputStream();

-		if ( is != null ) {

-			InputStreamReader isr = new InputStreamReader(is);

-			BufferedReader br = new BufferedReader(isr);

-			String line;

-			int count = 0;

-			String faults = null;

-			try {

-				while ((line = br.readLine()) != null) {

-					// skip the header line

-					if (count == 1) {

-						faults = line.trim();

-					}

-					count++;

-				}

-			} finally {

-				if (is != null) {

-					is.close();

-				}

-				process.waitFor();

-				process.destroy();

-			}

-			if ( faults != null) {

-				return Long.parseLong(faults.trim());

-			} else {

-				return 0;

-			}

-			

-		}

-		return 0;

-		

-	}

-

 }

diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
index b4501e5..1be47b5 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/DuccProcessSwapSpaceUsage.java
@@ -19,72 +19,13 @@
 

 package org.apache.uima.ducc.common.agent.metrics.swap;

 

-import java.io.BufferedReader;

-import java.io.InputStreamReader;

-

-

-import org.apache.uima.ducc.common.utils.DuccLogger;

-import org.apache.uima.ducc.common.utils.Utils;

-

 public class DuccProcessSwapSpaceUsage implements ProcessSwapSpaceUsage {

-	String pid=null;

-	String execScript=null;

-	DuccLogger logger=null;

-	String[] command;

+	long swapusage;

 	

-	public DuccProcessSwapSpaceUsage( String pid, String owner, String execScript, DuccLogger logger) {

-		this.pid = pid;

-		this.execScript = execScript;

-		this.logger = logger;

-	    String c_launcher_path = 

-	            Utils.resolvePlaceholderIfExists(

-	                    System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties());

-	    command = new String[] { c_launcher_path,

-	              "-u", owner, "--", execScript, pid }; 

+	public DuccProcessSwapSpaceUsage( long swapusage) {

+		this.swapusage = swapusage;

 	}

 	public long getSwapUsage() {

-		long swapusage=0;

-		if ( pid != null && execScript != null ) {

-			InputStreamReader in = null;

-			try {

-				ProcessBuilder pb = new ProcessBuilder();

-				//String[] command = {execScript,pid};

-				pb.command(command); //command);

-

-				//logger.info("------------ getSwapUsage-", null, cmd);

-				pb.redirectErrorStream(true);

-				Process swapCollectorProcess = pb.start();

-				in = new InputStreamReader(swapCollectorProcess.getInputStream());

-				BufferedReader reader = new BufferedReader(in);

-				String line=null;

-				boolean skip = true;

-				while ((line = reader.readLine()) != null) {

-					try {

-						if ( line.startsWith("1001")) {

-							skip = false;

-							continue;

-						}

-						if (!skip) {

-							swapusage = Long.parseLong(line.trim());

-							logger.info("getSwapUsage-",null, "PID:"+pid+" Swap Usage:"+line);

-						}

-					} catch( NumberFormatException e) {

-						logger.error("getSwapUsage", null, line);

-					}

-				}

-			} catch( Exception e) {

-				logger.error("getSwapUsage", null, e);

-			} finally {

-				if ( in != null ) {

-					try {

-						in.close();	

-					} catch( Exception e) {

-						logger.error("getSwapUsage", null, e);

-					}

-					

-				}

-			}

-		}

 		return swapusage;

 	}

 	

diff --git a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
index cbc7c0f..e830642 100644
--- a/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
+++ b/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/agent/metrics/swap/ProcessMemoryPageLoadUsage.java
@@ -20,5 +20,5 @@
 package org.apache.uima.ducc.common.agent.metrics.swap;

 

 public interface ProcessMemoryPageLoadUsage {

-	public long getMajorFaults() throws Exception;

+	public long getMajorFaults();

 }

diff --git a/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex b/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
index 40367d2..22a6aec 100644
--- a/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
+++ b/uima-ducc-duccdocs/src/site/tex/duccbook/part4/install.tex
@@ -106,6 +106,7 @@
   \item DUCC run with user {\em ducc} credentials.
   \item libcgroup1-0.37+ on SLES, libcgroup-0.37+ on RHEL, and on Ubuntu all of cgroup-bin cgroup-lite libcgroup1
   \item along with a customized /etc/cgconfig.conf
+  \item On some flavors of Linux, cgroup swap accounting is not enabled and swap is reported as N/A. To enable swap accounting, add the swapaccount=1 kernel parameter. More information on this step is available here: \url{http://unix.stackexchange.com/questions/147158/how-to-enable-swap-accounting-for-memory-cgroup-in-archlinux}.
 \end{itemize}
 
   
diff --git a/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java b/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
index 1280b0e..4e40c0e 100644
--- a/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
+++ b/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
@@ -74,7 +74,6 @@
             System.out.println("Is this nuts or what, no logger!");
         }
 
-        forceCpuUsage();
         
         if ( initComplete ) {
             logger.log(Level.INFO, "Init bypassed in PID:TID " + pid + ":" + tid + ", already completed. ");
@@ -366,7 +365,10 @@
     @Override
     public void process(CAS cas) throws AnalysisEngineProcessException 
     {
-        String data = cas.getSofaDataString();
+
+        forceCpuUsage();
+
+    	String data = cas.getSofaDataString();
 
         //
         // Parms are in a single 4-token string:
diff --git a/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java b/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
index a0044ea..4655965 100644
--- a/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
+++ b/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java
@@ -200,6 +200,9 @@
 		String methodName = "copyInventoryMajorFaults";
 		logger.trace(methodName, null, messages.fetch("enter"));
 		process.setMajorFaults(inventoryProcess.getMajorFaults());
+		DuccId jobid = dw.getDuccId();
+		DuccId processId = process.getDuccId();
+		logger.trace(methodName, jobid, processId, "MajofFaults:"+process.getMajorFaults());
 		logger.trace(methodName, null, messages.fetch("exit"));
 		return;
 	}
@@ -208,6 +211,9 @@
 		String methodName = "copyInventoryRss";
 		logger.trace(methodName, null, messages.fetch("enter"));
 		process.setResidentMemory(inventoryProcess.getResidentMemory());
+		DuccId jobid = dw.getDuccId();
+		DuccId processId = process.getDuccId();
+		logger.trace(methodName, jobid, processId, "Rss:"+process.getResidentMemory());
 		logger.trace(methodName, null, messages.fetch("exit"));
 		return;
 	}
@@ -460,7 +466,6 @@
 	private void copyInventoryProcessState(IDuccWorkJob job, IDuccProcess inventoryProcess, IDuccProcess process) {
 		String methodName = "copyInventoryProcessState";
 		logger.trace(methodName, job.getDuccId(), messages.fetch("enter"));
-		
 		if(!compare(inventoryProcess.getProcessState().toString(),process.getProcessState().toString())) {
 			switch((JobState)job.getStateObject()) {
 			//case Initializing:
@@ -468,7 +473,6 @@
 			//	break;
 			default:
 				process.advanceProcessState(inventoryProcess.getProcessState());
-				logger.trace(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
 				if ( inventoryProcess.getProcessJmxUrl() != null && process.getProcessJmxUrl() == null) {
 					process.setProcessJmxUrl(inventoryProcess.getProcessJmxUrl());
 				}
@@ -476,6 +480,7 @@
 				break;
 			}
 		}
+		logger.trace(methodName, job.getDuccId(), process.getDuccId(), messages.fetchLabel("process state")+process.getProcessState());
 		logger.trace(methodName, job.getDuccId(), messages.fetch("exit"));
 	}
 	
diff --git a/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java b/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
index de5dd80..26ce85f 100644
--- a/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
+++ b/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
@@ -86,7 +86,7 @@
 
     // For a registered service, here is my registered id
     DuccId id;
-    HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
+    // UIMA-5244 HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
     String history_key = IStateServices.SvcMetaProps.work_instances.pname();
     String implementors_key = IStateServices.SvcMetaProps.implementors.pname();
 
@@ -136,6 +136,7 @@
     long last_runnable = 0;
 
     // The number of instances to maintain live.
+    // Pinger or manual start/stop may make the number of live instances differ from the registered number
     int instances = 1;
     int registered_instances;
 
@@ -171,8 +172,9 @@
 
     String[] coOwners = null;
 
-    String archive_key  = "true";
-    String archive_flag = IStateServices.SvcMetaProps.is_archived.columnName();
+    //  Swapped these 2 values  UIMA-5244
+    String archive_key = IStateServices.SvcMetaProps.is_archived.columnName();
+    String archive_flag  = "true";
 
     //
     // Constructor for a registered service
@@ -803,11 +805,13 @@
             return;
         }
 
+        /*   UIMA-5244 Can remove this as friendly_ids is always empty
         String history = meta_props.getStringProperty(history_key, "");
         for ( Long id : friendly_ids.keySet() ) {
             history = history + " " + id.toString();
         }
         meta_props.put(history_key, history);
+        */
         meta_props.put(archive_key, archive_flag);
 
         try {
@@ -1000,6 +1004,14 @@
         inst.update(share_id, host);
     }
 
+    /**
+     * 
+     * @param n     the new value for the registered instance count
+     * 
+     * Called when the registration is updated via the CLI
+     * Does NOT update the desired number of running instances as this can be set by the pinger.
+     * Also modifications to the registration should not affect running instances.
+     */
     synchronized void updateRegisteredInstances(int n)
     {
         meta_props.setProperty(IStateServices.SvcMetaProps.instances.pname(), Integer.toString(n));
@@ -1008,7 +1020,11 @@
 
     /**
      * @param n      is the target number of instances we want running
-     * @param update indicates whether tp match registration to the target
+     * 
+     * NOTE: "update" (whether to match registration to the target) is not a parameter of
+     * the current signature; it appears to have been dropped.
+     * 
+     * Called by doStart & doStop so may be making the running instances differ from the registered number
      */
     synchronized void updateInstances(int n)
     {
@@ -1337,8 +1353,12 @@
      */
     boolean needNextStart(JobState old, JobState current)
     {
-        // UIMA-4587
+        // UIMA-4587 & UIMA-5244
     	String methodName="needNextStart";
+    	// Can't do this before we handle the OR publication of "defunct" instances that were running when DUCC was shut down
+    	// They should not be counted as errors when the SM restarts.
+ /*       if ( isDeregistered() || !enabled() ) {
+            logger.info(methodName, id, "Bypassing instance start because service is unregistered or disabled.");*/
         if ( isDeregistered() ) {
             logger.info(methodName, id, "Bypassing instance start because service is unregistered.");
             return false;
@@ -2066,13 +2086,12 @@
      * Save the id for possible reuse.
      *
      * It's an error, albeit non-fatal, if the instance is already stashed.
-
-     * Note: this might be fatal for the instance, or the service, but it's not fatal for the SM
-     * so we simply note it in the log but not crash SM
+     * Can happen if an instance dies while being stopped explicitly
      * UIMA-4258
      */
     synchronized void stash_instance_id(int instid)
     {
+        /* UIMA-5244 No need to warn as is expected.
     	String methodName = "stash_intance_id";
         if ( available_instance_ids.containsKey(instid) ) {
             try {
@@ -2083,6 +2102,7 @@
             }
             return;
         }
+        */
 
         available_instance_ids.put(instid, instid);
     }
@@ -2096,7 +2116,7 @@
      */
     synchronized void conditionally_stash_instance_id(int instid)
     {
-        if ( available_instance_ids.containsKey(instid) ) {
+        if ( available_instance_ids.containsKey(instid) ) {           // Is this necessary? Could simply let the put replace
             return;
         }
         stash_instance_id(instid);        
@@ -2106,13 +2126,15 @@
     {
     	String methodName = "start";
 
-        // UIMA-4587
-        if ( isDeregistered() ) {
-            logger.info(methodName, id, "Bypass start becuase service is unregistered.");
+        // UIMA-4587 & UIMA-5244
+    	// Can't also bypass disabled services yet (see the commented-out check below)
+/*        if ( isDeregistered() || !enabled()) {
+            logger.info(methodName, id, "Bypass start because service is unregistered or disabled.");*/
+    	if ( isDeregistered()) {
+    	    logger.info(methodName, id, "Bypass start because service is unregistered.");
             return;
         }
 
-
         if ( countImplementors() >= instances ) {
             return;
         }
@@ -2202,6 +2224,7 @@
 
     synchronized void stopAll()
     {
+        instances = 0;     // Reduce the target count to 0 to ensure no more are started UIMA-5244
         stop(implementors.size());
     }
 
diff --git a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
index c36a459..29f849f 100644
--- a/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
+++ b/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/DuccHandler.java
@@ -136,7 +136,7 @@
 	private static Messages messages = Messages.getInstance();
 	private static DuccId jobid = null;
 	
-	// These keys may have large values and be displayed witg Show/Hide buttons.
+	// These keys may have large values and be displayed with Show/Hide buttons.
 	// ducc.js must be updated if more than 4 are needed (services may have 4)
 	private final String[] n = {"classpath", "service_ping_classpath", "process_executable_args", "process_jvm_args", "environment"};
 	private final Set<String> hideableKeys = new HashSet<String>(Arrays.asList(n));
@@ -145,6 +145,8 @@
 	private enum AllocationType { JD, MR, SPC, SPU, UIMA };
 	private enum LogType { POP, UIMA };
 	
+	private String notAvailable = "N/A";
+	
 	private DuccAuthenticator duccAuthenticator = DuccAuthenticator.getInstance();
 	
 	private String duccVersion						= duccContext+"/version";
@@ -782,15 +784,22 @@
 				}
 				catch(Exception e) {
 				}
-				double swap = process.getSwapUsageMax();
-				if((swap * faults) > 0) {
-					sb.append("<span class=\"health_red\""+">");
+				if(faults < 0) {
+					sb.append("<span class=\"health_black\""+">");
+					sb.append(notAvailable);
+					sb.append("</span>");
 				}
 				else {
-					sb.append("<span class=\"health_black\""+">");
+					double swap = process.getSwapUsageMax();
+					if((swap * faults) > 0) {
+						sb.append("<span class=\"health_red\""+">");
+					}
+					else {
+						sb.append("<span class=\"health_black\""+">");
+					}
+					sb.append(faults);
+					sb.append("</span>");
 				}
-				sb.append(faults);
-				sb.append("</span>");
 				break;
 			}
 		}
@@ -807,34 +816,47 @@
 			default:
 				if(!process.isActive()) {
 					double swap = process.getSwapUsageMax();
-					swap = swap/Constants.GB;
-					String displaySwap = formatter.format(swap);
-					if(swap > 0) {
-						sb.append("<span class=\"health_red\""+">");
+					if(swap < 0) {
+						sb.append("<span class=\"health_black\""+">");
+						sb.append(notAvailable);
+						sb.append("</span>");
 					}
 					else {
-						sb.append("<span class=\"health_black\""+">");
+						swap = swap/Constants.GB;
+						String displaySwap = formatter.format(swap);
+						if(swap > 0) {
+							sb.append("<span class=\"health_red\""+">");
+						}
+						else {
+							sb.append("<span class=\"health_black\""+">");
+						}
+						sb.append(displaySwap);
+						sb.append("</span>");
 					}
-					sb.append(displaySwap);
-					sb.append("</span>");
 				}
 				else {
 					double swap = process.getSwapUsage();
-					swap = swap/Constants.GB;
-					String displaySwap = formatter.format(swap);
-					double swapMax = process.getSwapUsageMax();
-					swapMax = swapMax/Constants.GB;
-					String displaySwapMax = formatter.format(swapMax);
-					sb.append("<span title=\"max="+displaySwapMax+"\" align=\"right\" "+">");
-					if(swap > 0) {
-						sb.append("<span class=\"health_red\""+">");
+					if(swap < 0) {
+						sb.append("<span class=\"health_black\""+">");
+						sb.append(notAvailable);
+						sb.append("</span>");
 					}
 					else {
-						sb.append("<span class=\"health_black\""+">");
+						swap = swap/Constants.GB;
+						String displaySwap = formatter.format(swap);
+						double swapMax = process.getSwapUsageMax();
+						swapMax = swapMax/Constants.GB;
+						String displaySwapMax = formatter.format(swapMax);
+						sb.append("<span title=\"max="+displaySwapMax+"\" align=\"right\" "+">");
+						if(swap > 0) {
+							sb.append("<span class=\"health_red\""+">");
+						}
+						else {
+							sb.append("<span class=\"health_black\""+">");
+						}
+						sb.append(displaySwap);
+						sb.append("</span>");
 					}
-					sb.append(displaySwap);
-					sb.append("</span>");
-					sb.append("</span>");
 				}
 				break;
 			}
@@ -956,20 +978,35 @@
 		if(process != null) {
 			if(process.isComplete()) {
 				double rss = process.getResidentMemoryMax();
-				rss = rss/Constants.GB;
-				String displayRss = formatter.format(rss);
-				sb.append(displayRss);
+				if(rss < 0) {
+					sb.append("<span class=\"health_black\""+">");
+					sb.append(notAvailable);
+					sb.append("</span>");
+				}
+				else {
+					rss = rss/Constants.GB;
+					String displayRss = formatter.format(rss);
+					sb.append(displayRss);
+				}
+				
 			}
 			else {
 				double rss = process.getResidentMemory();
-				rss = rss/Constants.GB;
-				String displayRss = formatter.format(rss);
-				double rssMax = process.getResidentMemoryMax();
-				rssMax = rssMax/Constants.GB;
-				String displayRssMax = formatter.format(rssMax);
-				sb.append("<span title=\"max="+displayRssMax+"\" align=\"right\" "+">");
-				sb.append(displayRss);
-				sb.append("</span>");
+				if(rss < 0) {
+					sb.append("<span class=\"health_black\""+">");
+					sb.append(notAvailable);
+					sb.append("</span>");
+				}
+				else {
+					rss = rss/Constants.GB;
+					String displayRss = formatter.format(rss);
+					double rssMax = process.getResidentMemoryMax();
+					rssMax = rssMax/Constants.GB;
+					String displayRssMax = formatter.format(rssMax);
+					sb.append("<span title=\"max="+displayRssMax+"\" align=\"right\" "+">");
+					sb.append(displayRss);
+					sb.append("</span>");
+				}
 			}
 		}
 		return sb.toString();