UIMA-5891 Modified to force service termination on init failure. Also stop metrics route

git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/branches/uima-ducc-2.2.2-Patched@1843914 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
index 51c7eaa..39a741c 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
@@ -36,10 +36,15 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
@@ -97,6 +102,7 @@
 import org.apache.uima.ducc.transport.event.common.ITimeWindow;
 import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
 import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.internal.util.ArrayUtils;
 
 public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver {
   public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);
@@ -193,6 +199,9 @@
   
   private String stateChangeEndpoint;
 
+  private ExecutorService cleanupExecutor = Executors.newCachedThreadPool();
+  
+  
   public void setStateChangeEndpoint(String stateChangeEndpoint) {
 	this.stateChangeEndpoint = stateChangeEndpoint;
   }
@@ -700,6 +709,7 @@
   public void takeDownProcessWithNoJob(ProcessLifecycleController lifecycleController,
           List<IDuccJobDeployment> jobDeploymentList) {
     String methodName = "takeDownProcessWithNoJob";
+    ScheduledExecutorService scheduledExecutorService = null;
     try {
       inventorySemaphore.acquire();
       List<IDuccProcess> purgeList = new ArrayList<IDuccProcess>();
@@ -746,7 +756,12 @@
                             + processEntry.getValue().getPID());
             processEntry.getValue().setReasonForStoppingProcess(
                     ReasonForStoppingProcess.JPHasNoActiveJob.toString());
-            lifecycleController.stopProcess(processEntry.getValue());
+            //lifecycleController.stopProcess(processEntry.getValue());
+            
+            // send SIGTERM to the ghost process. If process still standing
+            // after timeout, send SIGKILL
+            sigTermOnTimeoutSigKill(getManagedProcess(processEntry.getValue().getDuccId()));
+
           } else {
             // add process to purge list
             purgeList.add(processEntry.getValue());
@@ -765,9 +780,24 @@
 
     } finally {
       inventorySemaphore.release();
+      if ( scheduledExecutorService != null ) {
+         scheduledExecutorService.shutdownNow();
+      }
     }
   }
-
+  /**
+   * Looks up the deployed process whose DuccProcess carries the given id,
+   * scanning deployedProcesses while holding the monitor lock.
+   * @return the matching ManagedProcess, or null when none has that id
+   */
+  private ManagedProcess getManagedProcess(DuccId id) {
+    synchronized (monitor) {
+      for (ManagedProcess candidate : deployedProcesses) {
+        if (candidate.getDuccProcess().getDuccId().equals(id)) { return candidate; }
+      }
+      return null;
+    }
+  }
   /**
    * Reconciles agent inventory with job processes sent by PM
    *
@@ -1331,34 +1361,18 @@
                     ReasonForStoppingProcess.FailedInitialization.toString());
       	      deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
               deployedProcess.setStopping();
-/*
-            deployedProcess.kill();
+
             logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:"
                     + duccEvent.getPid() + " Killing Process");
-            try {
-               super.getContext().stopRoute(duccEvent.getPid());
-           } catch( Exception e) {
-        	   logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid());
-           }
-            logger.info(methodName, null,
-                    "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
-                            + duccEvent.getPid() + ". Process Failed Initialization");
-            undeployProcess(processEntry.getValue());
-            */
+            sigTermOnTimeoutSigKill(deployedProcess);
+            		//getFutureResult(deployedProcess.getDuccProcess()));
           } else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) {
             deployedProcess.getDuccProcess().setReasonForStoppingProcess(
                     ReasonForStoppingProcess.InitializationTimeout.toString());
       	    deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
-            //deployedProcess.setStopping();
-
-            // Mark process for death. Doesnt actually kill the process
-
-            deployedProcess.kill();
             logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:"
                     + duccEvent.getPid() + " Killing Process");
-
-            undeployProcess(processEntry.getValue());
-
+            sigTermOnTimeoutSigKill(deployedProcess);
           }
           else if (duccEvent.getState().equals(ProcessState.Stopping)) {
         	  if ( duccEvent.getMessage() != null && duccEvent.getMessage().equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
@@ -1392,6 +1406,99 @@
     }
   }
 
