blob: de569c23baa445e1fedbd7126c1df5cb324f5b77 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.launcher;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public abstract class CommandExecutor implements Callable<Process> {
protected Process managedProcess = null;
protected String host;
protected String ip;
protected ICommandLine cmdLine;
protected NodeAgent agent;
public abstract void stop() throws Exception;
public abstract Process exec(ICommandLine commandLine, Map<String, String> processEnv)
throws Exception;
public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip,
Process managedProcess) throws Exception {
this.agent = agent;
this.host = host;
this.ip = ip;
this.managedProcess = managedProcess;
this.cmdLine = cmdLine;
}
/**
* Called after process is launched. Assign PID, state and finally drain process streams.
*
* @param process
* - launched process
*/
protected void postExecStep(java.lang.Process process, DuccLogger logger, boolean isKillCmd) {
String methodName = "postExecStep";
if (!isKillCmd) {
int pid = Utils.getPID(process);
if (pid != -1) {
((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
((ManagedProcess) managedProcess).getDuccProcess().setPID(String.valueOf(pid));
/*
* boolean isAPorJD = ((ManagedProcess) managedProcess).isJd() || ((ManagedProcess)
* managedProcess).getDuccProcess() .getProcessType().equals(ProcessType.Pop);
*/
// JDs and APs dond't report internal status to the agent
// (initializing or running) so assume these start and enter
// Running state
/*
* if (isAPorJD && !((ManagedProcess) managedProcess).getDuccProcess()
* .getProcessState().equals(ProcessState.Stopped)) { ((ManagedProcess)
* managedProcess).getDuccProcess() .setProcessState(ProcessState.Running); }
*/
if (!((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopped)
|| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopping)
|| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Failed)
|| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.FailedInitialization)) {
((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Started);
}
((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
((ManagedProcess) managedProcess).getDuccProcess().setPID(String.valueOf(pid));
try {
synchronized (this) {
// wait for 5 seconds before starting the camel route
// responsible for collecting process related stats.
// Allow
// enough time for the process to start.
wait(5000);
}
logger.info(methodName, null, ">>>>>>>>> PID:" + String.valueOf(pid) + " Process State:"
+ ((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
if (!((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopped)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopping)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Failed)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.FailedInitialization)) {
RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
((ManagedProcess) managedProcess).getDuccProcess(),
(ManagedProcess) managedProcess);
agent.getContext().addRoutes(rb);
agent.getContext().startRoute(String.valueOf(pid));
logger.info(methodName, null,
"Started Process Metric Gathering Thread For PID:" + String.valueOf(pid));
StringBuffer sb = new StringBuffer();
for (Route route : agent.getContext().getRoutes()) {
sb.append("Camel Context - RouteId:" + route.getId() + "\n");
}
logger.info(methodName, null, sb.toString());
logger.info(methodName, null,
"Started Process Metric Gathering Thread For PID:" + String.valueOf(pid));
}
/*
* RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent, ((ManagedProcess)
* managedProcess).getDuccProcess(), (ManagedProcess) managedProcess);
* agent.getContext().addRoutes(rb); agent.getContext().startRoute(String.valueOf(pid));
* logger.info(methodName, null, "Started Process Metric Gathering Thread For PID:" +
* String.valueOf(pid));
*
* StringBuffer sb = new StringBuffer(); for (Route route :
* agent.getContext().getRoutes()) { sb.append("Camel Context - RouteId:" + route.getId()
* + "\n"); } logger.info(methodName, null, sb.toString());
*
* logger.info(methodName, null, "Started Process Metric Gathering Thread For PID:" +
* String.valueOf(pid));
*/
} catch (Exception e) {
logger.error("postExecStep", null, e);
}
}
}
// Drain process streams in dedicated threads.
((ManagedProcess) managedProcess).drainProcessStreams(process, logger, System.out, isKillCmd);
}
/**
* Called by Executor to exec a process.
*/
public Process call() throws Exception {
Process deployedProcess = null;
try {
// ICommandLine commandLine = ((ManagedProcess)
// managedProcess).getCommandLine();
Map<String, String> env = new HashMap<String, String>();
if (!isKillCommand(cmdLine)) {
// UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not
// shared
// Enrich environment for the new process. Via these settings
// the UIMA AS
// service wrapper can notify the agent of its state.
env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
// Add "friendly" process name for coordination with JD and OR
env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(),
((ManagedProcess) managedProcess).getDuccId().getFriendly() + "");
// Add unique process id. The process will send this along with its state update
env.put(IDuccUser.EnvironmentVariable.DUCC_PROCESS_UNIQUEID.value(),
((ManagedProcess) managedProcess).getDuccId().getUnique());
if (((ManagedProcess) managedProcess).getDuccProcess().getProcessType()
.equals(ProcessType.Job_Uima_AS_Process)) {
IDuccStandardInfo processInfo = ((ManagedProcess) managedProcess).getProcessInfo();
long maxInitTime = 0;
if (processInfo != null) {
maxInitTime = processInfo.getProcessInitializationTimeMax();
}
agent.getLogger().info("CommandExecutor.call",
((ManagedProcess) managedProcess).getWorkDuccId(),
"Starting Process Initialization Monitor with Max Process Initialization Time:"
+ maxInitTime);
((ManagedProcess) managedProcess).startInitializationTimer(maxInitTime);
}
}
deployedProcess = exec(cmdLine, env);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (((ManagedProcess) managedProcess).getDuccProcess().getProcessType()
.equals(ProcessType.Job_Uima_AS_Process)) {
((ManagedProcess) managedProcess).stopInitializationTimer();
}
}
return deployedProcess;
}
protected boolean isKillCommand(ICommandLine cmdLine) {
return (cmdLine.getExecutable() != null && (cmdLine.getExecutable().startsWith("/bin/kill")
|| cmdLine.getExecutable().startsWith("taskkill")));
}
}