blob: e89236358f7f50e607c37b281e883f048e4d04aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.launcher;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.container.FlagsHelper;
import org.apache.uima.ducc.common.exception.DuccRuntimeException;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.TimeStamp;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
import org.apache.uima.ducc.transport.cmdline.ACommandLine;
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
import org.apache.uima.ducc.user.common.PrivateClassLoader;
public class DuccCommandExecutor extends CommandExecutor {
DuccLogger logger = DuccLogger.getLogger(this.getClass(), NodeAgent.COMPONENT_NAME);
@SuppressWarnings("unused")
private static AtomicInteger nextPort = new AtomicInteger(30000);
public DuccCommandExecutor(NodeAgent agent, ICommandLine cmdLine, String host, String ip,
Process managedProcess) throws Exception {
super(agent, cmdLine, host, ip, managedProcess);
}
public DuccCommandExecutor(ICommandLine cmdLine, String host, String ip, Process managedProcess)
throws Exception {
super(null, cmdLine, host, ip, managedProcess);
}
private boolean useDuccSpawn() {
if (super.managedProcess.isAgentProcess() || Utils.isWindows()) {
return false;
}
// On non-windows check if we should spawn the process via ducc_ling
String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
return true;
}
// default
return false;
}
private boolean createCGroupContainer(IDuccProcess duccProcess, String containerId, String owner)
throws Exception {
// create cgroups container and assign limits
if (agent.cgroupsManager.createContainer(containerId, System.getProperty("user.name"),
agent.cgroupsManager.getUserGroupName(System.getProperty("user.name")),
useDuccSpawn())) {
logger.info("createCGroupContainer", null,
"Calculating CPU shares \nProcess Max Memory="
+ duccProcess.getCGroup().getMaxMemoryLimit() + "\nNode Memory Total="
+ agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal());
long cpuShares = duccProcess.getCGroup().getMaxMemoryLimit()
/ agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal();
logger.info("createCGroupContainer", null, "\nCalculated Shares=" + cpuShares);
agent.cgroupsManager.setContainerCpuShares(containerId, owner, useDuccSpawn(), cpuShares);
long swappiness = 10; // default
if (agent.configurationFactory.nodeSwappiness != null) {
swappiness = Long.valueOf(agent.configurationFactory.nodeSwappiness);
}
agent.cgroupsManager.setContainerSwappiness(containerId, owner, useDuccSpawn(), swappiness);
return agent.cgroupsManager.setContainerMaxMemoryLimit(containerId, owner, useDuccSpawn(),
duccProcess.getCGroup().getMaxMemoryLimit());
}
return false;
}
private String getContainerId() {
String containerId;
if (((ManagedProcess) super.managedProcess).getDuccProcess().getProcessType()
.equals(ProcessType.Service)) {
containerId = String
.valueOf(((ManagedProcess) managedProcess).getDuccProcess().getCGroup().getId());
} else {
containerId = ((ManagedProcess) managedProcess).getWorkDuccId().getFriendly() + "."
+ ((ManagedProcess) managedProcess).getDuccProcess().getCGroup().getId();
}
return containerId;
}
public Process exec(ICommandLine cmdLine, Map<String, String> processEnv) throws Exception {
String methodName = "exec";
try {
String[] cmd = getDeployableCommandLine(cmdLine, processEnv);
if (isKillCommand(cmdLine)) {
logger.debug(methodName, null, "Killing process");
stopProcess(cmdLine, cmd);
} else {
IDuccProcess duccProcess = ((ManagedProcess) managedProcess).getDuccProcess();
// If running a real agent on a node, collect swap info and
// assign max swap usage threshold
// for each process. In virtual mode, where there are multiple
// agents per node, we dont
// set nor enforce swap limits.
if (!agent.isVirtual()) {
// Calculate how much swap space the process is allowed to
// use. The calculation is based on
// the percentage of real memory the process is assigned.
// The process is entitled the
// same percentage of the swap.
// Normalize node's total memory as it is expressed in KB.
// The calculation below is based on bytes.
double percentOfTotal = ((double) duccProcess.getCGroup().getMaxMemoryLimit())
/ (agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() * 1024); // need
// bytes
// substract 1Gig from total swap on this node to
// accommodate OS needs for swapping. The
// getSwapTotal() returns swap space in KBs so normalize
// 1Gig
long adjustedTotalSwapAvailable = agent.getNodeInfo().getNodeMetrics().getNodeMemory()
.getSwapTotal() - 1048576;
// calculate the portion (in bytes) of swap this process is
// entitled to
long maxProcessSwapUsage = (long) (adjustedTotalSwapAvailable * percentOfTotal) * 1024;
// assigned how much swap this process is entitled to. If it
// exceeds this number the Agent
// will kill the process.
((ManagedProcess) managedProcess).setMaxSwapThreshold(maxProcessSwapUsage);
logger.info(methodName, null, "---Process DuccId:" + duccProcess.getDuccId()
+ " CGroup.getMaxMemoryLimit():"
+ ((duccProcess.getCGroup().getMaxMemoryLimit() / 1024) / 1024) + " MBs"
+ " Node Memory Total:"
+ (agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() / 1024)
+ " MBs" + " Percentage Of Real Memory:" + percentOfTotal
+ " Adjusted Total Swap Available On Node:" + adjustedTotalSwapAvailable / 1024
+ " MBs" + " Process Entitled To Max:" + (maxProcessSwapUsage / 1024) / 1024
+ " MBs of Swap");
// logger.info(methodName, null,
// "The Process With ID:"+duccProcess.getDuccId()+" is Entitled to the Max "+(
// (maxProcessSwapUsage/1024)/1024)+" Megs of Swap Space");
// if configured to use cgroups and the process is the
// cgroup owner, create a cgroup
// using Process DuccId as a name. Additional processes may
// be injected into the
// cgroup by declaring cgroup owner id.
if (agent.useCgroups) {
// JDs are of type Pop (Plain Old Process). JDs run in a
// reservation. The cgroup container
// is created for the reservation and we co-locate as
// many JDs as we can fit in it.
// String containerId = ((ManagedProcess)
// managedProcess).getWorkDuccId()+"."+duccProcess.getCGroup().getId().getFriendly();
String containerId = getContainerId();
logger.info(methodName, null, "Checking for CGroup Existance with ID:" + containerId);
if (!agent.cgroupsManager.cgroupExists(
agent.cgroupsManager.getDuccCGroupBaseDir() + "/" + containerId)) {
logger.info(methodName, null, "No CGroup with ID:" + containerId + " Found");
boolean failed = false;
// create cgroup container for JDs
try {
if (createCGroupContainer(duccProcess, containerId,
((ManagedProcess) super.managedProcess).getOwner())) {
logger.info(
methodName, null, "Created CGroup with ID:" + containerId
+ " With Memory Limit=" + ((ManagedProcess) super.managedProcess)
.getDuccProcess().getCGroup().getMaxMemoryLimit()
+ " Bytes");
} else {
logger.info(methodName, null, "Failed To Create CGroup with ID:" + containerId);
duccProcess.setProcessState(ProcessState.Failed);
duccProcess.setReasonForStoppingProcess("CGroupCreationFailed");
failed = true;
// agent.stop();
}
} catch (Exception e) {
logger.error(methodName, null, e);
failed = true;
duccProcess.setProcessState(ProcessState.Failed);
duccProcess.setReasonForStoppingProcess("CGroupCreationFailed");
// agent.stop();
}
if (failed) {
logger.error(methodName, null,
new RuntimeException(
"The Agent is Unable To Create A CGroup with Container ID: "
+ containerId + ". Rejecting Deployment of Process with ID:"
+ duccProcess.getDuccId()));
return managedProcess;
}
} else {
logger.info(methodName, null, "CGroup Exists with ID:" + containerId);
}
String[] cgroupCmd = new String[cmd.length + 3];
cgroupCmd[0] = agent.cgroupsManager.getCGroupsUtilsDir() + "/cgexec";
cgroupCmd[1] = "-g";
cgroupCmd[2] = agent.cgroupsManager.getSubsystems() + containerId; // UIMA-5405
// subsystems
// includes the
// "ducc" id
int inx = 3;
for (String cmdPart : cmd) {
cgroupCmd[inx++] = cmdPart;
}
startProcess(cmdLine, cgroupCmd, processEnv);
} else {
// Not configured to use CGroups
startProcess(cmdLine, cmd, processEnv);
}
} else {
// dont use CGroups on virtual agents
startProcess(cmdLine, cmd, processEnv);
}
}
return managedProcess;
} catch (Throwable e) {
if (((ManagedProcess) super.managedProcess).getDuccProcess() != null) {
DuccId duccId = ((ManagedProcess) super.managedProcess).getDuccId();
logger.error(methodName, duccId,
((ManagedProcess) super.managedProcess).getDuccProcess().getDuccId(), e,
new Object[] {});
}
logger.error(methodName, null, e);
throw e;
}
}
private boolean processInRunningOrInitializingState() {
return (((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Running)
|| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Initializing)
|| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Starting)
|| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopping)
|| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Started));
}
private void stopProcess(ICommandLine cmdLine, String[] cmd) throws Exception {
String methodName = "stopProcess";
if (processInRunningOrInitializingState()) {
Future<?> future = ((ManagedProcess) managedProcess).getFuture();
if (future == null) {
throw new Exception("Future Object not Found. Unable to Stop Process with PID:"
+ ((ManagedProcess) managedProcess).getPid());
}
// for stop to work, PID must be provided
if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null
|| ((ManagedProcess) managedProcess).getDuccProcess().getPID().trim().length() == 0) {
throw new Exception("Process Stop Command Failed. PID not provided.");
}
try {
// NEW Code
logger.debug(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
">>>>>>>>>>>>>>> Stopping Process:" + ((ManagedProcess) managedProcess).getPid());
ICommandLine cmdL;
if (Utils.isWindows()) {
cmdL = new NonJavaCommandLine("taskkill");
cmdL.addArgument("/PID");
} else {
cmdL = new NonJavaCommandLine("/bin/kill");
cmdL.addArgument("-15");
}
cmdL.addArgument(((ManagedProcess) managedProcess).getDuccProcess().getPID());
String[] sigTermCmdLine = getDeployableCommandLine(cmdL, new HashMap<String, String>());
doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine, true);
// if agent receives admin STOP request, all managed processes should
// be stopped without each waiting for 60 secs. The agent
// blasts SIGTERM in parallel to all running child processes
if (!StopPriority.DONT_WAIT.equals(((ManagedProcess) managedProcess).getStopPriority())) {
long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
if (super.agent.configurationFactory.processStopTimeout != null) {
maxTimeToWaitForProcessToStop = Long
.valueOf(super.agent.configurationFactory.processStopTimeout);
}
try {
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
"------------ Agent Starting Killer Timer Task For Process with PID:"
+ ((ManagedProcess) managedProcess).getDuccProcess().getPID()
+ " Process State: "
+ ((ManagedProcess) managedProcess).getDuccProcess().getProcessState());
future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
if (!((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Stopped)) {
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
"------------ Agent Timed-out Waiting for Process with PID:"
+ ((ManagedProcess) managedProcess).getDuccProcess().getPID()
+ " to Stop. Process State:"
+ ((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
+ " .Process did not stop in allotted time of "
+ maxTimeToWaitForProcessToStop + " millis");
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
">>>>>>>>>>>>>>> Killing Process:"
+ ((ManagedProcess) managedProcess).getDuccProcess().getPID()
+ " .Process State:" + ((ManagedProcess) managedProcess)
.getDuccProcess().getProcessState());
doExec(new ProcessBuilder(cmd), cmd, true);
}
}
}
} catch (Exception e) { // InterruptedException, ExecutionException
logger.error(methodName, ((ManagedProcess) super.managedProcess).getDuccId(), e,
new Object[] {});
}
}
}
private void startProcess(ICommandLine cmdLine, String[] cmd, Map<String, String> processEnv)
throws Exception {
String methodName = "startProcess";
ProcessBuilder pb = new ProcessBuilder(cmd);
Map<String, String> env = pb.environment();
// Dont enherit agent's environment
env.clear();
// enrich Process environment
env.putAll(processEnv);
if (cmdLine instanceof ACommandLine) {
// enrich Process environment with one from a given command line
env.putAll(((ACommandLine) cmdLine).getEnvironment());
}
if (logger.isTrace()) {
for (Entry<String, String> entry : env.entrySet()) {
String message = "key:" + entry.getKey() + " " + "value:" + entry.getValue();
logger.trace(methodName, ((ManagedProcess) super.managedProcess).getDuccId(), message);
}
}
try {
doExec(pb, cmd, isKillCommand(cmdLine));
} catch (Exception e) {
throw e;
} finally {
// millis = TimeStamp.getCurrentMillis();
// twr.setEnd(millis);
}
}
/**
* Checks if a given process is AP. The code checks if process type is POP and it is *not* JD
*
* @param process
* - process instance
* @return - true if AP, false otherwise
*/
private boolean isAP(ManagedProcess process) {
if (!process.isJd() && process.getDuccProcess().getProcessType().equals(ProcessType.Pop)) {
return true;
} else {
return false;
}
}
public void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception {
String methodName = "doExec";
int exitCode = 0;
boolean failed = false;
try {
StringBuilder sb = new StringBuilder(
(isKillCommand(cmdLine) ? "--->Killing Process " : "---> Launching Process:")
+ " Using command line:");
int inx = 0;
for (String cmdPart : cmd) {
sb.append("\n\t[").append(inx++).append("]")
.append(Utils.resolvePlaceholderIfExists(cmdPart, System.getProperties()));
// Not sure why place-holders are replaced just for this msg?
}
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(), sb.toString());
java.lang.Process process = pb.start();
// Drain process streams
postExecStep(process, logger, isKillCmd);
// block waiting for the process to terminate.
exitCode = process.waitFor();
if (!isKillCommand(cmdLine)) {
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
">>>>>>>>>>>>> Process with PID:"
+ ((ManagedProcess) super.managedProcess).getDuccProcess().getPID()
+ " Terminated. Exit Code:" + exitCode);
// Process is dead, determine if the cgroup container should be
// destroyed as well.
if (agent.useCgroups) {
String containerId = getContainerId();
String userId = ((ManagedProcess) super.managedProcess).getOwner();
// before destroying the container the code checks if there
// are processes still running in it. This could be true if
// user code launched child processes. If there are child
// processes still running, the code kills each one at a
// time and at the end the container is removed.
agent.cgroupsManager.destroyContainer(containerId, userId, NodeAgent.SIGKILL);
logger.info(methodName, null, "Removed CGroup Container with ID:" + containerId);
}
}
} catch (NullPointerException ex) {
((ManagedProcess) super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
StringBuffer sb = new StringBuffer();
sb.setLength(0);
sb.append(
"\n\tJava ProcessBuilder Failed to Launch Process due to NullPointerException. An Entry in the Command Array Must be Null. Look at Command Array Below:\n");
for (String cmdPart : cmd) {
if (cmdPart != null) {
sb.append("\n\t").append(cmdPart);
}
}
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(), sb.toString());
((ManagedProcess) super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
throw ex;
} catch (Exception ex) {
((ManagedProcess) super.managedProcess).getDuccProcess()
.setProcessState(ProcessState.LaunchFailed);
((ManagedProcess) super.managedProcess).getDuccProcess().setProcessExitCode(-1); // overwrite
// process
// exit code
// if stderr
// has a msg
StringWriter stackTraceBuffer = new StringWriter();
ex.printStackTrace(new PrintWriter(stackTraceBuffer));
((ManagedProcess) managedProcess).getDuccProcess()
.setReasonForStoppingProcess(stackTraceBuffer.toString());
failed = true;
logger.info(methodName, ((ManagedProcess) super.managedProcess).getDuccId(),
"Failed to launch Process - Reason:" + stackTraceBuffer.toString());
throw ex;
} finally {
if (isKillCmd) {
// the kill command process has been launched. Nothing else to do.
// We now wait for the process to die.
return;
}
if (!failed) {
// associate exit code
((ManagedProcess) managedProcess).getDuccProcess().setProcessExitCode(exitCode);
}
// Per team discussion on Aug 31 2011, the process is stopped by an
// agent when initialization
// times out or initialization failed. Both Initialization_Timeout
// and FailedIntialization imply
// that the process is stopped. If the process is AP and it exited
// it should be marked
// as Stopped. If the exit was due to Ducc kill mark reason as
// KilledByDucc otherwise we have
// no way of knowing why the process exited and in such case reason
// is Other.
if ((isAP((ManagedProcess) super.managedProcess))) {
// if failed to execute the command line, the process state is already set to Failed
if (!((ManagedProcess) super.managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.LaunchFailed)) {
((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped);
}
if (((ManagedProcess) super.managedProcess).doKill()) { // killed
// by
// agent/ducc
((ManagedProcess) managedProcess).getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
} else if (!failed) {
((ManagedProcess) managedProcess).getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.Other.toString());
}
} else if (!isKillCommand(cmdLine)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.InitializationTimeout)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.FailedInitialization)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Failed)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.LaunchFailed)
&& !((ManagedProcess) managedProcess).getDuccProcess().getProcessState()
.equals(ProcessState.Killed)) {
((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped);
}
}
}
private String[] getDeployableCommandLine(ICommandLine cmdLine, Map<String, String> processEnv)
throws Exception {
// String methodName = "getDeployableCommandLine";
String[] cmd = new String[0];
boolean uimaBasedAP = false;
try {
// lock using Agent single permit semaphore. The
// Utils.concatAllArrays()
// uses native call (for efficiency) which appears not thread safe.
NodeAgent.lock();
// Use ducc_ling (c code) as a launcher for the actual process. The
// ducc_ling
// allows the process to run as a specified user in order to write
// out logs in
// user's space as oppose to ducc space.
String c_launcher_path = Utils.resolvePlaceholderIfExists(
System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
// if the command line is kill, don't provide any logging info to
// the ducc_ling. Otherwise,
// ducc_ling creates and empty log for each time we are killing a
// process
if (isKillCommand(cmdLine)) {
// Duccling, with no logging, always run by ducc, no need for
// workingdir
String[] duccling_nolog = new String[] { c_launcher_path, "-u",
((ManagedProcess) super.managedProcess).getOwner(), "--" };
if (useDuccSpawn()) {
cmd = Utils.concatAllArrays(duccling_nolog, new String[] { cmdLine.getExecutable() },
cmdLine.getCommandLine());
} else {
cmd = Utils.concatAllArrays(new String[] { cmdLine.getExecutable() },
cmdLine.getCommandLine());
}
} else {
String processType = "-UIMA-";
// If Java then may run many JPs from the same cmdLine so make a local copy that we can
// modify
if (cmdLine instanceof JavaCommandLine) {
cmdLine = ((JavaCommandLine) cmdLine).copy();
}
logger.info("getDeployableCommandLine", ((ManagedProcess) super.managedProcess).getDuccId(),
"Deploying Process Type:" + ((ManagedProcess) super.managedProcess).getDuccProcess()
.getProcessType().name());
switch (((ManagedProcess) super.managedProcess).getDuccProcess().getProcessType()) {
case Pop:
// Both JD and POP arbitrary process are POPs. Assume this
// is an arbitrary process
processType = "-POP-";
if (cmdLine instanceof JavaCommandLine) {
for (String option : cmdLine.getOptions()) {
logger.info("getDeployableCommandLine",
((ManagedProcess) super.managedProcess).getDuccId(),
"POP CmdLine Option:" + option);
// Both services and JD have processType=POP.
// However, only the JD
// will have -Dducc.deploy.components option set.
if (option.startsWith("-Dducc.deploy.components=")) {
processType = "-JD-";
((ManagedProcess) super.managedProcess).setIsJD(); // mark this process as JD
// break;
} else if (option.startsWith("-Dducc.deploy.JpType=uima")) {
// this is an AP with JP-based "nature". Meaning its using
// a JP process configuration and its pull based model.
uimaBasedAP = true;
}
}
}
break;
case Service:
// processType = "-AP-";
break;
case Job_Uima_AS_Process:
processType = "-UIMA-";
boolean isDucc20JpProcess = false;
boolean isDucc20ServiceProcess = false;
// determine if we are launching Ducc2.0 or Ducc1.+ JP
List<String> options = cmdLine.getOptions();
for (String option : options) {
if (option.indexOf(FlagsHelper.Name.JpType.pname()) > -1) {
isDucc20JpProcess = true;
}
if (option.indexOf("ducc.deploy.components=service") > -1) {
isDucc20ServiceProcess = true;
}
// break;
if (option.trim().equals("-Dducc.deploy.JpType=uima")) {
// this is an AP with JP-based "nature". Meaning its using
// a JP process configuration and its pull based model.
uimaBasedAP = true;
} else if (option.trim().equals("-Dducc.deploy.JpType=uima-as")) {
((ManagedProcess) super.managedProcess).setUimaAs();
}
}
// Add main class and component type to the command line
if (isDucc20JpProcess) {
if (!isDucc20ServiceProcess) {
cmdLine.addOption("-Dducc.deploy.components=job-process");
}
// add port where an agent listens for process state update events
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value(),
System.getProperty("AGENT_AP_STATE_UPDATE_PORT"));
((JavaCommandLine) cmdLine)
.setClassName("org.apache.uima.ducc.user.common.main.DuccJobService");
// ((JavaCommandLine)cmdLine).setClassName("org.apache.uima.ducc.ps.service.main.ServiceWrapper");
} else {
((ManagedProcess) super.managedProcess).setUimaAs();
cmdLine.addOption("-Dducc.deploy.components=uima-as");
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value(),
System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort));
((JavaCommandLine) cmdLine)
.setClassName("org.apache.uima.ducc.common.main.DuccService");
}
break;
}
String processLogDir = ((ManagedProcess) super.managedProcess).getProcessInfo()
.getLogDirectory()
+ (((ManagedProcess) super.managedProcess).getProcessInfo().getLogDirectory()
.endsWith(File.separator) ? "" : File.separator)
+ ((ManagedProcess) super.managedProcess).getWorkDuccId() + File.separator;
String processLogFile = ((ManagedProcess) super.managedProcess).getWorkDuccId()
+ processType + host;
String workingDir = ((ManagedProcess) super.managedProcess).getProcessInfo()
.getWorkingDirectory();
if (workingDir == null) {
workingDir = "NONE";
}
// Duccling, with logging
String[] duccling = new String[] { c_launcher_path, "-f", processLogDir + processLogFile,
"-w", workingDir, "-u", ((ManagedProcess) super.managedProcess).getOwner(), "--" };
String executable = cmdLine.getExecutable();
// Check if user specified which java to use to launch the
// process. If not provided,
// use the same java that the agent is running with
if (executable == null || executable.trim().length() == 0) {
executable = System.getProperty("java.home") + File.separator + "bin" + File.separator
+ "java";
}
boolean jd = ((ManagedProcess) super.managedProcess).isJd();
if (cmdLine instanceof JavaCommandLine) {
String classpath = "";
if (jd) {
for (String option : ((JavaCommandLine) cmdLine).getOptions()) {
if (option.startsWith("-Dducc.deploy.UserClasspath")) {
classpath = option.split("=")[1];
break;
}
}
} else {
classpath = ((JavaCommandLine) cmdLine).getClasspath();
}
String duccHomePath = Utils.findDuccHome();
/*
* String[] jars = classpath.split(":"); URLClassLoader clsLoader = newClassLoader(jars);
* Class<?> cls = clsLoader.loadClass("org.apache.uima.impl.UimaVersion"); Method
* majorVersionMethod = cls.getMethod("getMajorVersion"); short majorVersion =
* (short)majorVersionMethod.invoke(null); if ( !duccHomePath.trim().endsWith("/") ) {
* duccHomePath = duccHomePath.concat("/"); } String workItemJarDir =
* duccHomePath+"lib/uima-ducc/workitem/uima-ducc-workitem-"; if ( majorVersion < 3 ) {
* classpath = workItemJarDir+"v2.jar:"+classpath; } else if ( majorVersion >= 3 ) {
* classpath = workItemJarDir+"v3.jar:"+classpath; } else { throw new
* DuccRuntimeException("Unknown version of UIMA - majorVersion="+majorVersion); }
*
* if ( jd ) { // JD uses classloader separation to run user specified jars.
* ((JavaCommandLine) cmdLine).replaceOption("-Dducc.deploy.UserClasspath", classpath); }
* else { ((JavaCommandLine) cmdLine).setClasspath(classpath); }
*/
cmdLine.addOption("-DDUCC_HOME=" + duccHomePath);
cmdLine.addOption(
"-Dducc.deploy.configuration=" + System.getProperty("ducc.deploy.configuration"));
// UIMA-4935 Following moved from CommandExecutor to avoid duplications in the shared
// cmdLine
cmdLine.addOption("-Dducc.deploy.JpUniqueId="
+ ((ManagedProcess) managedProcess).getDuccId().getUnique());
// NOTE - These are redundant since the information is also
// in the environment for both Java and non-Java processes
cmdLine.addOption("-Dducc.process.log.dir=" + processLogDir);
cmdLine.addOption("-Dducc.process.log.basename=" + processLogFile);
cmdLine.addOption(
"-Dducc.job.id=" + ((ManagedProcess) super.managedProcess).getWorkDuccId());
}
if (useDuccSpawn()) {
cmd = Utils.concatAllArrays(duccling, new String[] { executable },
cmdLine.getCommandLine());
} else {
cmd = Utils.concatAllArrays(new String[] { executable }, cmdLine.getCommandLine());
}
// add JobId and the log prefix to the env so additional
// similarly-named log files can be created
// Also put the state update port in the environment for all processes ... instead of just
// some as a system property
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_ID_JOB.value(), String
.valueOf(((ManagedProcess) super.managedProcess).getWorkDuccId().getFriendly()));
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_LOG_PREFIX.value(),
processLogDir + processLogFile);
for (String part : cmd) {
if (part.startsWith("-Dducc.deploy.JpType=uima")) {
// this is an AP with JP-based "nature". Meaning its using
// a JP process configuration and its pull based model.
uimaBasedAP = true;
break;
}
}
// Currently agent has two ports where it listens for process state updates. One is
// for JPs and the other is for APs and JDs. The latter use a simplified state update
// protocol
// which is String based. The JPs actually serialize a more complex state Object.
// There is a way to deploy an AP with a JP "nature". Meaning an AP whose internal
// components looks just like a JP. Such AP must report its state to the same
// agent port as a JP. Only non-uima based APs will report to the other port.
if (jd || (isAP((ManagedProcess) super.managedProcess) && !uimaBasedAP)) {
logger.info("getDeployableCommandLine",
((ManagedProcess) super.managedProcess).getDuccId(),
"Deploying Process Type:"
+ ((ManagedProcess) super.managedProcess).getDuccProcess()
.getProcessType().name()
+ " This process will report state update to AP specific socket listener running on port:"
+ System.getProperty("AGENT_AP_STATE_UPDATE_PORT"));
ITimeWindow twi = new TimeWindow();
((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi);
String millis = TimeStamp.getCurrentMillis();
twi.setStart(millis);
twi.setEnd(millis);
ITimeWindow twr = new TimeWindow();
((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr);
twr.setStart(millis);
processEnv.put(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value(),
System.getProperty("AGENT_AP_STATE_UPDATE_PORT"));
} else {
logger.info("getDeployableCommandLine",
((ManagedProcess) super.managedProcess).getDuccId(),
"Deploying Process Type:"
+ ((ManagedProcess) super.managedProcess).getDuccProcess()
.getProcessType().name()
+ " This process will report state update to Camel-Mina listener running on port:"
+ System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort));
// processEnv.put(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value(),
// System.getProperty(ProcessStateUpdate.ProcessStateUpdatePort));
}
}
// Replace the reserved DUCC variable with the architecture of this node (ppc64 or amd64 or
// ...)
// (could have been done in getCommandLine if that did return the full cmd line!)
String osArch = System.getProperty("os.arch");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < cmd.length; ++i) {
if (cmd[i].contains("${DUCC_OS_ARCH}")) {
cmd[i] = cmd[i].replace("${DUCC_OS_ARCH}", osArch);
}
sb.append(cmd[i]).append(" ");
}
logger.info("getDeployableCommandLine --------------- ", null, sb.toString());
return cmd;
} catch (Exception ex) {
((ManagedProcess) super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed);
throw ex;
} finally {
NodeAgent.unlock();
}
}
public URLClassLoader newClassLoader(String[] classPathElements) throws IOException {
ArrayList<URL> urlList = new ArrayList<URL>(classPathElements.length);
for (String element : classPathElements) {
if (element.endsWith("*")) {
File dir = new File(element.substring(0, element.length() - 1));
File[] files = dir.listFiles(); // Will be null if missing or not a dir
if (files != null) {
for (File f : files) {
if (f.getName().endsWith(".jar")) {
urlList.add(f.getCanonicalFile().toURI().toURL());
}
}
}
} else {
File f = new File(element);
if (f.exists()) {
urlList.add(f.getCanonicalFile().toURI().toURL());
}
}
}
URL[] urls = new URL[urlList.size()];
return new URLClassLoader(urlList.toArray(urls),
ClassLoader.getSystemClassLoader().getParent());
}
public void stop() {
}
}