| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.agent.launcher; |
| |
| import java.io.File; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.uima.ducc.agent.NodeAgent; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.TimeStamp; |
| import org.apache.uima.ducc.common.utils.Utils; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.transport.DuccExchange; |
| import org.apache.uima.ducc.transport.cmdline.ACommandLine; |
| import org.apache.uima.ducc.transport.cmdline.ICommandLine; |
| import org.apache.uima.ducc.transport.cmdline.JavaCommandLine; |
| import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine; |
| import org.apache.uima.ducc.transport.event.ProcessStopDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType; |
| import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; |
| import org.apache.uima.ducc.transport.event.common.ITimeWindow; |
| import org.apache.uima.ducc.transport.event.common.TimeWindow; |
| |
| |
| public class DuccCommandExecutor extends CommandExecutor { |
| DuccLogger logger = DuccLogger.getLogger(this.getClass(), |
| NodeAgent.COMPONENT_NAME); |
| @SuppressWarnings("unused") |
| private static AtomicInteger nextPort = new AtomicInteger(30000); |
| |
| public DuccCommandExecutor(NodeAgent agent, ICommandLine cmdLine,String host, String ip, Process managedProcess) |
| throws Exception { |
| super(agent, cmdLine, host, ip, managedProcess); |
| } |
| public DuccCommandExecutor(ICommandLine cmdLine,String host, String ip, Process managedProcess) |
| throws Exception { |
| super(null, cmdLine, host, ip, managedProcess); |
| } |
| |
| private boolean useDuccSpawn() { |
| if ( super.managedProcess.isAgentProcess() || Utils.isWindows() ) { |
| return false; |
| } |
| // On non-windows check if we should spawn the process via ducc_ling |
| String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn"); |
| if ( useSpawn != null && useSpawn.toLowerCase().equals("true")) { |
| return true; |
| } |
| // default |
| return false; |
| } |
| |
| private boolean createCGroupContainer(IDuccProcess duccProcess, String containerId, String owner ) throws Exception { |
| // create cgroups container and assign limits |
| if ( agent.cgroupsManager.createContainer( containerId, owner, useDuccSpawn()) ) { |
| return agent.cgroupsManager.setContainerMaxMemoryLimit(containerId, |
| owner, useDuccSpawn(), duccProcess.getCGroup().getMaxMemoryLimit()); |
| } |
| return false; |
| } |
| |
| private String getContainerId() { |
| String containerId; |
| if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service)) { |
| containerId = String.valueOf(((ManagedProcess) managedProcess).getDuccProcess().getCGroup().getId()); |
| } else { |
| containerId = ((ManagedProcess) managedProcess).getWorkDuccId().getFriendly()+"."+((ManagedProcess) managedProcess).getDuccProcess().getCGroup().getId(); |
| } |
| return containerId; |
| } |
| public Process exec(ICommandLine cmdLine, Map<String, String> processEnv) |
| throws Exception { |
| String methodName = "exec"; |
| try { |
| String[] cmd = getDeployableCommandLine(cmdLine,processEnv); |
| if ( isKillCommand(cmdLine) ) { |
| logger.info(methodName, null, "Killing process"); |
| stopProcess(cmdLine, cmd); |
| } else { |
| IDuccProcess duccProcess = ((ManagedProcess) managedProcess).getDuccProcess(); |
| // If running a real agent on a node, collect swap info and assign max swap usage threshold |
| // for each process. In virtual mode, where there are multiple agents per node, we dont |
| // set nor enforce swap limits. |
| if ( !agent.virtualAgent) { |
| // Calculate how much swap space the process is allowed to use. The calculation is based on |
| // the percentage of real memory the process is assigned. The process is entitled the |
| // same percentage of the swap. |
| // Normalize node's total memory as it is expressed in KB. The calculation below is based on bytes. |
| double percentOfTotal = ((double)duccProcess.getCGroup().getMaxMemoryLimit())/ |
| (agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()*1024); // need bytes |
| |
| |
| // substract 1Gig from total swap on this node to accommodate OS needs for swapping. The |
| // getSwapTotal() returns swap space in KBs so normalize 1Gig |
| long adjustedTotalSwapAvailable = |
| agent.getNodeInfo().getNodeMetrics().getNodeMemory().getSwapTotal() - 1048576; |
| // calculate the portion (in bytes) of swap this process is entitled to |
| long maxProcessSwapUsage = |
| (long) (adjustedTotalSwapAvailable*percentOfTotal)*1024; |
| // assigned how much swap this process is entitled to. If it exceeds this number the Agent |
| // will kill the process. |
| ((ManagedProcess) managedProcess).setMaxSwapThreshold(maxProcessSwapUsage); |
| logger.info(methodName, null, "---Process DuccId:"+duccProcess.getDuccId()+ |
| " CGroup.getMaxMemoryLimit():"+((duccProcess.getCGroup().getMaxMemoryLimit()/1024)/1024)+" MBs"+ |
| " Node Memory Total:"+(agent.getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal()/1024)+" MBs"+ |
| " Percentage Of Real Memory:"+percentOfTotal+ |
| " Adjusted Total Swap Available On Node:"+adjustedTotalSwapAvailable/1024+" MBs"+ |
| " Process Entitled To Max:"+(maxProcessSwapUsage/1024)/1024+" MBs of Swap" |
| ); |
| |
| //logger.info(methodName, null, "The Process With ID:"+duccProcess.getDuccId()+" is Entitled to the Max "+( (maxProcessSwapUsage/1024)/1024)+" Megs of Swap Space"); |
| // if configured to use cgroups and the process is the cgroup owner, create a cgroup |
| // using Process DuccId as a name. Additional processes may be injected into the |
| // cgroup by declaring cgroup owner id. |
| if ( agent.useCgroups ) { |
| // JDs are of type Pop (Plain Old Process). JDs run in a reservation. The cgroup container |
| // is created for the reservation and we co-locate as many JDs as we can fit in it. |
| //String containerId = ((ManagedProcess) managedProcess).getWorkDuccId()+"."+duccProcess.getCGroup().getId().getFriendly(); |
| String containerId = getContainerId(); |
| logger.info(methodName, null, "Creating CGroup with ID:"+containerId); |
| if ( !agent.cgroupsManager.cgroupExists(agent.cgroupsManager.getDuccCGroupBaseDir()+"/"+containerId) ) { |
| boolean failed = false; |
| // create cgroup container for JDs |
| try { |
| if ( createCGroupContainer(duccProcess, containerId, ((ManagedProcess)super.managedProcess).getOwner()) ) { |
| logger.info(methodName, null, "Created CGroup with ID:"+containerId+" With Memory Limit="+((ManagedProcess)super.managedProcess).getDuccProcess().getCGroup().getMaxMemoryLimit()+" Bytes"); |
| } else { |
| logger.info(methodName, null, "Failed To Create CGroup with ID:"+containerId); |
| duccProcess.setProcessState(ProcessState.Failed); |
| duccProcess.setReasonForStoppingProcess("CGroupCreationFailed"); |
| failed = true; |
| agent.stop(); |
| } |
| } catch( Exception e) { |
| logger.error(methodName, null, e); |
| failed = true; |
| agent.stop(); |
| } |
| if ( failed ) { |
| throw new RuntimeException("The Agent is Unable To Create A CGroup with Container ID: "+containerId+". Rejecting Deployment of Process with ID:"+duccProcess.getDuccId()); |
| } |
| } else { |
| logger.info(methodName, null, "CGroup Exists with ID:"+containerId); |
| |
| } |
| |
| String[] cgroupCmd = new String[cmd.length+3]; |
| cgroupCmd[0] = agent.cgroupsManager.getCGroupsUtilsDir()+"/cgexec"; |
| cgroupCmd[1] = "-g"; |
| cgroupCmd[2] = agent.cgroupsManager.getSubsystems()+":ducc/"+containerId; |
| int inx = 3; |
| for ( String cmdPart : cmd ) { |
| cgroupCmd[inx++] = cmdPart; |
| } |
| startProcess(cmdLine, cgroupCmd, processEnv); |
| } else { |
| // Not configured to use CGroups |
| startProcess(cmdLine, cmd, processEnv); |
| } |
| } else { |
| // dont use CGroups on virtual agents |
| startProcess(cmdLine, cmd, processEnv); |
| } |
| |
| |
| |
| } |
| return managedProcess; |
| } catch (Exception e) { |
| if ( ((ManagedProcess)super.managedProcess).getDuccProcess() != null ) { |
| DuccId duccId = ((ManagedProcess)super.managedProcess).getDuccId(); |
| logger.error(methodName, duccId, ((ManagedProcess)super.managedProcess).getDuccProcess().getDuccId(), e, new Object[]{}); |
| } |
| throw e; |
| } |
| } |
| |
| private void stopProcess(ICommandLine cmdLine, String[] cmd ) throws Exception { |
| String methodName = "stopProcess"; |
| Future<?> future = ((ManagedProcess) managedProcess).getFuture(); |
| if ( future == null ) { |
| throw new Exception("Future Object not Found. Unable to Stop Process with PID:"+((ManagedProcess) managedProcess).getPid()); |
| } |
| // for stop to work, PID must be provided |
| if (((ManagedProcess) managedProcess).getDuccProcess().getPID() == null || |
| ((ManagedProcess) managedProcess).getDuccProcess().getPID().trim().length() == 0 ) { |
| throw new Exception("Process Stop Command Failed. PID not provided."); |
| } |
| long maxTimeToWaitForProcessToStop = 60000; // default 1 minute |
| if ( super.agent.configurationFactory.processStopTimeout != null ) { |
| maxTimeToWaitForProcessToStop = Long.valueOf(super.agent.configurationFactory.processStopTimeout); |
| } |
| try { |
| // if the process is marked for death or still initializing or it is JD, kill it |
| if ( ((ManagedProcess) managedProcess).doKill() || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service) || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Initializing) || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Starting) || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization)) { |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+((ManagedProcess) managedProcess).getPid()); |
| |
| if ( ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service) || |
| ((ManagedProcess) managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) { |
| ICommandLine cmdL; |
| if (Utils.isWindows()) { |
| cmdL = new NonJavaCommandLine("taskkill"); |
| cmdL.addArgument("/PID"); |
| } else { |
| cmdL = new NonJavaCommandLine("/bin/kill"); |
| if ( ((ManagedProcess) managedProcess).isJd() ) { |
| // kill JD hard. |
| cmdL.addArgument("-9"); |
| } else { |
| cmdL.addArgument("-15"); |
| } |
| } |
| cmdL.addArgument(((ManagedProcess) managedProcess).getDuccProcess().getPID()); |
| |
| // String[] sigTermCmdLine = new String[] {"/bin/kill","-15",((ManagedProcess) managedProcess).getDuccProcess().getPID()}; |
| String[] sigTermCmdLine = getDeployableCommandLine(cmdL, new HashMap<String, String>()); |
| doExec(new ProcessBuilder(sigTermCmdLine), sigTermCmdLine, true); |
| } else { |
| doExec(new ProcessBuilder(cmd), cmd, true); |
| } |
| //doExec(new ProcessBuilder(cmd), cmd, true); |
| } else { // send stop request to quiesce the service |
| Map<String, Object> msgHeader = new HashMap<String, Object>(); |
| // Add PID to the header. Receiving process has a PID filter in its router |
| // to determine if the stop message is destined for it. |
| msgHeader.put(DuccExchange.ProcessPID, ((ManagedProcess) managedProcess).getPid()); |
| msgHeader.put(DuccExchange.DUCCNODEIP, ((ManagedProcess) managedProcess).getDuccProcess().getNodeIdentity().getIp()); |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"Agent Sending Stop request to remote process with PID:"+((ManagedProcess) managedProcess).getPid()+ " On Node:"+((ManagedProcess) managedProcess).getDuccProcess().getNodeIdentity().getIp()); |
| try { |
| // Use either Mina (socket) or jms to communicate with an agent |
| if (((ManagedProcess) managedProcess).getSocketEndpoint() != null ) { |
| |
| logger.info(methodName,null,"Agent Sending <<STOP>> to Endpoint:"+((ManagedProcess) managedProcess).getSocketEndpoint()); |
| // socket transport |
| super. |
| agent. |
| getEventDispatcherForRemoteProcess(). |
| dispatch( new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()), |
| ((ManagedProcess) managedProcess).getSocketEndpoint(), msgHeader ); |
| |
| } else { |
| // jms transport |
| super. |
| agent. |
| getEventDispatcherForRemoteProcess(). |
| dispatch(new ProcessStopDuccEvent(new HashMap<DuccId, IDuccProcess>()),msgHeader ); |
| } |
| } catch( Exception ex) { |
| logger.error(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ex, new Object[]{}); |
| } finally { |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" Waiting for Process to Stop. Timout Value:"+maxTimeToWaitForProcessToStop+" millis"); |
| try { |
| // Start Kill timer only if process is still running |
| if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) { |
| // the following call will block!!! Waits for process to stop on its own. If it doesnt stop, kills it hard. |
| // The exact time the service is allotted for stopping is defined in ducc.properties |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Starting Timer For Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()); |
| future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS); |
| } else { |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Dispatched STOP Request to Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" Process State: "+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process Not In Running State"); |
| } |
| |
| } catch (TimeoutException tex) { // on time out kill the process |
| if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Running)) { |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()+" .Process did not stop in alloted time of "+maxTimeToWaitForProcessToStop+" millis"); |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),">>>>>>>>>>>>>>> Killing Process:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" .Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()); |
| doExec(new ProcessBuilder(cmd), cmd, true); |
| } else { |
| logger.info(methodName,((ManagedProcess)super.managedProcess).getDuccId(),"------------ Agent Timed-out Waiting for Process with PID:"+((ManagedProcess) managedProcess).getDuccProcess().getPID()+" to Stop but the process is not in a running state. Process State:"+((ManagedProcess) managedProcess).getDuccProcess().getProcessState()); |
| } |
| } |
| } |
| } |
| } catch( Exception e) { // InterruptedException, ExecutionException |
| logger.error(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), e, new Object[]{}); |
| } |
| } |
| |
| private void startProcess(ICommandLine cmdLine,String[] cmd, Map<String, String> processEnv) throws Exception { |
| String methodName = "startProcess"; |
| |
| String millis; |
| millis = TimeStamp.getCurrentMillis(); |
| |
| ProcessBuilder pb = new ProcessBuilder(cmd); |
| |
| if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop) || |
| ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Service) ) { |
| ITimeWindow twi = new TimeWindow(); |
| ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowInit(twi); |
| twi.setStart(millis); |
| twi.setEnd(millis); |
| |
| ITimeWindow twr = new TimeWindow(); |
| ((ManagedProcess) managedProcess).getDuccProcess().setTimeWindowRun(twr); |
| twr.setStart(millis); |
| |
| } |
| |
| Map<String, String> env = pb.environment(); |
| // Dont enherit agent's environment |
| env.clear(); |
| // enrich Process environment |
| env.putAll(processEnv); |
| if( cmdLine instanceof ACommandLine ) { |
| // enrich Process environment with one from a given command line |
| env.putAll(((ACommandLine)cmdLine).getEnvironment()); |
| } |
| if ( logger.isTrace()) { |
| for( Entry<String, String> entry: env.entrySet()) { |
| String message = "key:"+entry.getKey()+" "+"value:"+entry.getValue(); |
| logger.trace(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), message); |
| |
| } |
| } |
| try { |
| doExec(pb, cmd, isKillCommand(cmdLine)); |
| } catch(Exception e) { |
| throw e; |
| } finally { |
| // millis = TimeStamp.getCurrentMillis(); |
| // twr.setEnd(millis); |
| } |
| } |
| /** |
| * Checks if a given process is AP. The code checks if process type is POP and it is *not* JD |
| * |
| * @param process - process instance |
| * @return - true if AP, false otherwise |
| */ |
| private boolean isAP( ManagedProcess process ) { |
| if ( !process.isJd() && process.getDuccProcess().getProcessType().equals(ProcessType.Pop) ) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| private void doExec(ProcessBuilder pb, String[] cmd, boolean isKillCmd) throws Exception { |
| String methodName = "doExec"; |
| int exitCode=0; |
| try { |
| |
| StringBuilder sb = new StringBuilder((isKillCommand(cmdLine) ?"--->Killing Process ":"---> Launching Process:") |
| + " Using command line:"); |
| int inx=0; |
| for (String cmdPart : cmd) { |
| sb.append("\n\t[").append(inx++).append("]").append(Utils.resolvePlaceholderIfExists(cmdPart, System.getProperties())); |
| } |
| logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString()); |
| |
| java.lang.Process process = pb.start(); |
| // Drain process streams |
| postExecStep(process, logger, isKillCmd); |
| // block waiting for the process to terminate. |
| exitCode = process.waitFor(); |
| if ( !isKillCommand(cmdLine) ) { |
| logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), ">>>>>>>>>>>>> Process with PID:"+((ManagedProcess)super.managedProcess).getDuccProcess().getPID()+" Terminated. Exit Code:"+exitCode); |
| // Process is dead, determine if the cgroup container should be destroyed as well. |
| if ( agent.useCgroups ) { |
| String containerId = getContainerId(); |
| agent.cgroupsManager.destroyContainer(containerId); |
| logger.info(methodName, null, "Removed CGroup Container with ID:"+containerId); |
| } |
| } |
| // if DUCC kills a process, its exitCode should be reset to 0 |
| if ( ((ManagedProcess)super.managedProcess).doKill() || isKillCmd || |
| ((ManagedProcess)super.managedProcess).isStopping()) { // always true when undeploying process |
| exitCode = 0; |
| } |
| |
| } catch( NullPointerException ex) { |
| ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed); |
| StringBuffer sb = new StringBuffer(); |
| sb.setLength(0); |
| sb.append("\n\tJava ProcessBuilder Failed to Launch Process due to NullPointerException. An Entry in the Command Array Must be Null. Look at Command Array Below:\n"); |
| for (String cmdPart : cmd) { |
| if ( cmdPart != null ) { |
| sb.append("\n\t").append(cmdPart); |
| } |
| } |
| logger.info(methodName, ((ManagedProcess)super.managedProcess).getDuccId(), sb.toString()); |
| ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed); |
| throw ex; |
| } catch( Exception ex) { |
| ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed); |
| throw ex; |
| } finally { |
| // associate exit code |
| ((ManagedProcess) managedProcess).getDuccProcess().setProcessExitCode(exitCode); |
| // Per team discussion on Aug 31 2011, the process is stopped by an agent when initialization |
| // times out or initialization failed. Both Initialization_Timeout and FailedIntialization imply |
| // that the process is stopped. If the process is AP and it exited it should be marked |
| // as Stopped. If the exit was due to Ducc kill mark reason as KilledByDucc otherwise we have |
| // no way of knowing why the process exited and in such case reason is Other. |
| if ( (isAP((ManagedProcess)super.managedProcess)) ) { |
| ((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped); |
| if ( ((ManagedProcess)super.managedProcess).doKill() ) { // killed by agent/ducc |
| ((ManagedProcess) managedProcess).getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString()); |
| } else { |
| ((ManagedProcess) managedProcess).getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.Other.toString()); |
| } |
| |
| } else if ( !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.InitializationTimeout) && |
| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.FailedInitialization) && |
| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Failed) && |
| !((ManagedProcess) managedProcess).getDuccProcess().getProcessState().equals(ProcessState.Killed)) { |
| ((ManagedProcess) managedProcess).getDuccProcess().setProcessState(ProcessState.Stopped); |
| } |
| } |
| |
| } |
| |
| private String[] getDeployableCommandLine(ICommandLine cmdLine, Map<String, String> processEnv) throws Exception { |
| //String methodName = "getDeployableCommandLine"; |
| String[] cmd = new String[0]; |
| |
| try { |
| // lock using Agent single permit semaphore. The Utils.concatAllArrays() |
| // uses native call (for efficiency) which appears not thread safe. |
| NodeAgent.lock(); |
| // Use ducc_ling (c code) as a launcher for the actual process. The ducc_ling |
| // allows the process to run as a specified user in order to write out logs in |
| // user's space as oppose to ducc space. |
| String c_launcher_path = |
| Utils.resolvePlaceholderIfExists( |
| System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties()); |
| |
| // if the command line is kill, don't provide any logging info to the ducc_ling. Otherwise, |
| // ducc_ling creates and empty log for each time we are killing a process |
| if ( isKillCommand(cmdLine) ) { |
| // Duccling, with no logging, always run by ducc, no need for workingdir |
| String[] duccling_nolog = new String[] { c_launcher_path, |
| "-u", ((ManagedProcess)super.managedProcess).getOwner(), |
| "--" }; |
| if ( useDuccSpawn() ) { |
| cmd = Utils.concatAllArrays(duccling_nolog, new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine()); |
| } else { |
| cmd = Utils.concatAllArrays(new String[] {cmdLine.getExecutable()},cmdLine.getCommandLine()); |
| } |
| } else { |
| String processType = "-UIMA-"; |
| switch( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType() ) { |
| case Pop: |
| // Both JD and POP arbitrary process are POPs. Assume this is an arbitrary process |
| processType = "-POP-"; |
| if ( cmdLine instanceof JavaCommandLine ) { |
| List<String> options = ((JavaCommandLine)cmdLine).getOptions(); |
| for(String option : options ) { |
| // Both services and JD have processType=POP. However, only the JD |
| // will have -Dducc.deploy.components option set. |
| if (option.startsWith("-Dducc.deploy.components=")) { |
| processType = "-JD-"; |
| ((ManagedProcess)super.managedProcess).setIsJD(); // mark this process as JD |
| break; |
| } |
| } |
| } |
| break; |
| case Service: |
| //processType = "-AP-"; |
| break; |
| case Job_Uima_AS_Process: |
| processType = "-UIMA-"; |
| ((JavaCommandLine)cmdLine).addOption("-Dducc.deploy.components=uima-as"); |
| ((JavaCommandLine)cmdLine).setClassName("org.apache.uima.ducc.common.main.DuccService"); |
| |
| break; |
| } |
| // if ( ((ManagedProcess)super.managedProcess).getDuccProcess().getProcessType().equals(ProcessType.Pop)) { |
| // processType = "-JD-"; |
| // } |
| String processLogDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory()+ |
| (((ManagedProcess)super.managedProcess).getProcessInfo().getLogDirectory().endsWith(File.separator) ? "" : File.separator)+ |
| ((ManagedProcess)super.managedProcess).getWorkDuccId()+File.separator; |
| String processLogFile = ((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host; |
| String workingDir = ((ManagedProcess)super.managedProcess).getProcessInfo().getWorkingDirectory(); |
| if ( workingDir == null ) { |
| workingDir = "NONE"; |
| } |
| |
| // Duccling, with logging |
| String[] duccling = new String[] { c_launcher_path, |
| "-f", processLogDir+processLogFile, |
| "-w", workingDir, |
| "-u", ((ManagedProcess)super.managedProcess).getOwner(), |
| "--" }; |
| |
| String executable = cmdLine.getExecutable(); |
| // Check if user specified which java to use to launch the process. If not provided, |
| // use the same java that the agent is running with |
| if (executable == null || executable.trim().length() == 0 ) { |
| executable = System.getProperty("java.home")+File.separator+"bin"+File.separator+"java"; |
| } |
| List<String> operationalProperties = new ArrayList<String>(); |
| |
| if ( cmdLine instanceof JavaCommandLine ) { |
| String duccHomePath = Utils.findDuccHome(); |
| operationalProperties.add("-DDUCC_HOME="+duccHomePath); |
| operationalProperties.add("-Dducc.deploy.configuration="+ |
| System.getProperty("ducc.deploy.configuration")); |
| if ( System.getProperties().containsKey("ducc.agent.managed.process.state.update.endpoint.type") ) { |
| String type = System.getProperty("ducc.agent.managed.process.state.update.endpoint.type"); |
| if (type != null && type.equalsIgnoreCase("socket")) { |
| operationalProperties.add("-D"+NodeAgent.ProcessStateUpdatePort+"="+System.getProperty(NodeAgent.ProcessStateUpdatePort)); |
| } |
| } |
| operationalProperties.add("-Dducc.process.log.dir="+processLogDir); |
| operationalProperties.add("-Dducc.process.log.basename="+processLogFile); //((ManagedProcess)super.managedProcess).getWorkDuccId()+ processType+host); |
| operationalProperties.add("-Dducc.job.id="+((ManagedProcess)super.managedProcess).getWorkDuccId()); |
| |
| } |
| String[] operationalPropertiesArray = new String[operationalProperties.size()]; |
| |
| if ( useDuccSpawn() ) { |
| cmd = Utils.concatAllArrays(duccling, new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine()); |
| } else { |
| cmd = Utils.concatAllArrays(new String[] {executable}, operationalProperties.toArray(operationalPropertiesArray), cmdLine.getCommandLine()); |
| } |
| // add JobId to the env |
| if ( processEnv != null ) { |
| processEnv.put("JobId", String.valueOf(((ManagedProcess)super.managedProcess).getWorkDuccId().getFriendly())); |
| } |
| } |
| return cmd; |
| } catch( Exception ex) { |
| ((ManagedProcess)super.managedProcess).getDuccProcess().setProcessState(ProcessState.Failed); |
| throw ex; |
| } finally { |
| NodeAgent.unlock(); |
| } |
| |
| } |
| public void stop() { |
| } |
| } |