UIMA-5891 Modified to force service termination on init failure. Also stop the metrics route.
git-svn-id: https://svn.apache.org/repos/asf/uima/uima-ducc/branches/uima-ducc-2.2.2-Patched@1843914 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
index 51c7eaa..39a741c 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/NodeAgent.java
@@ -36,10 +36,15 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
@@ -97,6 +102,7 @@
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
+import org.apache.uima.internal.util.ArrayUtils;
public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver {
public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);
@@ -193,6 +199,9 @@
private String stateChangeEndpoint;
+ private ExecutorService cleanupExecutor = Executors.newCachedThreadPool();
+
+
public void setStateChangeEndpoint(String stateChangeEndpoint) {
this.stateChangeEndpoint = stateChangeEndpoint;
}
@@ -700,6 +709,7 @@
public void takeDownProcessWithNoJob(ProcessLifecycleController lifecycleController,
List<IDuccJobDeployment> jobDeploymentList) {
String methodName = "takeDownProcessWithNoJob";
+ ScheduledExecutorService scheduledExecutorService = null;
try {
inventorySemaphore.acquire();
List<IDuccProcess> purgeList = new ArrayList<IDuccProcess>();
@@ -746,7 +756,12 @@
+ processEntry.getValue().getPID());
processEntry.getValue().setReasonForStoppingProcess(
ReasonForStoppingProcess.JPHasNoActiveJob.toString());
- lifecycleController.stopProcess(processEntry.getValue());
+ //lifecycleController.stopProcess(processEntry.getValue());
+
+ // send SIGTERM to the ghost process. If process still standing
+ // after timeout, send SIGKILL
+ sigTermOnTimeoutSigKill(getManagedProcess(processEntry.getValue().getDuccId()));
+
} else {
// add process to purge list
purgeList.add(processEntry.getValue());
@@ -765,9 +780,24 @@
} finally {
inventorySemaphore.release();
+ if ( scheduledExecutorService != null ) {
+ scheduledExecutorService.shutdownNow();
+ }
}
}
-
+ private ManagedProcess getManagedProcess(DuccId id) {
+   // Scan the deployed processes while holding the monitor and hand
+   // back the first one whose DuccProcess carries the requested id
+   synchronized (monitor) {
+     for (ManagedProcess candidate : deployedProcesses) {
+       DuccId candidateId = candidate.getDuccProcess().getDuccId();
+       if (candidateId.equals(id)) {
+         return candidate;
+       }
+     }
+   }
+   return null; // no deployed process is associated with the given id
+ }
/**
* Reconciles agent inventory with job processes sent by PM
*
@@ -1331,34 +1361,18 @@
ReasonForStoppingProcess.FailedInitialization.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
-/*
- deployedProcess.kill();
+
logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:"
+ duccEvent.getPid() + " Killing Process");
- try {
- super.getContext().stopRoute(duccEvent.getPid());
- } catch( Exception e) {
- logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid());
- }
- logger.info(methodName, null,
- "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
- + duccEvent.getPid() + ". Process Failed Initialization");
- undeployProcess(processEntry.getValue());
- */
+ sigTermOnTimeoutSigKill(deployedProcess);
+ //getFutureResult(deployedProcess.getDuccProcess()));
} else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) {
deployedProcess.getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.InitializationTimeout.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
- //deployedProcess.setStopping();
-
- // Mark process for death. Doesnt actually kill the process
-
- deployedProcess.kill();
logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:"
+ duccEvent.getPid() + " Killing Process");
-
- undeployProcess(processEntry.getValue());
-
+ sigTermOnTimeoutSigKill(deployedProcess);
}
else if (duccEvent.getState().equals(ProcessState.Stopping)) {
if ( duccEvent.getMessage() != null && duccEvent.getMessage().equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
@@ -1392,6 +1406,99 @@
}
}
+ /** Submits a task to cleanupExecutor that sends SIGTERM to the process, waits up to the stop timeout on its Future, then sends SIGKILL if needed. */
+ private void sigTermOnTimeoutSigKill( ManagedProcess process) {
+   String methodName = "sigTermOnTimeoutSigKill";
+   if (process == null) { // e.g. getManagedProcess() found no match; nothing to signal
+     logger.info(methodName, null, ">>>>> No ManagedProcess to stop - skipping SIGTERM/SIGKILL");
+     return;
+   }
+   Runnable processCleanupTask = new Runnable() {
+     public void run() {
+       // Ask the process to stop first; escalate to SIGKILL below only if it is still standing
+       sigTerm(process.getPid(), process.getOwner());
+       long timeout = 60000; // how long to wait for the process to die (default, in millis)
+       try {
+         timeout = Integer.valueOf(configurationFactory.processStopTimeout);
+       } catch (NumberFormatException ee) {
+         // use default timeout
+       }
+       Future<?> future = process.getFuture();
+       if (future != null) {
+         try {
+           // this blocks until a process terminates or there is a timeout
+           future.get(timeout, TimeUnit.MILLISECONDS);
+         } catch (InterruptedException ie) {
+           sigKill(process.getPid(), process.getOwner());
+           Thread.currentThread().interrupt(); // restore the flag only after the kill was issued
+         } catch (TimeoutException | ExecutionException te) {
+           // Process is still standing after SIGTERM so take it down hard
+           sigKill(process.getPid(), process.getOwner());
+         }
+       } else {
+         // should not happen, every running process has a future result object; just kill the process
+         logger.info(methodName, null,
+                 ">>>>> Unexpected process state. We expected Future object but none is available. This should not happen");
+         sigKill(process.getPid(), process.getOwner());
+       }
+     }
+   };
+   cleanupExecutor.execute(processCleanupTask);
+ }
+ private void sigTerm(String pid, String user) { // delegates to sendSignal with SIGNAL.SIGTERM; user is passed as the owner
+ sendSignal(pid, user, SIGNAL.SIGTERM);
+ }
+
+ private void sigKill(String pid, String user) { // delegates to sendSignal with SIGNAL.SIGKILL; user is passed as the owner
+ sendSignal(pid, user, SIGNAL.SIGKILL);
+ }
+ private boolean useDuccling() {
+   // The process is spawned via ducc_ling only when the launcher
+   // property below is set to "true" (compared ignoring case); an
+   // unset property or any other value means ducc_ling is not used
+   final String spawnFlag =
+           System.getProperty("ducc.agent.launcher.use.ducc_spawn");
+   // Boolean.parseBoolean is null-safe and case-insensitive
+   return Boolean.parseBoolean(spawnFlag);
+ }
+
+ /** Runs /bin/kill with the given signal against pid, prefixed with ducc_ling "-u owner --" when useDuccling() is true; errors are logged, not thrown. */
+ private void sendSignal(String pid, String owner, SIGNAL signal ) {
+   String methodName = "sendSignal";
+   try {
+     String duccling_path = Utils.resolvePlaceholderIfExists(
+             System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
+             System.getProperties());
+     // Each element is passed to the child as one argument, so no padding blanks belong here
+     String[] cmdLine = { "/bin/kill", signal.get(), pid };
+     // Prefix that hands the command to ducc_ling together with the process owner
+     String[] duccling_nolog = new String[] { duccling_path, "-u", owner, "--" };
+     if (useDuccling()) {
+       cmdLine = (String[])ArrayUtils.combine(duccling_nolog,cmdLine);
+     }
+     StringBuilder sb = new StringBuilder();
+     for( String cmdPart : cmdLine) {
+       sb.append(cmdPart).append(" ");
+     }
+     logger.info(methodName, null, ">>> Stopping process with cmd "+sb.toString());
+     // Merge stderr into stdout so error output of the command is logged by the loop below
+     ProcessBuilder pb = new ProcessBuilder(cmdLine).redirectErrorStream(true);
+     Process process = pb.start();
+     // try-with-resources. Closes InputStream when done or error
+     try (InputStream is = process.getInputStream() ) {
+       InputStreamReader streamReader = new InputStreamReader(is);
+       BufferedReader br = new BufferedReader(streamReader);
+       String line;
+       while ((line = br.readLine()) != null) {
+         logger.info(methodName, null, line);
+       }
+       process.waitFor();
+     }
+   } catch (Exception e) {
+     if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // keep interrupt status
+     logger.error(methodName, null, e);
+   }
+ }
/**
* Deploys process using supplied command line
*
@@ -1580,6 +1687,7 @@
}
}
+
/**
* Kills a given process
*
@@ -2007,6 +2115,7 @@
logger.info("stop", null, "Reaper thread finished - calling super.stop()");
}
*/
+ cleanupExecutor.shutdown();
stopLock.wait(2000);
super.stop();
logger.info("stop", null, "Reaper thread finished - calling super.stop()");
diff --git a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
index 6a2f7f5..4172592 100644
--- a/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
+++ b/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/LinuxProcessMetricsProcessor.java
@@ -24,6 +24,7 @@
import java.util.concurrent.Future;
import org.apache.camel.Exchange;
+import org.apache.camel.Route;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
@@ -349,11 +350,45 @@
||
process.getProcessState().equals(ProcessState.Running);
}
+
+ /** Stops and removes the Camel route whose id is this process's PID, if the agent's context has one, then logs the remaining route ids. */
+ private void stopRoute() throws Exception {
+   String methodName = "stopRoute";
+   final String pid = process.getPID();
+   if (pid != null && agent.getContext().getRoute(pid) != null) {
+     try {
+       agent.getContext().stopRoute(pid); // stop collecting process stats
+     } catch (Exception e) {
+       // keep the cause in the log; removal of the route is still attempted below
+       logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid + " cause:" + e);
+     }
+     // remove route from context, otherwise the routes accumulate over time causing memory leak
+     agent.getContext().removeRoute(pid);
+     logger.info(methodName, null, "Removed Camel Route from Context for PID:" + pid);
+     StringBuilder sb = new StringBuilder("\n");
+     for (Route route : agent.getContext().getRoutes()) {
+       sb.append("Camel Context - RouteId:").append(route.getId()).append("\n");
+     }
+     logger.info(methodName, null, sb.toString());
+   }
+ }
public void process(Exchange e) {
// if process is stopping or already dead dont collect metrics. The
// Camel route has just been stopped.
if (closed || !processIsActive()) {
logger.info("LinuxProcessMetricsProcessor.process", null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning");
+ Thread t = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ stopRoute();
+ } catch( Exception ex) {
+ logger.error("process", null, ex);
+ }
+
+ }
+ });
+ t.start();
return;
}
try {