+	/**
+	 * Stops a process without blocking the caller: on a cleanupExecutor thread it
+	 * sends SIGTERM, waits up to the configured stop timeout for the process
+	 * Future to complete and sends SIGKILL if the process is still standing.
+	 * @param process process to stop; null (e.g. a failed lookup) is logged and ignored
+	 */
+	private void sigTermOnTimeoutSigKill(final ManagedProcess process) {
+		final String methodName = "sigTermOnTimeoutSigKill";
+		if (process == null) {
+			logger.info(methodName, null, ">>>>> No managed process to stop. Ignoring request");
+			return;
+		}
+		cleanupExecutor.execute(new Runnable() {
+			public void run() {
+				sigTerm(process.getPid(), process.getOwner());
+				long timeout = 60000; // default wait in millis for the process to die
+				try {
+					timeout = Integer.valueOf(configurationFactory.processStopTimeout);
+				} catch (NumberFormatException ignored) { /* missing or malformed value: keep default */ }
+				boolean interrupted = false;
+				Future<?> future = process.getFuture();
+				if (future == null) {
+					// not expected: every running process should have a Future
+					logger.info(methodName, null, ">>>>> Unexpected process state. We expected Future object but none is available. This should not happen");
+				} else {
+					try {
+						future.get(timeout, TimeUnit.MILLISECONDS); // blocks until exit or timeout
+						return; // process terminated after SIGTERM
+					} catch (InterruptedException ie) {
+						interrupted = true; // restore the flag only after SIGKILL has been sent
+					} catch (TimeoutException | ExecutionException te) {
+						// timed out or the wait failed: take the process down hard
+					}
+				}
+				sigKill(process.getPid(), process.getOwner());
+				if (interrupted) { Thread.currentThread().interrupt(); }
+			}
+		});
+	}
+  /** Asks the process identified by pid to terminate by sending it SIGTERM;
+   *  user is passed to sendSignal as the process owner. */
+  private void sigTerm(String pid, String user) { sendSignal(pid, user, SIGNAL.SIGTERM); }
+
+  /** Takes the process identified by pid down hard by sending it SIGKILL;
+   *  user is passed to sendSignal as the process owner. */
+  private void sigKill(String pid, String user) { sendSignal(pid, user, SIGNAL.SIGKILL); }
+  /**
+   * Tells whether commands should be launched through ducc_ling.
+   *
+   * @return true only when system property ducc.agent.launcher.use.ducc_spawn
+   *         is set to "true" (compared case-insensitively)
+   */
+  private boolean useDuccling() {
+    return "true".equalsIgnoreCase(System.getProperty("ducc.agent.launcher.use.ducc_spawn"));
+  }
+
+  /**
+   * Runs /bin/kill with the given signal against process pid, prefixed with
+   * "ducc_ling -u owner --" when useDuccling() is true. The command and its
+   * output are logged; failures are logged and never propagated.
+   */
+  private void sendSignal(String pid, String owner, SIGNAL signal ) {
+	String methodName = "sendSignal";
+	try {
+		// one argv element per token: ProcessBuilder passes elements verbatim, so blank or padded ones confuse kill
+		String[] cmdLine = { "/bin/kill", signal.get(), pid };
+		if (useDuccling()) {
+			String duccling_path = Utils.resolvePlaceholderIfExists(
+					System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+					System.getProperties());
+			String[] duccling_nolog = new String[] { duccling_path, "-u", owner, "--" };
+			cmdLine = (String[])ArrayUtils.combine(duccling_nolog,cmdLine);
+		}
+		StringBuilder sb = new StringBuilder();
+		for( String cmdPart : cmdLine) {
+			sb.append(cmdPart).append(" ");
+		}
+		logger.info(methodName, null, ">>> Stopping process with cmd "+sb.toString());
+		ProcessBuilder pb = new ProcessBuilder(cmdLine);
+		pb.redirectErrorStream(true); // log kill's diagnostics too and never block on stderr
+		Process process = pb.start();
+		// try-with-resources closes the reader and the underlying stream when done or on error
+		try (BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+			String line;
+			while ((line = br.readLine()) != null) {
+				logger.info(methodName, null, line);
+			}
+			process.waitFor();
+		}
+	} catch (Exception e) {
+		logger.error(methodName, null, e);
+	}
+  }
   /**
    * Deploys process using supplied command line
    *
@@ -1580,6 +1687,7 @@
     	  }
 
       }
+
   /**
    * Kills a given process
    *
@@ -2007,6 +2115,7 @@
 	    logger.info("stop", null, "Reaper thread finished - calling super.stop()");
 	  }
 */
+	    cleanupExecutor.shutdown();
 	    stopLock.wait(2000);
 	    super.stop();
 	    logger.info("stop", null, "Reaper thread finished - calling super.stop()");
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
index 6a2f7f5..4172592 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.Future;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.Route;
 import org.apache.uima.ducc.agent.NodeAgent;
 import org.apache.uima.ducc.agent.launcher.ManagedProcess;
 import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
@@ -349,11 +350,45 @@
 			   || 
 			   process.getProcessState().equals(ProcessState.Running);
 	}
+
+	/**
+	 * Stops and removes the Camel route registered under this process's PID, then
+	 * logs the ids of the remaining routes. No-op when PID or route is missing.
+	 */
+	private void stopRoute() throws Exception {
+		String methodName = "stopRoute";
+		String pid = process.getPID();
+		if (pid == null || agent.getContext().getRoute(pid) == null) { return; }
+		try {
+			agent.getContext().stopRoute(pid); // stop collecting process stats
+		} catch (Exception e) {
+			logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid + " Cause:" + e);
+		}
+		agent.getContext().removeRoute(pid); // always removed, else routes accumulate (memory leak)
+		logger.info(methodName, null, "Removed Camel Route from Context for PID:" + pid);
+		StringBuilder sb = new StringBuilder("\n");
+		for (Route route : agent.getContext().getRoutes()) {
+			sb.append("Camel Context - RouteId:").append(route.getId()).append("\n");
+		}
+		logger.info(methodName, null, sb.toString());
+	}
 	public void process(Exchange e) {
 		// if process is stopping or already dead dont collect metrics. The
 		// Camel route has just been stopped.
 		if (closed || !processIsActive()) {
  		    logger.info("LinuxProcessMetricsProcessor.process",	null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning");	
+ 		    Thread t = new Thread( 
+ 		    		 new Runnable() {
+ 		    		      public void run() {
+ 		    		 		  try {
+ 		    		 			stopRoute();
+ 		    		 		  } catch( Exception ex) {
+ 		    		 			logger.error("process", null, ex);  
+ 		    		 		  }
+ 		    		    	  
+ 		    		      }
+ 		    		});
+ 		    t.start();
  		    return;
 		}
 		try {