blob: 564226f3c98245bca8045da1f82393307a17141a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.launcher;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public abstract class CommandExecutor implements Callable<Process> {
protected Process managedProcess = null;
protected String host;
protected String ip;
protected ICommandLine cmdLine;
protected NodeAgent agent;
public abstract void stop() throws Exception;
public abstract Process exec(ICommandLine commandLine,
Map<String, String> processEnv) throws Exception;
public CommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host,
String ip, Process managedProcess) throws Exception {
this.agent = agent;
this.host = host;
this.ip = ip;
this.managedProcess = managedProcess;
this.cmdLine = cmdLine;
}
/**
* Called after process is launched. Assign PID, state and finally drain
* process streams.
*
* @param process
* - launched process
*/
protected void postExecStep(java.lang.Process process, DuccLogger logger,
boolean isKillCmd) {
String methodName="postExecStep";
if (!isKillCmd) {
int pid = Utils.getPID(process);
if (pid != -1) {
((ManagedProcess) managedProcess).setPid(String.valueOf(pid));
boolean isAPorJD = ((ManagedProcess) managedProcess).isJd() ||
((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop);
// JDs and APs dond't report internal status to the agent (initializing or running) so assume these start and enter Running state
if (isAPorJD && !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Running);
}
logger.info(methodName,null,
">>>>>>>>> PID:"+String.valueOf(pid)+" Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
try {
synchronized(this) {
// wait for 5 seconds before starting the camel route
// responsible for collecting process related stats. Allow
// enough time for the process to start.
wait(5000);
}
RouteBuilder rb = agent.new ProcessMemoryUsageRoute(agent,
((ManagedProcess) managedProcess).getDuccProcess(),(ManagedProcess) managedProcess);
agent.getContext().addRoutes(rb);
agent.getContext().startRoute(String.valueOf(pid));
logger.info(methodName,null,
"Started Process Metric Gathering Thread For PID:"+String.valueOf(pid));
StringBuffer sb = new StringBuffer();
for ( Route route : agent.getContext().getRoutes() ) {
sb.append("Camel Context - RouteId:"+route.getId()+"\n");
}
logger.info(methodName, null,sb.toString());
logger.info(methodName, null,"Started Process Metric Gathering Thread For PID:"+String.valueOf(pid));
} catch( Exception e) {
logger.error("postExecStep", null, e);
}
}
}
// Drain process streams in dedicated threads.
((ManagedProcess) managedProcess).drainProcessStreams(process, logger,System.out, isKillCmd);
}
/**
* Called by Executor to exec a process.
*/
public Process call() throws Exception {
Process deployedProcess = null;
try {
// ICommandLine commandLine = ((ManagedProcess)
// managedProcess).getCommandLine();
Map<String, String> env = new HashMap<String, String>();
if (!isKillCommand(cmdLine)) {
// UIMA-4935 Moved setting of JpUniqueId to DuccComamndExecutor where the cmdLine is not shared
// Enrich environment for the new process. Via these settings
// the UIMA AS
// service wrapper can notify the agent of its state.
env.put(IDuccUser.EnvironmentVariable.DUCC_IP.value(), ip);
env.put(IDuccUser.EnvironmentVariable.DUCC_NODENAME.value(), host);
// Add "friendly" process name for coordination with JD and OR
env.put(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value(), ((ManagedProcess) managedProcess)
.getDuccId().getFriendly()+"");
if (((ManagedProcess) managedProcess).getDuccProcess()
.getProcessType()
.equals(ProcessType.Job_Uima_AS_Process)) {
IDuccStandardInfo processInfo = ((ManagedProcess) managedProcess)
.getProcessInfo();
long maxInitTime = 0;
if (processInfo != null) {
maxInitTime = processInfo
.getProcessInitializationTimeMax();
}
agent.getLogger().info("CommandExecutor.call",
((ManagedProcess) managedProcess).getWorkDuccId(),
"Starting Process Initialization Monitor with Max Process Initialization Time:" + maxInitTime);
((ManagedProcess) managedProcess)
.startInitializationTimer(maxInitTime);
}
}
deployedProcess = exec(cmdLine, env);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (((ManagedProcess) managedProcess).getDuccProcess()
.getProcessType().equals(ProcessType.Job_Uima_AS_Process)) {
((ManagedProcess) managedProcess).stopInitializationTimer();
}
}
return deployedProcess;
}
protected boolean isKillCommand(ICommandLine cmdLine) {
return (cmdLine.getExecutable() != null && (cmdLine.getExecutable()
.startsWith("/bin/kill") || cmdLine.getExecutable().startsWith(
"taskkill")));
}
}