| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.agent; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileReader; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Route; |
| import org.apache.camel.model.RouteDefinition; |
| import org.apache.camel.builder.RouteBuilder; |
| import org.apache.commons.lang.SerializationUtils; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.uima.ducc.agent.config.AgentConfiguration; |
| import org.apache.uima.ducc.agent.event.AgentEventListener; |
| import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver; |
| import org.apache.uima.ducc.agent.launcher.CGroupsManager; |
| import org.apache.uima.ducc.agent.launcher.DefunctProcessDetector; |
| import org.apache.uima.ducc.agent.launcher.ICommand; |
| import org.apache.uima.ducc.agent.launcher.Launcher; |
| import org.apache.uima.ducc.agent.launcher.ManagedProcess; |
| import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority; |
| import org.apache.uima.ducc.agent.launcher.SigKillCommand; |
| import org.apache.uima.ducc.agent.launcher.SigTermCommand; |
| import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector; |
| import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor; |
| import org.apache.uima.ducc.common.Node; |
| import org.apache.uima.ducc.common.NodeIdentity; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEvent; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEventQuiesceAndStop; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEventStop; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEventStopMetrics; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties; |
| import org.apache.uima.ducc.common.component.AbstractDuccComponent; |
| import org.apache.uima.ducc.common.main.DuccService; |
| import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.TimeStamp; |
| import org.apache.uima.ducc.common.utils.Utils; |
| import org.apache.uima.ducc.common.utils.IDuccLoggerComponents.Daemon; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.common.utils.id.IDuccId; |
| import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent; |
| import org.apache.uima.ducc.transport.agent.ProcessStateUpdate; |
| import org.apache.uima.ducc.transport.cmdline.ICommandLine; |
| import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine; |
| import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; |
| import org.apache.uima.ducc.transport.event.AgentProcessLifecycleReportDuccEvent; |
| import org.apache.uima.ducc.transport.event.AgentProcessLifecycleReportDuccEvent.LifecycleEvent; |
| import org.apache.uima.ducc.transport.event.DuccEvent.EventType; |
| import org.apache.uima.ducc.transport.event.DaemonDuccEvent; |
| import org.apache.uima.ducc.transport.event.DuccEvent; |
| import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent; |
| import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.DuccProcess; |
| import org.apache.uima.ducc.transport.event.common.DuccReservation; |
| import org.apache.uima.ducc.transport.event.common.DuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.DuccUserReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; |
| import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; |
| import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState; |
| import org.apache.uima.ducc.transport.event.common.ITimeWindow; |
| import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment; |
| import org.apache.uima.ducc.transport.event.common.TimeWindow; |
| |
| public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver { |
/** Logger used throughout this class; public and static so other agent components can share it. */
public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);

// Never reassigned in the code visible here; passed as the DuccId argument of log calls
// that are not tied to a specific job.
private static DuccId jobid = null;
// Replaced by duplicate in org.apache.uima.ducc.transport.agent.ProcessStateUpdate
// public static final String ProcessStateUpdatePort = "ducc.agent.process.state.update.port";

// POSIX signal number for SIGKILL (passed e.g. when destroying the cgroups test container).
public static int SIGKILL = 9;

// POSIX signal number for SIGTERM.
public static int SIGTERM = 15;

// for LinuxNodeMetrics logging
public static AtomicLong logCounter = new AtomicLong();

// Human-readable reason why cgroups are not in use on this node; set by the
// dependency-injection constructor when cgroups are disabled, excluded or fail validation.
public static String cgroupFailureReason;

// Map of known processes this agent is managing. This map is published
// at regular intervals as part of agent's inventory update.
private Map<DuccId, IDuccProcess> inventory = new ConcurrentHashMap<>();

// Semaphore controlling access to inventory Map
private Semaphore inventorySemaphore = new Semaphore(1);

// Processes launched by this agent. Plain ArrayList: NOTE(review) confirm all mutations
// happen from a single thread or under a lock held by the callers.
List<ManagedProcess> deployedProcesses = new ArrayList<ManagedProcess>();

// This agent's identity ( host name and IP address)
private NodeIdentity nodeIdentity;

// Component to launch processes
private Launcher launcher;

// Reference to the Agent's configuration factory component. This is where
// Agent
// dependencies are instantiated and injected via Agent's c'tor.
public AgentConfiguration configurationFactory;

// Static, hence shared by every NodeAgent instance in the JVM; its users are not visible in
// this section of the file.
private static Semaphore agentLock = new Semaphore(1);

// Dispatcher handed out by getEventDispatcherForRemoteProcess().
private DuccEventDispatcher commonProcessDispatcher;

// Dispatcher towards the Orchestrator; used for daemon state changes and process lifecycle
// reports.
private DuccEventDispatcher ORDispatcher;

private Object monitor = new Object();

// True once the configured ducc_ling launcher file was found on disk.
boolean duccLingExists = false;

// True when ducc.agent.launcher.use.ducc_spawn=true.
boolean runWithDuccLing = false;

private List<DuccUserReservation> reservations = new ArrayList<DuccUserReservation>();

// Presumably guards 'reservations' (by analogy with inventorySemaphore) - TODO confirm.
private Semaphore reservationsSemaphore = new Semaphore(1);

// private AgentMonitor nodeMonitor;

private volatile boolean stopping = false;

private Object stopLock = new Object();

// NOTE(review): meaning of the 5 and 10 arguments is defined by RogueProcessReaper - confirm there.
private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10);

// True only after cgroups support was found and validated by the DI constructor.
public volatile boolean useCgroups = false;

// Created by the DI constructor when cgroups utilities and base directory are found.
public CGroupsManager cgroupsManager = null;

// Latest node information, see setNodeInfo()/getNodeInfo().
public Node node = null;

// True when the node exclusion file disallows AP deployment on this node.
public volatile boolean excludeAPs = false;

// Parsed from ducc.rm.share.quantum by the DI constructor when that property is set.
public int shareQuantum;

// True whenever the system property ducc.agent.virtual is present (any value).
private boolean agentVirtual = System.getProperty("ducc.agent.virtual") == null ? false : true;

// This flag, when true, forces the agent to be "real"
// regardless of the value of ducc.agent.virtual.
// In the future, this flag and support for ducc.agent.virtual
// should be removed altogether.
// This is to support CGroups for all agents,
// which was previously disabled for virtual ones.
private boolean agentRealOnly = true;

public boolean pageSizeFetched = false;

// OS memory page size in bytes; 4096 is the default, the DI constructor refreshes it from
// "getconf PAGESIZE".
public int pageSize = 4096; // default

// OS clock ticks per second; the DI constructor refreshes it from "getconf CLK_TCK".
public int cpuClockRate = 100;

// Online processor count from "getconf _NPROCESSORS_ONLN" (set by the DI constructor).
public int numProcessors = 0;

ExecutorService defunctDetectorExecutor = Executors.newCachedThreadPool();

// Listener whose last sequence number is reported by getLastORSequence().
private AgentEventListener eventListener;

// indicates whether or not this agent received at least one publication
// from the PM. This flag is used to determine if the agent should use
// rogue process detector. The detector will be used if this flag is true.
public volatile boolean receivedDuccState = false;

// Endpoint used by stateChange() to notify the Orchestrator.
private String stateChangeEndpoint;

// Milliseconds to wait for a process to stop. Default 60000 ms (1 minute); constructors may
// override it from the configured process stop timeout.
long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
| |
| public void setStateChangeEndpoint(String stateChangeEndpoint) { |
| this.stateChangeEndpoint = stateChangeEndpoint; |
| } |
| |
/**
 * Minimal constructor intended only for black-box testing of this class.
 *
 * <p>No Camel context is supplied and no dependencies (dispatchers, launcher, configuration
 * factory) are wired.
 */
public NodeAgent() {
  super(COMPONENT_NAME, null);
}
| |
/**
 * Creates an agent with the given identity but without Camel context or configuration factory.
 *
 * <p>{@link #NodeAgent()} does not populate {@code configurationFactory}, so the configured
 * process stop timeout can only be applied when that field is non-null; otherwise the default
 * {@code maxTimeToWaitForProcessToStop} is kept instead of failing with a NullPointerException.
 *
 * @param ni this agent's identity
 */
public NodeAgent(NodeIdentity ni) {
  this();
  this.nodeIdentity = ni;
  Utils.findDuccHome(); // add DUCC_HOME to System.properties

  if (configurationFactory != null && configurationFactory.processStopTimeout != null) {
    maxTimeToWaitForProcessToStop = Long.valueOf(configurationFactory.processStopTimeout);
  }
}
| |
| public long getLastORSequence() { |
| long lastORSequence = 0; |
| if (eventListener != null) { |
| lastORSequence = eventListener.getLastSequence(); |
| } |
| return lastORSequence; |
| } |
| |
| public AgentEventListener getEventListener() { |
| return eventListener; |
| } |
| |
| public void setAgentEventListener(AgentEventListener listener) { |
| eventListener = listener; |
| } |
| |
| public boolean isVirtual() { |
| boolean retVal = agentVirtual; |
| if (agentRealOnly) { |
| retVal = false; |
| } |
| return retVal; |
| } |
| |
| /** |
| * Tell Orchestrator about state change for recording into system-events.log |
| */ |
| private void stateChange(EventType eventType) { |
| String methodName = "stateChange"; |
| try { |
| Daemon daemon = Daemon.Agent; |
| NodeIdentity nodeIdentity = new NodeIdentity(); |
| DaemonDuccEvent ev = new DaemonDuccEvent(daemon, eventType, nodeIdentity); |
| ORDispatcher.dispatch(stateChangeEndpoint, ev, ""); |
| logger.info(methodName, null, stateChangeEndpoint, eventType.name(), |
| nodeIdentity.getCanonicalName()); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| /* |
| * Report process lifecycle events on same channel that inventory is reported |
| */ |
| public DuccEventDispatcher getProcessLifecycleReportDispatcher() { |
| return ORDispatcher; |
| } |
| |
| /* |
| * Send process lifecycle event to interested listeners |
| */ |
| private void sendProcessLifecycleEventReport(IDuccProcess process, |
| LifecycleEvent lifecycleEvent) { |
| String location = "sendProcessLifecycleEventReport"; |
| try { |
| NodeIdentity nodeIdentity = getIdentity(); |
| AgentProcessLifecycleReportDuccEvent duccEvent = new AgentProcessLifecycleReportDuccEvent( |
| process, nodeIdentity, lifecycleEvent); |
| DuccEventDispatcher dispatcher = getProcessLifecycleReportDispatcher(); |
| dispatcher.dispatch(duccEvent); |
| StringBuffer sb = new StringBuffer(); |
| sb.append("id:" + process.getDuccId().toString() + " "); |
| sb.append("lifecycleEvent:" + lifecycleEvent.name() + " "); |
| String args = sb.toString().trim(); |
| logger.info(location, jobid, args); |
| } catch (Exception e) { |
| logger.error(location, jobid, e); |
| } |
| } |
| |
| /* |
| * Add ManagedProcess to map and send lifecycle event |
| */ |
| private void processDeploy(ManagedProcess mp) { |
| String location = "processDeploy"; |
| if (mp != null) { |
| if (deployedProcesses.contains(mp)) { |
| String args = "mp:" + mp.getProcessId(); |
| logger.error(location, jobid, args); |
| } else { |
| String args = "mp:" + mp.getProcessId(); |
| logger.debug(location, jobid, args); |
| deployedProcesses.add(mp); |
| IDuccProcess process = mp.getDuccProcess(); |
| sendProcessLifecycleEventReport(process, LifecycleEvent.Launch); |
| } |
| } else { |
| String args = "mp:" + mp; |
| logger.error(location, jobid, args); |
| } |
| } |
| |
| /* |
| * Remove ManagedProcess from map and send lifecycle event |
| */ |
| private void processUndeploy(ManagedProcess mp, Iterator<ManagedProcess> it) { |
| String location = "processUndeploy"; |
| if (mp != null) { |
| if (!deployedProcesses.contains(mp)) { |
| String args = "mp:" + mp.getProcessId(); |
| logger.error(location, jobid, args); |
| } else { |
| String args = "mp:" + mp.getProcessId(); |
| logger.debug(location, jobid, args); |
| // deployedProcesses.remove(mp); |
| // it.remove(); |
| IDuccProcess process = mp.getDuccProcess(); |
| sendProcessLifecycleEventReport(process, LifecycleEvent.Terminate); |
| } |
| } else { |
| String args = "mp:" + mp; |
| logger.error(location, jobid, args); |
| } |
| } |
| |
| /** |
| * C'tor for dependecy injection |
| * |
| * @param nodeIdentity |
| * - this Agent's identity |
| * @param launcher |
| * - component to launch processes |
| * @param context |
| * - camel context |
| */ |
| public NodeAgent(NodeIdentity nodeIdentity, Launcher launcher, CamelContext context, |
| AgentConfiguration factory) throws Exception { |
| super(COMPONENT_NAME, context); |
| |
| Utils.findDuccHome(); // add DUCC_HOME to System.properties |
| |
| // Running a real agent |
| agentVirtual = System.getProperty("ducc.agent.virtual") == null ? false : true; |
| |
| this.nodeIdentity = nodeIdentity; |
| this.launcher = launcher; |
| this.configurationFactory = factory; |
| this.commonProcessDispatcher = factory.getCommonProcessDispatcher(context); |
| this.ORDispatcher = factory.getORDispatcher(context); |
| |
| // fetch Page Size from the OS and cache it |
| pageSize = getOSPageSize(); |
| |
| numProcessors = getNodeProcessors(); |
| |
| logger.info("NodeAgent", null, "OS Page Size:" + pageSize); |
| |
| cpuClockRate = getOSClockRate(); |
| logger.info("NodeAgent", null, "OS Clock Rate:" + cpuClockRate); |
| |
| if (System.getProperty("ducc.rm.share.quantum") != null |
| && System.getProperty("ducc.rm.share.quantum").trim().length() > 0) { |
| shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim()); |
| } |
| /* Enable CGROUPS */ |
| String cgroups; |
| String cgUtilsPath = null; |
| boolean excludeNodeFromCGroups = false; |
| if (!isVirtual() |
| && (cgroups = System.getProperty("ducc.agent.launcher.cgroups.enable")) != null) { |
| if (cgroups.equalsIgnoreCase("true")) { |
| logger.info("nodeAgent", null, "ducc.properties [ducc.agent.launcher.cgroups.enable=true]"); |
| // Load exclusion file. Some nodes may be excluded from cgroups |
| String exclusionFile; |
| |
| // get the name of the exclusion file from ducc.properties |
| if ((exclusionFile = System.getProperty("ducc.agent.exclusion.file")) != null) { |
| logger.info("nodeAgent", null, |
| "Ducc configured with cgroup node exclusion file - ducc.properties [ducc.agent.exclusion.file=" |
| + exclusionFile + "]"); |
| // Parse node exclusion file and determine if cgroups and AP |
| // deployment |
| // is allowed on this node |
| NodeExclusionParser exclusionParser = new NodeExclusionParser(); |
| exclusionParser.parse(exclusionFile); |
| excludeNodeFromCGroups = exclusionParser.cgroupsExcluded(); |
| excludeAPs = exclusionParser.apExcluded(); |
| if (excludeNodeFromCGroups) { |
| logger.info("nodeAgent", null, |
| "------- Node Explicitly Excluded From Using CGroups. Check File:" |
| + exclusionFile); |
| cgroupFailureReason = "------- Node Explicitly Excluded From Using CGroups. Check File:" |
| + exclusionFile; |
| } |
| System.out.println( |
| "excludeNodeFromCGroups=" + excludeNodeFromCGroups + " excludeAPs=" + excludeAPs); |
| } else { |
| logger.info("nodeAgent", null, "Agent node *not* excluded from using cgroups"); |
| } |
| // node not in the exclusion list for cgroups |
| if (!excludeNodeFromCGroups) { |
| // fetch a list of paths the agent will search to find cgroups utils |
| // like cgexec. The default location is /usr/bin |
| logger.info("nodeAgent", null, |
| "Testing cgroups to check if runtime utilities (cgexec) exist in expected locations in the filesystem"); |
| String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir"); |
| if (cgroupsUtilsDirs == null) { |
| cgUtilsPath = "/usr/bin"; // default |
| } else { |
| String[] paths = cgroupsUtilsDirs.split(","); |
| for (String path : paths) { |
| File file = new File(path.trim() + "/cgexec"); |
| if (file.exists()) { |
| cgUtilsPath = path; |
| break; |
| } |
| } |
| } |
| // scan /proc/mounts for base cgroup dir |
| String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts"); |
| |
| if (cgUtilsPath == null) { |
| useCgroups = false; |
| logger.info("nodeAgent", null, |
| "------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties"); |
| } else if (cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) { |
| useCgroups = false; |
| logger.info("nodeAgent", null, |
| "------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts"); |
| |
| } else { |
| logger.info("nodeAgent", null, "Agent found cgroups runtime in " + cgUtilsPath |
| + " cgroups base dir=" + cgroupsBaseDir); |
| // if cpuacct is configured in cgroups, the subsystems list will be updated |
| String cgroupsSubsystems = "memory,cpu"; |
| |
| long maxTimeToWaitForProcessToStop = 60000; // default 1 minute |
| if (configurationFactory.processStopTimeout != null) { |
| maxTimeToWaitForProcessToStop = Long.valueOf(configurationFactory.processStopTimeout); |
| } |
| |
| cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, |
| logger, maxTimeToWaitForProcessToStop); |
| cgroupsManager.configure(this); |
| // check if cgroups base directory exists in the filesystem |
| // which means that cgroups |
| // and cgroups convenience package are installed and the |
| // daemon is up and running. |
| if (cgroupsManager.cgroupExists(cgroupsBaseDir)) { |
| logger.info("nodeAgent", null, |
| "Agent found cgroup base directory in " + cgroupsBaseDir); |
| try { |
| String containerId = "test"; |
| // validate cgroups by creating a dummy cgroup. The code checks if cgroup actually |
| // got created by |
| // verifying existence of test cgroup file. The second step in verification is to |
| // check if |
| // CPU control is working. Configured in cgconfig.conf, the CPU control allows for |
| // setting |
| // cpu.shares. The code will attempt to set the shares and subsequently tries to |
| // read the |
| // value from cpu.shares file to make sure the values match. Any exception in the |
| // above steps |
| // will cause cgroups to be disabled. |
| // |
| cgroupsManager.validator(cgroupsBaseDir, containerId, |
| System.getProperty("user.name"), false).cgcreate().cgset(100); // write |
| // cpu.shares=100 |
| // and |
| // validate |
| |
| // cleanup dummy cgroup |
| cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"), |
| SIGKILL); |
| useCgroups = true; |
| } catch (CGroupsManager.CGroupsException ee) { |
| logger.info("nodeAgent", null, ee); |
| cgroupFailureReason = ee.getMessage(); |
| useCgroups = false; |
| } |
| if (useCgroups) { |
| try { |
| // remove stale CGroups |
| cgroupsManager.cleanup(); |
| } catch (Exception e) { |
| logger.error("nodeAgent", null, e); |
| useCgroups = false; |
| logger.info("nodeAgent", null, |
| "Agent cgroup cleanup failed on this machine base directory in " |
| + cgroupsBaseDir |
| + ". Check if cgroups is installed on this node, Agent has correct permissions (consistent with cgconfig.conf), and the cgroup daemon is running"); |
| cgroupFailureReason = "------- CGroups Not Working on this Machine"; |
| } |
| } else { |
| logger.info("nodeAgent", null, |
| "Agent cgroup test failed on this machine base directory in " |
| + cgroupsBaseDir |
| + ". Check if cgroups is installed on this node, Agent has correct permissions (consistent with cgconfig.conf), and the cgroup daemon is running"); |
| cgroupFailureReason = "------- CGroups Not Working on this Machine"; |
| } |
| |
| } else { |
| logger.info("nodeAgent", null, "Agent failed to find cgroup base directory in " |
| + cgroupsBaseDir |
| + ". Check if cgroups is installed on this node and the cgroup daemon is running"); |
| // logger.info("nodeAgent", null, "------- CGroups Not Installed on this Machine"); |
| cgroupFailureReason = "------- CGroups Not Installed on this Machine"; |
| } |
| } |
| } |
| } |
| } else { |
| logger.info("nodeAgent", null, "------- CGroups Not Enabled on this Machine"); |
| cgroupFailureReason = "------- CGroups Not Enabled on this Machine - check ducc.properties: ducc.agent.launcher.cgroups.enable "; |
| } |
| |
| // begin publishing node metrics |
| factory.startNodeMetrics(this); |
| |
| logger.info("nodeAgent", null, |
| "CGroup Support=" + useCgroups + " excludeNodeFromCGroups=" + excludeNodeFromCGroups |
| + " excludeAPs=" + excludeAPs + " CGroups utils Dir:" + cgUtilsPath); |
| |
| String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn"); |
| if (useSpawn != null && useSpawn.toLowerCase().equals("true")) { |
| runWithDuccLing = true; |
| String c_launcher_path = Utils.resolvePlaceholderIfExists( |
| System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties()); |
| try { |
| File duccLing = new File(c_launcher_path); |
| if (duccLing.exists()) { |
| duccLingExists = true; |
| } |
| } catch (Exception e) { |
| logger.info("nodeAgent", null, |
| "------- Agent failed while checking for existence of ducc_ling", e); |
| } |
| |
| } |
| } |
| |
| private String fetchCgroupsBaseDir(String mounts) { |
| String cbaseDir = null; |
| BufferedReader br = null; |
| try { |
| FileInputStream fis = new FileInputStream(mounts); |
| // Construct BufferedReader from InputStreamReader |
| br = new BufferedReader(new InputStreamReader(fis)); |
| |
| String line = null; |
| while ((line = br.readLine()) != null) { |
| System.out.println(line); |
| if (line.trim().startsWith("cgroup")) { |
| String[] cgroupsInfo = line.split(" "); |
| if (cgroupsInfo[1].trim().equals("/cgroup")) { |
| cbaseDir = cgroupsInfo[1].trim(); |
| break; |
| } else if (cgroupsInfo[1].trim().endsWith("/memory")) { |
| // return the mount point minus the memory part |
| cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory")); |
| break; |
| } |
| } |
| } // while |
| |
| } catch (Exception e) { |
| logger.info("nodeAgent", null, |
| "------- Agent failed while checking for existence of CGroups", e.getMessage()); |
| } finally { |
| if (br != null) { |
| try { |
| br.close(); |
| } catch (Exception ex) { |
| } |
| } |
| } |
| return cbaseDir; |
| } |
| |
| public int getNodeProcessors() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "_NPROCESSORS_ONLN" }); |
| } |
| |
| public int getOSPageSize() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "PAGESIZE" }); |
| } |
| |
| public int getOSClockRate() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "CLK_TCK" }); |
| } |
| |
| private int runOSCommand(String[] cmd) { |
| InputStreamReader in = null; |
| BufferedReader reader = null; |
| int retVal = 0; |
| try { |
| ProcessBuilder pb = new ProcessBuilder(); |
| pb.command(cmd); |
| pb.redirectErrorStream(true); |
| Process p = pb.start(); |
| in = new InputStreamReader(p.getInputStream()); |
| reader = new BufferedReader(in); |
| String line = null; |
| |
| while ((line = reader.readLine()) != null) { |
| retVal = Integer.parseInt(line.trim()); |
| } |
| } catch (Exception e) { |
| logger.error("runOSCommand", null, e); |
| } finally { |
| if (reader != null) { |
| try { |
| reader.close(); |
| } catch (Exception ex) { |
| } |
| } |
| } |
| return retVal; |
| } |
| |
| public void setNodeInfo(Node node) { |
| this.node = node; |
| } |
| |
| public Node getNodeInfo() { |
| return node; |
| } |
| |
| public int getNodeTotalNumberOfShares() { |
| int shareQuantum = 0; |
| int shares = 1; |
| if (System.getProperty("ducc.rm.share.quantum") != null |
| && System.getProperty("ducc.rm.share.quantum").trim().length() > 0) { |
| shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim()); |
| shares = (int) getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() / shareQuantum; // get |
| // number |
| // of |
| // shares |
| if ((getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() % shareQuantum) > 0) |
| shares++; // ciel |
| } |
| |
| return shares; |
| } |
| |
| public void start(DuccService service) throws Exception { |
| super.start(service, null); |
| String methodName = "start"; |
| String name = nodeIdentity.getShortName(); |
| String ip = nodeIdentity.getIp(); |
| String jmxUrl = getProcessJmxUrl(); |
| DuccDaemonRuntimeProperties.getInstance().bootAgent(name, ip, jmxUrl); |
| String key = "ducc.broker.url"; |
| String value = System.getProperty(key); |
| logger.info(methodName, null, key + "=" + value); |
| stateChange(EventType.BOOT); |
| } |
| |
| public DuccEventDispatcher getEventDispatcherForRemoteProcess() { |
| return commonProcessDispatcher; |
| } |
| |
| public boolean duccLingExists() { |
| return duccLingExists; |
| } |
| |
| public void duccLingExists(boolean duccLingExists) { |
| this.duccLingExists = duccLingExists; |
| } |
| |
| public boolean runWithDuccLing() { |
| return runWithDuccLing; |
| } |
| |
| public void runWithDuccLing(boolean runWithDuccLing) { |
| this.runWithDuccLing = runWithDuccLing; |
| } |
| |
| /** |
| * Returns deep copy (by way of java serialization) of the Agents inventory. |
| */ |
| @SuppressWarnings("unchecked") |
| public Map<DuccId, IDuccProcess> getInventoryCopy() { |
| Object deepCopy = null; |
| try { |
| inventorySemaphore.acquire(); |
| deepCopy = SerializationUtils.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory); |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| return (Map<DuccId, IDuccProcess>) deepCopy; |
| } |
| |
| /** |
| * Returns shallow copy of the Agent's inventory |
| */ |
| public Map<DuccId, IDuccProcess> getInventoryRef() { |
| return inventory; |
| } |
| |
| /* |
| * Check if both the command and its args are missing, since the command defaults to the DUCC JVM. |
| */ |
| private boolean invalidCommand(ICommandLine commandLine) { |
| if (commandLine != null) { |
| if (commandLine.getExecutable() != null && commandLine.getExecutable().length() > 0) |
| return false; |
| if (commandLine.getCommandLine() != null && commandLine.getCommandLine().length > 0) |
| return false; |
| } |
| return true; |
| } |
| |
| private boolean isProcessDeallocated(IDuccProcess process) { |
| return (process.getProcessState().equals(ProcessState.Undefined) && process.isDeallocated()); |
| } |
| |
| /** |
| * Stops any process that is in agent's inventory but not in provided job list sent by the PM. |
| * |
| * @param lifecycleController |
| * - instance implementing stopProcess() method |
| * @param jobDeploymentList |
| * - all DUCC jobs sent by PM |
| */ |
| public void takeDownProcessWithNoJob(ProcessLifecycleController lifecycleController, |
| List<IDuccJobDeployment> jobDeploymentList) { |
| String methodName = "takeDownProcessWithNoJob"; |
| try { |
| inventorySemaphore.acquire(); |
| List<IDuccProcess> purgeList = new ArrayList<IDuccProcess>(); |
| boolean hasAjob = false; |
| // Check if every process in agent's inventory is associated with a |
| // job in a given |
| // jobDeploymentList |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // if a job list is empty, take down all agent processes that |
| // are in the inventory |
| if (jobDeploymentList.isEmpty()) { |
| logger.info(methodName, null, "...Agent Process:" + processEntry.getValue().getDuccId() |
| + " Not in JobDeploymentList. Ducc Currently Has No Jobs Running"); |
| hasAjob = false; |
| } else { |
| // iterate over all jobs |
| for (IDuccJobDeployment job : jobDeploymentList) { |
| // check if current process is a JD |
| if (job.getJdProcess() != null) { |
| if (processEntry.getValue().getDuccId().equals(job.getJdProcess().getDuccId())) { |
| hasAjob = true; |
| break; |
| } |
| } |
| // check if current process is a JP |
| for (IDuccProcess jProcess : job.getJpProcessList()) { |
| if (processEntry.getValue().getDuccId().equals(jProcess.getDuccId())) { |
| hasAjob = true; |
| break; |
| } |
| } |
| if (hasAjob) { |
| break; |
| } |
| } |
| } |
| if (!hasAjob) { |
| // if a process in agent inventory has no job and is still |
| // alive, stop it |
| if (isAlive(processEntry.getValue())) { |
| logger.error(methodName, null, |
| "<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:" |
| + processEntry.getValue().getDuccId() + " PID:" |
| + processEntry.getValue().getPID()); |
| processEntry.getValue().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.JPHasNoActiveJob.toString()); |
| lifecycleController.stopProcess(processEntry.getValue()); |
| } else { |
| // add process to purge list |
| purgeList.add(processEntry.getValue()); |
| } |
| } else { |
| hasAjob = false; |
| } |
| } |
| for (IDuccProcess processToPurge : purgeList) { |
| logger.error(methodName, null, |
| "XXXXXXXXXX Purging Process:" + processToPurge.getDuccId() + " Process State:" |
| + processToPurge.getProcessState() + " Process Resource State:" |
| + processToPurge.getResourceState()); |
| getInventoryRef().remove(processToPurge.getDuccId()); |
| } |
| } catch (Exception e) { |
| |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| private void stopProcessIfAlive(IDuccProcess process, |
| ProcessLifecycleController lifecycleController) { |
| String methodName = "stopProcessIfAlive"; |
| if (isAlive(process)) { |
| logger.error(methodName, null, |
| "<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:" |
| + process.getDuccId() + " PID:" + process.getPID()); |
| process.setReasonForStoppingProcess(ReasonForStoppingProcess.JPHasNoActiveJob.toString()); |
| lifecycleController.stopProcess(process); |
| } else { |
| logger.error(methodName, null, |
| "XXXXXXXXXX Purging Process:" + process.getDuccId() + " Process State:" |
| + process.getProcessState() + " Process Resource State:" |
| + process.getResourceState()); |
| |
| getInventoryRef().remove(process.getDuccId()); |
| |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess deployedProcess = it.next(); |
| if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) { |
| it.remove(); |
| break; |
| } |
| } |
| logger.info(methodName, null, "After Purge Inventory size:" + inventory.size() |
| + " deployedProcesses size:" + deployedProcesses.size()); |
| } |
| } |
| |
| /* |
| * Valid process exists in agent inventory and in an incoming OR state. If process exists in agent |
| * inventory but not in OR state than such process is invalid |
| */ |
| private boolean validProcess(IDuccProcess process, List<IDuccJobDeployment> jobDeploymentList) { |
| // iterate over all jobs |
| for (IDuccJobDeployment job : jobDeploymentList) { |
| // check if current process is a JD |
| if ((job.getJdProcess() != null |
| && process.getDuccId().equals(job.getJdProcess().getDuccId()))) { |
| return true; |
| } else { |
| // check if current process is a JP |
| for (IDuccProcess jProcess : job.getJpProcessList()) { |
| if (process.getDuccId().equals(jProcess.getDuccId())) { |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| public void takeDownProcessWithNoJobV2(ProcessLifecycleController lifecycleController, |
| List<IDuccJobDeployment> jobDeploymentList) { |
| String methodName = "takeDownProcessWithNoJobV2"; |
| try { |
| inventorySemaphore.acquire(); |
| |
| // iterate over all processes in agent inventory |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // if OR deployment list is empty, take down all agent processes that |
| // are in the inventory |
| if (jobDeploymentList.isEmpty() |
| || !validProcess(processEntry.getValue(), jobDeploymentList)) { |
| logger.info(methodName, null, "...Agent Process:" + processEntry.getValue().getDuccId() |
| + " Not in OR JobDeploymentList"); |
| stopProcessIfAlive(processEntry.getValue(), lifecycleController); |
| } |
| } |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| /** |
| * Reconciles agent inventory with job processes sent by PM |
| * |
| * @param lifecycleController |
| * - instance implementing stopProcess and startProcess |
| * @param process |
| * - job process from a Job List |
| * @param commandLine |
| * - in case this process is not in agents inventory we need cmd line to start it |
| * @param info |
| * - DUCC common info including user log dir, user name, etc |
| * @param workDuccId |
| * - job id |
| */ |
| public void reconcileProcessStateAndTakeAction(ProcessLifecycleController lifecycleController, |
| IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| ProcessMemoryAssignment processMemoryAssignment, DuccId workDuccId, |
| boolean isPreemptable) { |
| String methodName = "reconcileProcessStateAndTakeAction"; |
| try { |
| inventorySemaphore.acquire(); |
| // Check if process exists in agent's inventory |
| if (getInventoryRef().containsKey(process.getDuccId())) { |
| IDuccProcess agentManagedProcess = getInventoryRef().get(process.getDuccId()); |
| // check if process is Running, Initializing, Started, or Starting |
| if (isAlive(agentManagedProcess)) { |
| // Stop the process if it has been deallocated |
| if (process.isDeallocated()) { |
| // if agent is in stopping state, it will try to stop |
| // its processes. |
| if (stopping && isPreemptable) { |
| logger.info(methodName, workDuccId, ">>>>>>>> Agent is stopping - process with PID:" |
| + process.getPID() + " is stopping"); |
| return; // agent is stopping. All processes are stopping |
| } else { |
| agentManagedProcess.setResourceState(ResourceState.Deallocated); |
| logger.info(methodName, workDuccId, |
| "<<<<<<<< Agent Stopping Process:" + process.getDuccId() + " PID:" |
| + process.getPID() + " Reason: Ducc Deallocated the Process."); |
| lifecycleController.stopProcess(agentManagedProcess); |
| } |
| } |
| // else nothing to do. Process has been deallocated |
| } |
| } else { // Process not in agent's inventory |
| // Add this process to the inventory so that it gets published. |
| getInventoryRef().put(process.getDuccId(), process); |
| if (process.isFailed()) { |
| // When a process scheduling class is invalid, the AgentEventListener will |
| // tag it as FAILED |
| process.setReasonForStoppingProcess( |
| IDuccProcess.ReasonForStoppingProcess.InvalidSchedulingClass.name()); |
| ITimeWindow twr = new TimeWindow(); |
| |
| process.setTimeWindowRun(twr); |
| twr.setStartLong(0); |
| twr.setEndLong(0); |
| |
| ITimeWindow twi = new TimeWindow(); |
| process.setTimeWindowInit(twi); |
| twi.setStartLong(0); |
| twi.setEndLong(0); |
| |
| } else if (process.isDeallocated()) { |
| // process not in agent's inventory and it is marked as |
| // deallocated. This can happen when an agent is restarted |
| // while the rest of DUCC is running. |
| // markAsStopped(process); |
| process.setProcessState(ProcessState.Stopped); |
| } else if (process.getResourceState().equals(ResourceState.Allocated)) { |
| // check if OR thinks that this process is still running. If |
| // so, this agent was restarted while the OR was running and |
| // we need to mark the process as Failed. |
| if (process.getProcessState().equals(ProcessState.Initializing) |
| || process.getProcessState().equals(ProcessState.Running)) { |
| process.setProcessState(ProcessState.Failed); |
| } else { |
| // enforce presence of command line |
| if (invalidCommand(commandLine)) { |
| process.setProcessState(ProcessState.Failed); |
| logger.info(methodName, workDuccId, |
| "Rejecting Process Start Request. Command line not provided for Process ID:" |
| + process.getDuccId()); |
| process.setReasonForStoppingProcess( |
| IDuccProcess.ReasonForStoppingProcess.CommandLineMissing.name()); |
| } else { |
| if (stopping) { |
| process.setProcessState(ProcessState.Rejected); |
| logger.info(methodName, workDuccId, ">>>>>>> Agent Rejected Process:" |
| + process.getDuccId() + " Start Request - Agent is stopping"); |
| return; |
| } |
| process.setProcessState(ProcessState.Starting); |
| logger.info(methodName, workDuccId, |
| ">>>>>>> Agent Starting Process:" + process.getDuccId() + " Process State:" |
| + process.getProcessState() + " Process Resource State:" |
| + process.getResourceState()); |
| lifecycleController.startProcess(process, commandLine, info, workDuccId, |
| processMemoryAssignment, isPreemptable); |
| } |
| } |
| } |
| } |
| } catch (Exception e) { |
| logger.error(methodName, workDuccId, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| public void doStartProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| DuccId workDuccId, boolean isPreemptable) { |
| String methodName = "doStartProcess"; |
| try { |
| inventorySemaphore.acquire(); |
| if (getInventoryRef().containsKey(process.getDuccId())) { |
| logger.error(methodName, null, "Rejecting Process Start Request. Process with a Ducc ID:" |
| + process.getDuccId() + " is already in agent's inventory."); |
| return; |
| } |
| startProcess(process, commandLine, info, workDuccId, new ProcessMemoryAssignment(), |
| isPreemptable); |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| private boolean isProcessRunning(IDuccProcess process) { |
| if (process.getProcessState().equals(ProcessState.Running) |
| || process.getProcessState().equals(ProcessState.Initializing)) { |
| return true; |
| } |
| return false; |
| } |
| |
| private boolean isOverSwapLimit(IDuccProcess process) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess deployedProcess = it.next(); |
| // for (ManagedProcess deployedProcess : deployedProcesses) { |
| // Check if this process exceeds its alloted max swap usage |
| if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId()) |
| && process.getSwapUsage() > deployedProcess.getMaxSwapThreshold()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private long getSwapOverLimit(IDuccProcess process) { |
| long overLimit = 0; |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // for (ManagedProcess deployedProcess : deployedProcesses) { |
| ManagedProcess deployedProcess = it.next(); |
| if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) { |
| overLimit = deployedProcess.getMaxSwapThreshold() - process.getSwapUsage(); |
| } |
| } |
| if (overLimit < 0) { |
| overLimit = 0; |
| } |
| return overLimit; |
| } |
| |
| /** |
| * Called when swap space on a node reached minimum as defined by ducc.node.min.swap.threshold in |
| * ducc.properties. The agent will find the biggest (in terms of memory) process in its inventory |
| * and stop it. |
| */ |
| public void killProcessDueToLowSwapSpace(long minSwapThreshold) { |
| String methodName = "killProcessDueToLowSwapSpace"; |
| IDuccProcess biggestProcess = null; |
| try { |
| inventorySemaphore.acquire(); |
| // find the fattest process in terms of absolute use of swap over the process limit |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| if (isProcessRunning(processEntry.getValue()) && isOverSwapLimit(processEntry.getValue()) |
| && (biggestProcess == null || getSwapOverLimit(biggestProcess) < getSwapOverLimit( |
| processEntry.getValue()))) { |
| biggestProcess = processEntry.getValue(); |
| } |
| } |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| if (biggestProcess != null) { |
| biggestProcess.setReasonForStoppingProcess(ReasonForStoppingProcess.LowSwapSpace.toString()); |
| logger.info(methodName, null, "Stopping Process:" + biggestProcess.getDuccId() + " PID:" |
| + biggestProcess.getPID() |
| + " Due to a low swap space. Process' RSS exceeds configured swap threshold of " |
| + minSwapThreshold |
| + " Defined in ducc.properties. Check ducc.node.min.swap.threshold property"); |
| stopProcess(biggestProcess); |
| } |
| } |
| |
| public void interruptThreadInWaitFor(String pid) throws Exception { |
| String methodName = "interruptZombieProcess"; |
| synchronized (monitor) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // for (ManagedProcess dProcess : deployedProcesses) { |
| ManagedProcess dProcess = it.next(); |
| if (dProcess.getPid() != null && dProcess.getPid().equals(pid)) { |
| Future<?> future = dProcess.getFuture(); |
| if (future != null && !future.isDone() && !future.isCancelled()) { |
| future.cancel(true); // interrupt the thread blocked on waitFor() |
| logger.info(methodName, dProcess.getDuccProcess().getDuccId(), |
| "Interrupted Thread - Zombie Process with PID:" + dProcess.getPid()); |
| } |
| break; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Called when Agent receives request to start a process. |
| * |
| * @param process |
| * - IDuccProcess instance with identity (DuccId) |
| * @param commandLine |
| * - fully defined command line that will be used to exec the process. |
| * |
| */ |
| public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment, |
| boolean isPreemptable) { |
| String methodName = "startProcess"; |
| |
| try { |
| // Add process to the Agent's inventory before it is started |
| getInventoryRef().put(process.getDuccId(), process); |
| // enforce presence of command line |
| if (invalidCommand(commandLine)) { |
| process.setProcessState(ProcessState.Failed); |
| logger.info(methodName, null, |
| "Rejecting Process Start Request. Command line not provided for Process ID:" |
| + process.getDuccId()); |
| } else if (isProcessDeallocated(process)) { |
| process.setProcessState(ProcessState.Stopped); |
| logger.info(methodName, null, "Rejecting Process Start Request. Process ID:" |
| + process.getDuccId() + " hava already been deallocated due to Shrink"); |
| } else { |
| deployProcess(process, commandLine, info, workDuccId, processMemoryAssignment, |
| isPreemptable); |
| } |
| |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| |
| } |
| |
| public boolean isAlive(IDuccProcess invProcess) { |
| return invProcess.getProcessState().equals(ProcessState.Initializing) |
| || invProcess.getProcessState().equals(ProcessState.Running) |
| || invProcess.getProcessState().equals(ProcessState.Stopping) |
| || invProcess.getProcessState().equals(ProcessState.Starting) |
| || invProcess.getProcessState().equals(ProcessState.Started); |
| } |
| |
| public void doStopProcess(IDuccProcess process) { |
| String methodName = "stopProcess"; |
| try { |
| inventorySemaphore.acquire(); |
| stopProcess(process); |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| |
| } |
| |
| /** |
| * Called when Agent receives request to stop a process. |
| * |
| * @param process |
| * - IDuccProcess instance with identity (DuccId) |
| * |
| */ |
| public void stopProcess(IDuccProcess process) { |
| String methodName = "stopProcess"; |
| try { |
| IDuccProcess invProcess = null; |
| if ((invProcess = getInventoryRef().get(process.getDuccId())) != null |
| && isAlive(invProcess)) { |
| logger.info(methodName, null, "Undeploing Process with PID:" + process.getPID()); |
| undeployProcess(process); |
| } else if (invProcess == null) { // process not in inventory |
| logger.info(methodName, null, |
| "Agent received Stop request for a process which is not in the Agent's inventory. " |
| + "It looks like this Agent was killed along with its child processes. Adding stale process to the inventory. PID:" |
| + process.getPID() + " DuccId:" + process.getDuccId() + ""); |
| // Received a request to stop a process that this is not in the |
| // current |
| // inventory. Most likely this agent was killed while its |
| // processes were |
| // still running. Add the process to the agent's inventory so |
| // that its |
| // included in the next published inventory. This is done so |
| // that the |
| // orchestrator can cleanup its state. |
| if (process.getProcessState() != ProcessState.Stopped |
| && process.getProcessState() != ProcessState.Failed |
| && process.getProcessState() != ProcessState.InitializationTimeout |
| && process.getProcessState() != ProcessState.FailedInitialization) { |
| // Force the Stopped state if not already stopped or failed |
| process.setProcessState(ProcessState.Stopped); |
| } |
| // Add stale process to the inventory. This will eventually be |
| // cleaned up |
| // when the PM sends purge request. |
| getInventoryRef().put(process.getDuccId(), process); |
| } |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| /** |
| * Checks if process with a given PID has already registered memory collector Camel route. This |
| * route periodically fetches resident memory of a process with a given PID. Each process |
| * collector route is identified by process PID. |
| * |
| * @param pid |
| * - process PID also id of its route |
| * @return - true if memory collector route has already been created. False, otherwise |
| */ |
| private boolean addProcessMemoryCollector(String pid) { |
| // search all camel routes for one with a given id |
| for (Route route : super.getContext().getRoutes()) { |
| if (route.getId().equals(pid)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Remove given process from Agent's inventory |
| * |
| * @param process |
| * - process to purge from inventory |
| * @throws Exception |
| */ |
| public void purgeProcess(IDuccProcess process) throws Exception { |
| String methodName = "purgeProcess"; |
| DuccId key = null; |
| String pid = ""; |
| try { |
| inventorySemaphore.acquire(); |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // Check if process with a given unique DuccId exist in the |
| // local map |
| if (processEntry.getKey().equals(process.getDuccId())) { |
| key = processEntry.getKey(); |
| pid = processEntry.getValue().getPID(); |
| break; |
| } |
| } |
| if (key != null) { |
| getInventoryRef().remove(key); |
| logger.info(methodName, null, ">>>> Agent Purged Process with PID:" + pid); |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // for (ManagedProcess deployedProcess : deployedProcesses) { |
| ManagedProcess deployedProcess = it.next(); |
| // Find ManagedProcess instance the DuccProcess instance is |
| // associated with |
| if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) { |
| processUndeploy(deployedProcess, it); |
| break; |
| } |
| } |
| } |
| |
| private boolean changeState(ProcessState state) { |
| switch (state) { |
| case FailedInitialization: |
| case InitializationTimeout: |
| case Stopped: |
| case Stopping: |
| return false; |
| case Starting: |
| case Started: |
| case Initializing: |
| case Running: |
| return true; |
| default: |
| break; |
| } |
| return false; |
| } |
| |
| /** |
| * Called when a service wrapper sends status update |
| * |
| * @param duccEvent |
| * - Ducc event object |
| * |
| * @throws Exception |
| */ |
| |
| public void updateProcessStatus(ProcessStateUpdateDuccEvent duccEvent) throws Exception { |
| String methodName = "updateProcessStatus"; |
| |
| try { |
| inventorySemaphore.acquire(); |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // Check if process with a given unique DuccId exist in the |
| // local map |
| if (processEntry.getKey().getUnique().equals(duccEvent.getDuccProcessId())) { |
| // found it. Update pid and state of the process |
| if (duccEvent.getPid() != null && processEntry.getValue().getPID() == null) { |
| processEntry.getValue().setPID(duccEvent.getPid()); |
| } |
| |
| if (duccEvent.getProcessJmxUrl() != null |
| && processEntry.getValue().getProcessJmxUrl() == null) { |
| processEntry.getValue().setProcessJmxUrl(duccEvent.getProcessJmxUrl()); |
| } |
| ITimeWindow tw = processEntry.getValue().getTimeWindowInit(); |
| if (tw != null) { |
| if (!duccEvent.getState().equals(ProcessState.Initializing)) { |
| // Mark the time the process ended initialization. It also |
| // covers a case when the process terminates while initializing |
| tw.setEnd(TimeStamp.getCurrentMillis()); |
| if (duccEvent.getState().equals(ProcessState.Running)) { |
| ITimeWindow twr = new TimeWindow(); |
| String millis; |
| millis = TimeStamp.getCurrentMillis(); |
| // Mark the time the process started running |
| processEntry.getValue().setTimeWindowRun(twr); |
| twr.setStart(millis); |
| } |
| } |
| } else { |
| logger.info(methodName, null, |
| "++++++++++++ Agent Init TimeWindow not available - tw==null"); |
| } |
| ManagedProcess deployedProcess = null; |
| synchronized (monitor) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // Find ManagedProcess instance the DuccProcess |
| // instance is associated with |
| ManagedProcess dProcess = it.next(); |
| if (dProcess.getDuccProcess().getDuccId().getUnique() |
| .equals(duccEvent.getDuccProcessId())) { |
| deployedProcess = dProcess; |
| break; |
| } |
| } |
| } |
| if (processEntry.getValue().getProcessState() != ProcessState.Running |
| && duccEvent.getState().equals(ProcessState.Running) && deployedProcess != null) { |
| // cancel process initialization timer. |
| deployedProcess.stopInitializationTimer(); |
| } |
| |
| logger.info(methodName, null, |
| ">>>> Agent Handling Process Update - Ducc Id: " |
| + processEntry.getValue().getDuccId() + " PID:" |
| + processEntry.getValue().getPID() + " Status:" + duccEvent.getState() |
| + " Deallocated:" + processEntry.getValue().isDeallocated()); |
| if (deployedProcess != null && deployedProcess.getSocketEndpoint() == null |
| && duccEvent.getServiceEdnpoint() != null) { |
| deployedProcess.setSocketEndpoint(duccEvent.getServiceEdnpoint()); |
| } |
| |
| // This is a delayed stop. Previously a request to stop the |
| // process was received |
| // but the PID was not available yet. Instead a flag was set |
| // to initiate a |
| // stop after the process reports the PID. |
| if (deployedProcess != null && deployedProcess.killAfterLaunch()) { |
| logger.info(methodName, null, |
| ">>>> Process Ducc Id:" + processEntry.getValue().getDuccId() |
| + " Was Previously Tagged for Kill While It Was Starting"); |
| undeployProcess(processEntry.getValue()); |
| } else if (deployedProcess != null && deployedProcess.doKill() && deployedProcess |
| .getDuccProcess().getProcessState().equals(ProcessState.Stopped)) { |
| deployedProcess.getDuccProcess() |
| .setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString()); |
| } else if (deployedProcess != null && (deployedProcess.doKill() |
| || deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Failed) |
| || deployedProcess.getDuccProcess().getProcessState() |
| .equals(ProcessState.Killed))) { |
| // The process has already stopped, but managed to send |
| // the last update before dying. Ignore the update |
| return; |
| } else if (changeState(processEntry.getValue().getProcessState())) { |
| logger.info(methodName, null, "=============== PID:" + processEntry.getValue().getPID() |
| + " Changing State - current state:" + processEntry.getValue().getProcessState() |
| + " New State:" + duccEvent.getState()); |
| processEntry.getValue().setProcessState(duccEvent.getState()); |
| DuccEventDispatcher dispatcher = configurationFactory |
| .getORDispatcher(super.getContext()); |
| try { |
| DefaultNodeInventoryProcessor processor = configurationFactory |
| .nodeInventoryProcessor(this); |
| Map<DuccId, IDuccProcess> inventoryCopy = (Map<DuccId, IDuccProcess>) SerializationUtils |
| .clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory); |
| |
| processor.dispatchInventoryUpdate(dispatcher, |
| configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy); |
| logger.info(methodName, null, "Sent Node Inventory Update to the OR - process PID:" |
| + processEntry.getValue().getPID()); |
| |
| } catch (Exception e) { |
| logger.warn("", null, e); |
| } |
| |
| // if the process is Stopping, it must have hit an error threshold |
| } |
| // Check if MemoryCollector should be created for this |
| // process. It collects |
| // resident memory of the process at regular intervals. |
| // Should only be added |
| // once for each process. This route will have its id set to |
| // process PID. |
| if (addProcessMemoryCollector(duccEvent.getPid()) |
| && (duccEvent.getState().equals(ProcessState.Initializing) |
| || duccEvent.getState().equals(ProcessState.Running))) { |
| if (duccEvent.getState().equals(ProcessState.Running)) { |
| if (processEntry.getValue().getUimaPipelineComponents() != null |
| && processEntry.getValue().getUimaPipelineComponents().size() > 0) { |
| processEntry.getValue().getUimaPipelineComponents().clear(); |
| if (duccEvent.getUimaPipeline() != null) { |
| duccEvent.getUimaPipeline().clear(); |
| } |
| } |
| } |
| |
| } else if (duccEvent.getState().equals(ProcessState.Stopped) |
| || duccEvent.getState().equals(ProcessState.Failed) |
| || duccEvent.getState().equals(ProcessState.Killed)) { |
| if (deployedProcess.getMetricsProcessor() != null) { |
| deployedProcess.getMetricsProcessor().close(); // close open fds (stat and statm |
| // files) |
| } |
| logger.info(methodName, null, |
| "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:" |
| + duccEvent.getPid()); |
| } else if (duccEvent.getState().equals(ProcessState.FailedInitialization)) { |
| logger.info(methodName, null, |
| ">>>> Agent Handling Process FailedInitialization. PID:" + duccEvent.getPid()); |
| deployedProcess.getDuccProcess().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.FailedInitialization.toString()); |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| deployedProcess.setStopping(); |
| |
| deployedProcess.kill(); |
| logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:" |
| + duccEvent.getPid() + " Killing Process"); |
| |
| undeployProcess(processEntry.getValue()); |
| |
| } else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) { |
| deployedProcess.getDuccProcess().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.InitializationTimeout.toString()); |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| deployedProcess.setStopping(); |
| |
| // Mark process for death. Doesnt actually kill the process |
| |
| deployedProcess.kill(); |
| logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:" |
| + duccEvent.getPid() + " Killing Process"); |
| |
| undeployProcess(processEntry.getValue()); |
| |
| } else if (duccEvent.getState().equals(ProcessState.Stopping)) { |
| if (duccEvent.getMessage() != null && duccEvent.getMessage() |
| .equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) { |
| processEntry.getValue().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.ExceededErrorThreshold.toString()); |
| } |
| if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped) |
| && !deployedProcess.getDuccProcess().getProcessState() |
| .equals(ProcessState.Stopping)) { |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| deployedProcess.setStopping(); |
| } |
| } |
| if (duccEvent.getUimaPipeline() != null) { |
| StringBuffer buffer = new StringBuffer("\t\tUima Pipeline -"); |
| for (IUimaPipelineAEComponent uimaAeState : duccEvent.getUimaPipeline()) { |
| buffer.append("\n\t\tAE:").append(uimaAeState.getAeName()).append(" state:") |
| .append(uimaAeState.getAeState()).append(" InitTime:") |
| .append(uimaAeState.getInitializationTime() / 1000).append(" secs. Thread:") |
| .append(uimaAeState.getAeThreadId()); |
| } |
| logger.info(methodName, null, buffer.toString()); |
| ((DuccProcess) processEntry.getValue()) |
| .setUimaPipelineComponents(duccEvent.getUimaPipeline()); |
| } |
| return; // found it. Done |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| /** |
| * Deploys process using supplied command line |
| * |
| * @param process |
| * - Process with identity (DuccId) |
| * @param commandLine |
| * - fully defined command line that will be used to exec the process. |
| */ |
| private void deployProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment, boolean preemptable) { |
| String methodName = "deployProcess"; |
| synchronized (monitor) { |
| boolean deployProcess = true; |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // for (ManagedProcess deployedProcess : deployedProcesses) { |
| ManagedProcess deployedProcess = it.next(); |
| // ignore duplicate start request for the same process |
| if (deployedProcess.getDuccId().equals(process.getDuccId())) { |
| deployProcess = false; |
| break; |
| } |
| } |
| if (deployProcess) { |
| try { |
| logger.info(methodName, workDuccId, |
| "Agent [" + getIdentity().getIp() + "] Deploying Process - DuccID:" |
| + process.getDuccId().getFriendly() + " Unique DuccID:" |
| + process.getDuccId().getUnique() + " Node Assignment:" |
| + process.getNodeIdentity().getIp() + " Process Memory Assignment:" |
| + processMemoryAssignment + " MBs"); |
| TimeWindow tw = new TimeWindow(); |
| tw.setStart(TimeStamp.getCurrentMillis()); |
| tw.setEnd(null); |
| process.setTimeWindowInit(tw); |
| ManagedProcess managedProcess = new ManagedProcess(process, commandLine, this, logger, |
| processMemoryAssignment, preemptable); |
| managedProcess.setProcessInfo(info); |
| managedProcess.setWorkDuccId(workDuccId); |
| |
| // enrich process spec with unique ducc id which will be |
| // used to correlate message |
| // exchanges |
| // between the agent and launched process |
| |
| ManagedProcess deployedProcess = launcher.launchProcess(this, getIdentity(), process, |
| commandLine, this, managedProcess); |
| processDeploy(deployedProcess); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } else { |
| logger.info(methodName, workDuccId, |
| "Ignoring duplicate request to start process - DuccID:" |
| + process.getDuccId().getFriendly() + " Unique DuccID:" |
| + process.getDuccId().getUnique()); |
| } |
| } |
| } |
| |
| class AgentStreamConsumer implements Runnable { |
| private InputStream theStream; |
| |
| AgentStreamConsumer(InputStream is) { |
| theStream = is; |
| } |
| |
| public void run() { |
| String methodName = "AgentStreamConsumer.run"; |
| |
| BufferedReader bufferedReader = null; |
| try { |
| bufferedReader = new BufferedReader(new InputStreamReader(theStream)); |
| String line = null; |
| while ((line = bufferedReader.readLine()) != null) { |
| StringBuffer outputBuffer = new StringBuffer(); |
| outputBuffer.append(line + "\n"); |
| } |
| } catch (Throwable t) { |
| logger.warn(methodName, null, t); |
| t.printStackTrace(); |
| } finally { |
| try { |
| bufferedReader.close(); |
| } catch (Exception e) { |
| } |
| } |
| } |
| } |
| |
/** Signals the agent can send, each paired with its numeric option string (e.g. "-15"). */
enum SIGNAL {
  SIGTERM("-15"),
  SIGKILL("-9");

  String signal;

  SIGNAL(String option) {
    this.signal = option;
  }

  /** @return the numeric option string for this signal, e.g. {@code "-9"} for SIGKILL */
  public String get() {
    return signal;
  }
}
| |
| class ProcessRunner implements Runnable { |
| ManagedProcess deployedProcess; |
| |
| public ProcessRunner(final ManagedProcess deployedProcess) {// final String pid, SIGNAL signal ) |
| // { |
| this.deployedProcess = deployedProcess; |
| } |
| |
| public void run() { |
| stopProcess(deployedProcess.getDuccProcess()); |
| } |
| } |
| |
| private boolean runnable(ManagedProcess process) { |
| return (process.getDuccProcess().getProcessState().equals(ProcessState.Initializing) |
| || process.getDuccProcess().getProcessState().equals(ProcessState.Starting) |
| || process.getDuccProcess().getProcessState().equals(ProcessState.Started) |
| || process.getDuccProcess().getProcessState().equals(ProcessState.Running)); |
| } |
| |
| /** |
| * This method is called when an agent receives a STOP request. It sends SIGTERM to all |
| * non-preemptable child processes and starts a timer. If the timer pops and child processes are |
| * still running, the agent takes itself out via halt() |
| */ |
| private boolean stopChildProcesses(boolean quiesceMode) { |
| String methodName = "stopNow"; |
| boolean wait = false; |
| try { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess deployedProcess = it.next(); |
| String pid = deployedProcess.getDuccProcess().getPID(); |
| logger.info(methodName, null, "....Process:" + pid + " is JD=" + deployedProcess.isJd() |
| + " Preemptable:" + deployedProcess.isPreemptable()); |
| // dont send SIGTERM to non-preemptable processes in quiesce mode |
| if ((quiesceMode && !deployedProcess.isPreemptable()) || deployedProcess.isStopping() |
| || pid == null || pid.trim().length() == 0 || !runnable(deployedProcess)) { |
| continue; |
| } |
| |
| logger.info(methodName, null, |
| "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId() |
| + " PID:" + pid + " Sending SIGTERM Process State:" |
| + deployedProcess.getDuccProcess().getProcessState().toString() |
| + " Process Type:" + deployedProcess.getDuccProcess().getProcessType() |
| + " Uima AS:" + deployedProcess.isUimaAs() + " Preemtable:" |
| + deployedProcess.isPreemptable()); |
| wait = true; |
| deployedProcess.setStopPriority(StopPriority.DONT_WAIT); |
| // Stop each child process in its own thread to parallelize SIGTERM requests |
| ExecutorService executor = Executors.newSingleThreadExecutor(); |
| executor.execute(new ProcessRunner(deployedProcess)); |
| |
| } |
| |
| } catch (Exception e) { |
| logger.warn(methodName, null, e); |
| } |
| return wait; |
| } |
| |
/**
 * Sends a hard kill to deployed child processes that are still standing. In this file it is
 * invoked from {@code KillTimerTask.run()} after the SIGTERM grace period has elapsed.
 *
 * <p>With cgroups enabled, the selected PIDs are handed to {@code cgroupsManager.cleanupPids()}.
 * Without cgroups, a {@code /bin/kill -9 <pid>} command ({@code taskkill /PID <pid>} on Windows)
 * is launched for each selected process. Any exception is logged as a warning and not rethrown.
 *
 * @param killOnlyUimaAs
 *          cgroups path only: when true, every non-preemptable UIMA-AS process with a PID is
 *          selected and {@code quiesce} is not consulted. Not used on the non-cgroups path.
 * @param quiesce
 *          when true, non-preemptable processes are not killed (cgroups path with
 *          {@code killOnlyUimaAs == false}, and the non-cgroups path)
 */
private void killChildProcesses(boolean killOnlyUimaAs, boolean quiesce) {
  String methodName = "killChildProcesses";

  try {
    if (useCgroups) {
      logger.info(methodName, null, "CgroupsManager.cleanup() before ");
      if (killOnlyUimaAs) {
        // Select non-preemptable UIMA-AS processes that have reported a PID.
        Set<String> pidsToKill = new HashSet<>();
        Iterator<ManagedProcess> it = deployedProcesses.iterator();
        while (it.hasNext()) {
          ManagedProcess p = it.next();
          if (!p.isPreemptable() && p.getPid() != null && p.isUimaAs()) {
            pidsToKill.add(p.getPid());
          }
        }
        if (!pidsToKill.isEmpty()) {
          logger.info(methodName, null, ">>>>>>>> Found " + pidsToKill.size()
              + " UIMA-AS processes still running - killing all non-preemptables via kill -9");
          // Since SIGTERM may not be enough to take down a process, use cgroups to find
          // any UIMA-AS process still standing and do hard kill
          cgroupsManager.cleanupPids(pidsToKill);

        }

      } else {
        Set<String> pidsToKill = new HashSet<>();
        Iterator<ManagedProcess> it = deployedProcesses.iterator();
        // Since SIGTERM may not be enough to take down a process, use cgroups to find
        // any process still standing and do hard kill
        // NOTE(review): despite the comment above, the condition below only selects
        // non-preemptable UIMA-AS processes, and selects nothing when quiesce is true.
        // Confirm this is intended.
        while (it.hasNext()) {
          ManagedProcess p = it.next();
          if ((!quiesce && !p.isPreemptable()) && p.getPid() != null && p.isUimaAs()) {
            pidsToKill.add(p.getPid());
          }
        }
        // cgroupsManager.cleanup();
        // Unlike the branch above, cleanupPids() is called even when pidsToKill is empty.
        cgroupsManager.cleanupPids(pidsToKill);

      }
      logger.info(methodName, null, "CgroupsManager.cleanup() after ");
    } else {
      // No cgroups: launch an OS kill command for each runnable process with a known PID.
      Iterator<ManagedProcess> it = deployedProcesses.iterator();
      while (it.hasNext()) {
        ManagedProcess deployedProcess = it.next();

        String pid = deployedProcess.getDuccProcess().getPID();
        if ((quiesce && !deployedProcess.isPreemptable()) || pid == null
            || pid.trim().length() == 0 || !runnable(deployedProcess)) {
          continue;
        }
        logger.info(methodName, null,
            "....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
                + " PID:" + pid + " Sending SIGKILL Process State:"
                + deployedProcess.getDuccProcess().getProcessState().toString());
        ICommandLine cmdLine;
        if (Utils.isWindows()) {
          cmdLine = new NonJavaCommandLine("taskkill");
          cmdLine.addArgument("/PID");
        } else {
          cmdLine = new NonJavaCommandLine("/bin/kill");
          cmdLine.addArgument("-9");
        }
        cmdLine.addArgument(pid);

        // Mark as agent-initiated stop before launching the kill command.
        deployedProcess.setStopping();
        deployedProcess.setStopPriority(StopPriority.DONT_WAIT);

        launcher.launchProcess(this, getIdentity(), deployedProcess.getDuccProcess(), cmdLine,
            this, deployedProcess);
      }

    }
  } catch (Exception e) {
    logger.warn(methodName, null, e);
  }

}
| |
| private void handleSigTermTimeout(ManagedProcess deployedProcess) { |
| String methodName = "handleSigTermTimeout"; |
| if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) { |
| logger.info(methodName, deployedProcess.getDuccId(), |
| "------------ Agent Timed-out Waiting for Process with PID:" |
| + deployedProcess.getDuccProcess().getPID() + " to Stop. Process State:" |
| + deployedProcess.getDuccProcess().getProcessState() |
| + " .Process did not stop in allotted time of " |
| + maxTimeToWaitForProcessToStop + " millis"); |
| logger.info(methodName, deployedProcess.getDuccId(), |
| ">>>>>>>>>>>>>>> Killing Process:" + deployedProcess.getDuccProcess().getPID() |
| + " .Process State:" + deployedProcess.getDuccProcess().getProcessState()); |
| } |
| |
| ICommand sigKillCommand = new SigKillCommand(deployedProcess, logger); |
| launcher.launchOSCommand(sigKillCommand); |
| } |
| |
/**
 * Stops a given process. While holding {@code monitor}:
 * <ul>
 * <li>if the agent does not know the process, it is recorded in the inventory as Stopped and
 * registered through {@code processDeploy};</li>
 * <li>if the process has not reported a PID yet, it is tagged to be killed after launch;</li>
 * <li>otherwise SIGTERM is sent and the caller blocks on the process future for up to
 * {@code maxTimeToWaitForProcessToStop} millis, escalating to SIGKILL on timeout. If the
 * process still appears to be running afterwards, a {@code DefunctProcessDetector} task is
 * submitted.</li>
 * </ul>
 *
 * @param process
 *          - process to kill
 */
private void undeployProcess(IDuccProcess process) {
  String methodName = "undeployProcess";

  synchronized (monitor) {

    ManagedProcess deployedProcess = getDeployedProcess(process.getDuccId());

    // Given process does not exist in agent's inventory
    if (Objects.isNull(deployedProcess)) {
      logger.info(methodName, null,
          ".... Process - DuccId:" + process.getDuccId() + " PID:" + process.getPID()
              + " Not in Agent's inventory. Adding to the inventory with state=Stopped");
      process.setProcessState(ProcessState.Stopped);
      inventory.put(process.getDuccId(), process);
      processDeploy(new ManagedProcess(process, null, this, logger, new ProcessMemoryAssignment(),
          true));
      return;
    }

    if (Objects.isNull(deployedProcess.getPid())) {
      if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
        // process not reported its PID yet when the process
        // reports its PID, check if it should be killed.
        logger.info(methodName, deployedProcess.getDuccId(), ".... Process - Ducc ID:"
            + deployedProcess.getDuccId()
            + " Has Not Started Yet. PID Not Available. Tagging Process For Kill When It Reports Status");
        deployedProcess.killAfterLaunch(true);
      }
      return;
    }

    ICommand sigTermCommand = new SigTermCommand(deployedProcess, logger);

    // Mark the process as stopping. When the process exits,
    // the agent can determine if the process died on its own
    // (due to say, user code problem) or if it died
    // due to Agent initiated stop.
    deployedProcess.setStopping();

    try {
      logger.info(methodName, deployedProcess.getDuccId(),
          "------------ Agent Starting Killer Timer Task For Process with PID:"
              + deployedProcess.getDuccProcess().getPID() + " Process State: "
              + deployedProcess.getDuccProcess().getProcessState());
      // launch SIGTERM
      launcher.launchOSCommand(sigTermCommand);
      // fetch deployed process Future object which
      // will be used to determine if its still
      // running
      // NOTE(review): if getFuture() can return null, the NPE below is caught by the generic
      // catch, logged as an error, and no SIGKILL escalation happens. Confirm.
      Future<?> future = deployedProcess.getFuture();
      // future.get() blocks until process stops or timeout occurs
      future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS);
      logger.info(methodName, deployedProcess.getDuccId(), ">>>>>>>>>>>>> Process with PID:"
          + deployedProcess.getDuccProcess().getPID() + " Terminated");
    } catch (TimeoutException e) {
      logger.info(methodName, deployedProcess.getDuccId(),
          ">>>>>>>>> Timed Out Waiting For Process To Terminate After SIGTERM Was Sent");
      handleSigTermTimeout(deployedProcess);
    } catch (Exception e) {
      logger.error(methodName, deployedProcess.getDuccId(), e);
    }

    // Short pause before the defunct check; wait() releases 'monitor' while waiting.
    try {
      monitor.wait(500);
    } catch (InterruptedException ee) {
      // NOTE(review): the interrupt is swallowed and the thread's interrupt flag is not
      // restored. Callers such as shutdown() rely on continuing; confirm before changing.
    }

    // check if defunct process
    if (isProcessRunning(deployedProcess.getDuccProcess())) {
      // wait for a short while before running DefunctProcessDetector
      // Sometimes when a process is killed via kill -9 it shows as
      // defunct in ps output. Not sure why this is so.

      // spin a thread where we check if the process is defunct. If true,
      // the process state is changed to Stopped and reason set to 'defunct'.
      // Next inventory publication will include this new state and the OR
      // can terminate a job.
      defunctDetectorExecutor.execute(new DefunctProcessDetector(deployedProcess, logger));
    }
    logger.info(methodName, deployedProcess.getDuccId(), "Inventory size:" + inventory.size()
        + " deployedProcesses size:" + deployedProcesses.size());
  }
}
| |
| public NodeIdentity getIdentity() { |
| return nodeIdentity; |
| } |
| |
| /** |
| * Called when a process exits. |
| */ |
| public void onProcessExit(IDuccProcess process) { |
| String methodName = "onProcessExit"; |
| if (process == null) { |
| return; |
| } |
| try { |
| ProcessStateUpdate processStateUpdate = new ProcessStateUpdate(process.getProcessState(), |
| process.getPID(), process.getDuccId().getUnique()); |
| ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate); |
| // cleanup Camel route associated with a process that just stopped |
| if (process.getPID() != null && super.getContext().getRoute(process.getPID()) != null) { |
| try { |
| // stop collecting process stats from /proc/<pid>/statm |
| super.getContext().stopRoute(process.getPID()); |
| } catch (Exception e) { |
| logger.error(methodName, null, |
| "....Unable to stop Camel route for PID:" + process.getPID()); |
| } |
| // remove route from context, otherwise the routes accumulate over time causing memory leak |
| super.getContext().removeRoute(process.getPID()); |
| StringBuilder sb = new StringBuilder("\n"); |
| logger.info(methodName, null, |
| "Removed Camel Route from Context for PID:" + process.getPID()); |
| |
| for (Route route : super.getContext().getRoutes()) { |
| sb.append("Camel Context - RouteId:" + route.getId() + "\n"); |
| } |
| logger.info(methodName, null, sb.toString()); |
| } |
| updateProcessStatus(event); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } finally { |
| |
| } |
| } |
| |
| public void onJPInitTimeout(IDuccProcess process, long timeout) { |
| String methodName = "onJPInitTimeout"; |
| try { |
| System.out.println("--------- Agent Timedout While Waiting For JP (PID:" + process.getPID() |
| + ") to initialize. The JP exceeded configured timeout of " + timeout / (60 * 1000) |
| + " minutes"); |
| ProcessStateUpdate processStateUpdate = new ProcessStateUpdate( |
| ProcessState.InitializationTimeout, process.getPID(), |
| process.getDuccId().getUnique()); |
| ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate); |
| updateProcessStatus(event); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| public void shutdown(String reason) { |
| String methodName = "shutdown"; |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess deployedProcess = it.next(); |
| try { |
| |
| undeployProcess(deployedProcess.getDuccProcess()); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| } |
| |
| public static void lock() throws Exception { |
| agentLock.acquire(); |
| } |
| |
| public static void unlock() throws Exception { |
| agentLock.release(); |
| } |
| |
/**
 * Decides whether a process found on the node belongs to this agent. While holding
 * {@code monitor}, returns true if:
 * <ul>
 * <li>any deployed process has not reported its PID yet (it is assumed to be the one), or</li>
 * <li>a deployed process has the same PID as {@code cpi}, or</li>
 * <li>the parent of {@code cpi} is found in {@code processList}, has children, and the
 * recursive check on that parent succeeds.</li>
 * </ul>
 *
 * @param processList
 *          processes to search for the parent of {@code cpi}
 * @param cpi
 *          the process being classified
 * @return true if the process is considered managed by this agent
 */
public boolean isManagedProcess(Set<NodeUsersCollector.ProcessInfo> processList,
    NodeUsersCollector.ProcessInfo cpi) {
  synchronized (monitor) {
    Iterator<ManagedProcess> it = deployedProcesses.iterator();
    while (it.hasNext()) {
      ManagedProcess deployedProcess = it.next();
      if (deployedProcess.getDuccProcess() != null) {
        // Check if process has been deployed but has not yet
        // reported its PID.
        // This is normal. It takes a bit of time until the JP
        // reports
        // its PID to the Agent. If there is at least one process in
        // Agent
        // deploy list with no PID we assume it is the one.
        String dppid = deployedProcess.getDuccProcess().getPID();
        if (dppid == null || dppid.equals(String.valueOf(cpi.getPid()))) {
          return true;
        }
      }
    }
    for (NodeUsersCollector.ProcessInfo pi : processList) {
      if (pi.getPid() == cpi.getPPid() && pi.getChildren().size() > 0) { // is
        // the current process a child of another java
        // Process?
        // NOTE(review): the recursive call searches pi's own children for pi's parent, so in
        // effect only one ancestor level is examined. Confirm whether processList was intended.
        return isManagedProcess(pi.getChildren(), pi);
      }
    }
  }
  return false;
}
| |
/**
 * Decides whether a process found on the node is rogue, i.e. not accounted for by this agent.
 * While holding {@code monitor}:
 * <ul>
 * <li>with cgroups enabled, a PID that belongs to any cgroup is not rogue;</li>
 * <li>if any deployed process has not reported a PID yet, nothing is declared rogue;</li>
 * <li>a PID matching a deployed process is not rogue;</li>
 * <li>a process whose parent PID is 1 is rogue. As a side effect,
 * {@code interruptThreadInWaitFor()} is called for that PID;</li>
 * <li>otherwise the decision is delegated to {@code isParentProcessRogue}.</li>
 * </ul>
 *
 * @param uid
 *          user id of the process owner; not used by this method
 * @param processList
 *          process tree used to locate the parent of {@code cpi}
 * @param cpi
 *          the process being classified
 * @return true if the process is considered rogue
 * @throws Exception
 *           propagated from the cgroups lookup
 */
public boolean isRogueProcess(String uid, Set<NodeUsersCollector.ProcessInfo> processList,
    NodeUsersCollector.ProcessInfo cpi) throws Exception {

  synchronized (monitor) {
    // if cgroups are enabled, check if a given PID (cpi) exists in any of
    // the containers. If so, the process is not rogue.
    if (useCgroups) {
      if (cgroupsManager.isPidInCGroup(String.valueOf(cpi.getPid()))) {
        return false;
      }
    }
    // Agent adds a process to its inventory before launching it. So it
    // is
    // possible that the inventory contains a process with no PID. If
    // there
    // is such process in the inventory we cannot determine that a given
    // pid is rogue yet. Eventually, the launched process reports its
    // PID
    boolean foundDeployedProcessWithNoPID = false;
    Iterator<ManagedProcess> it = deployedProcesses.iterator();
    while (it.hasNext()) {
      ManagedProcess deployedProcess = it.next();
      if (deployedProcess.getDuccProcess() != null) {
        // Check if process has been deployed but has not yet
        // reported its PID.
        // This is normal. It takes a bit of time until the JP
        // reports
        // its PID to the Agent. If there is at least one process in
        // Agent
        // deploy list with no PID we assume it is the one.
        if (deployedProcess.getDuccProcess().getPID() == null) {
          foundDeployedProcessWithNoPID = true;
          break;
        }
        String dppid = deployedProcess.getDuccProcess().getPID();
        // process in inventory, not rogue
        if (dppid != null && dppid.equals(String.valueOf(cpi.getPid()))) {
          return false;
        }
      }
    }
    // not found
    if (foundDeployedProcessWithNoPID) {
      return false;
    } else if (cpi.getPPid() == 1) { // Any process owned by init is rogue
      // interrupt agent's thread blocking in waitFor() awaiting process termination.
      // This process is a zombie and there is no need to waste the thread.
      interruptThreadInWaitFor(String.valueOf(cpi.getPid()));
      return true;
    } else {
      return isParentProcessRogue(processList, cpi);
    }
  }
}
| |
| private boolean isParentProcessRogue(Set<NodeUsersCollector.ProcessInfo> processList, |
| NodeUsersCollector.ProcessInfo cpi) { |
| // boolean found = false; |
| for (NodeUsersCollector.ProcessInfo pi : processList) { |
| if (pi.getPid() == cpi.getPPid()) { // is the current process a |
| if (pi.isRogue()) { // if parent is rogue, a child is rogue as |
| // well |
| return true; |
| } |
| return false; |
| } else { |
| if (pi.getChildren().size() > 0) { |
| return isParentProcessRogue(pi.getChildren(), cpi); |
| } |
| } |
| } |
| return true; |
| |
| } |
| |
| /** |
| * Process resident memory collector routes. Collects resident memory at fixed interval from the |
| * OS. |
| * |
| */ |
| public class ProcessMemoryUsageRoute extends RouteBuilder { |
| private NodeAgent agent; |
| |
| private IDuccProcess process; |
| |
| private ManagedProcess managedProcess; |
| |
| public ProcessMemoryUsageRoute(NodeAgent agent, IDuccProcess process, |
| ManagedProcess managedProcess) { |
| this.process = process; |
| this.managedProcess = managedProcess; |
| this.agent = agent; |
| } |
| |
| public void configure() throws Exception { |
| Processor nmp = configurationFactory.processMetricsProcessor(agent, process, managedProcess); |
| int fixedRate = configurationFactory.getNodeInventoryPublishDelay(); |
| from("timer:processMemPollingTimer?fixedRate=true&delay=100&period=" + fixedRate) |
| .routeId(process.getPID()).autoStartup(true).process(nmp); |
| } |
| } |
| |
| @Override |
| public boolean isStopping() { |
| return stopping; |
| } |
| |
| @Override |
| public void quiesceAndStop() throws Exception { |
| stop(true, -1); |
| } |
| |
| private void stop(boolean quiesce, long waitTimeInSecs) throws Exception { |
| synchronized (stopLock) { |
| logger.info("stop", null, "Agent stop() - quiesce:" + quiesce); |
| if (stopping) { |
| return; |
| } |
| stopping = true; |
| stateChange(EventType.SHUTDOWN); |
| // Dispatch SIGTERM to all processes. If this is quiesce mode we dont try to stop |
| // non-preemptable processes |
| boolean wait = stopChildProcesses(quiesce); |
| if (quiesce) { |
| logger.info("stop", null, "Agent stopping managed processes"); |
| long waitTime = 60; // default |
| try { |
| waitTime = Long.valueOf(configurationFactory.processStopTimeout); |
| // Normalize. The configurationFactory.processStopTimeout from |
| // ducc.properties is in millis. The code below expects secs. |
| waitTime = (waitTime / 1000); |
| } catch (Exception e) { |
| } |
| // Version 2.10.2 of UIMA-AS is not supporting quiesce and stop |
| // so we need to implement wait than kill -9 strategy. |
| waitForChildProcessesToTerminateAndKill(wait, waitTime, true, quiesce); |
| logger.info("stop", null, |
| ">>>>>>>>>>>> stop() waitForChildProcessesToTerminateAndKill() completed"); |
| // wait for JD processes to terminate. Return only when all non-preemptables |
| // terminate. |
| waitForChildProcessesToTerminate(false); |
| logger.info("stop", null, |
| ">>>>>>>>>>>> stop() waitForChildProcessesToTerminate() completed"); |
| } else { |
| logger.info("stop", null, "Agent stopping managed processes with reaper delay of " |
| + waitTimeInSecs + " secs"); |
| |
| // wait for 60 secs and sends SIGKILL to any process still standing |
| waitForChildProcessesToTerminateAndKill(wait, waitTimeInSecs, false, quiesce); |
| } |
| // Send an empty process map as the final inventory |
| DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory, getLastORSequence(), |
| getIdentity()); |
| ORDispatcher.dispatch(duccEvent); |
| logger.info("stop", null, "Agent published final inventory"); |
| |
| logger.info("stop", null, "Stopping Publishing Metrics and Inventory"); |
| |
| configurationFactory.stop(); |
| logger.info("stop", null, "Reaper thread finished - calling super.stop()"); |
| super.stop(); |
| } |
| |
| } |
| |
| @Override |
| public void stop() throws Exception { |
| // not quiesce, 60=default timeout before kill -9 is used |
| stop(false, 60); |
| } |
| |
| private void waitForChildProcessesToTerminateAndKill(boolean wait, long waitTimeInSecs, |
| boolean killJustUimaAs, boolean quiesce) throws Exception { |
| if (wait && !deployedProcesses.isEmpty()) { |
| logger.info("waitForChildProcessesToTerminateAndKill", null, |
| "Agent Sent SIGTERM to ALL Non-Preemptable Child Processes - Number of Deployed Processes:" |
| + deployedProcesses.size()); |
| |
| Timer timer = new Timer(true); |
| logger.info("waitForChildProcessesToTerminateAndKill", null, "Waiting", waitTimeInSecs, |
| " secs before sending kill -9 to all ***non-preemptable*** child processes still running"); |
| CountDownLatch completionLatch = new CountDownLatch(1); |
| // start a timer task which when triggered kills processes via kill -9 |
| timer.schedule(new KillTimerTask(completionLatch, killJustUimaAs, quiesce), |
| waitTimeInSecs * 1000); |
| |
| // block this thread until killer task finishes its work |
| completionLatch.await(); |
| |
| } |
| stopLock.wait(1000); |
| logger.info("waitForChildProcessesToTerminateAndKill", null, "Done"); |
| |
| } |
| |
| private void waitForChildProcessesToTerminate(boolean quiesceMode) throws Exception { |
| logger.info("waitForChildProcessesToTerminate", null, |
| "Agent Sent SIGTERM to Child Processes - Waiting for them to Quiesce - Number of Deployed Processes:" |
| + deployedProcesses.size()); |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess p = it.next(); |
| // dont wait for JDs to stop. In quiesce mode we keep them |
| // running until all JPs terminate and only than we stop them |
| // if ( quiesceMode && ( p.isJd() || p.isUimaAs() ) ) { |
| // in quiesce mode, skip UIMA-AS processes since currently |
| // there is no support for quiesce there. So only wait for |
| // POPs (JD), UIMA based JPs, and Services |
| if (quiesceMode && p.isUimaAs()) { |
| continue; |
| } |
| // block waiting for process to terminate |
| p.getFuture().get(); |
| |
| } |
| } |
| |
| public Future<?> getDeployedJPFuture(IDuccId duccId) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| // for (ManagedProcess deployedProcess : deployedProcesses) { |
| ManagedProcess deployedProcess = it.next(); |
| // ignore duplicate start request for the same process |
| if (deployedProcess.getDuccId().equals(duccId)) { |
| return deployedProcess.getFuture(); |
| } |
| } |
| return null; |
| } |
| |
| public ManagedProcess getDeployedProcess(IDuccId duccId) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess deployedProcess = it.next(); |
| // ignore duplicate start request for the same process |
| if (deployedProcess.getDuccId().equals(duccId)) { |
| return deployedProcess; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Copies reservations sent by the PM. It copies reservations associated with this node. |
| * |
| * @param reserves |
| * - list of ALL reservations |
| * @throws Exception |
| */ |
| public void setReservations(List<DuccUserReservation> reserves) throws Exception { |
| try { |
| reservationsSemaphore.acquire(); |
| if (reserves != null) { |
| // clear old entries |
| reservations.clear(); |
| // Only copy reservations for this node |
| IDuccReservationMap reserveMap = new DuccReservationMap(); |
| for (DuccUserReservation r : reserves) { |
| reserveMap.clear(); |
| for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) { |
| if (Utils.isThisNode(getIdentity().getIp(), |
| entry.getValue().getNodeIdentity().getIp())) { |
| reserveMap.addReservation(entry.getValue()); |
| } |
| } |
| if (reserveMap.getMap().size() > 0) { |
| DuccUserReservation reserve = new DuccUserReservation(r.getUserId(), r.getReserveID(), |
| reserveMap); |
| reservations.add(reserve); |
| } |
| } |
| } |
| |
| // this.reservations = reservations; |
| logger.debug("setReservations", null, |
| "+++++++++++ Copied User Reservations - List Size:" + reservations.size()); |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| } |
| |
| public List<DuccId> getUserReservations(String uid) { |
| List<DuccId> reservationIds = new ArrayList<DuccId>(); |
| try { |
| reservationsSemaphore.acquire(); |
| if (reservations != null) { |
| for (DuccUserReservation r : reservations) { |
| if (r.getUserId().equals(uid)) { |
| for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) { |
| reservationIds.add(entry.getValue().getDuccId()); |
| } |
| break; |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| return reservationIds; |
| } |
| |
| public void copyAllUserReservations(TreeMap<String, NodeUsersInfo> map) { |
| try { |
| reservationsSemaphore.acquire(); |
| if (reservations != null) { |
| logger.debug("copyAllUserReservations", null, |
| "+++++++++++ Copying User Reservations - List Size:" + reservations.size()); |
| for (DuccUserReservation r : reservations) { |
| if ("System".equals(r.getUserId())) { |
| continue; |
| } |
| NodeUsersInfo nui = null; |
| if (map.containsKey(r.getUserId())) { |
| nui = map.get(r.getUserId()); |
| } else { |
| nui = new NodeUsersInfo(r.getUserId()); |
| map.put(r.getUserId(), nui); |
| } |
| nui.addReservation(r.getReserveID()); |
| } |
| } else { |
| logger.debug("copyAllUserReservations", null, " *********** No Reservations"); |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| |
| } |
| |
| public boolean userHasReservation(String uid) throws Exception { |
| try { |
| reservationsSemaphore.acquire(); |
| |
| for (DuccUserReservation r : reservations) { |
| if (r.getUserId().equals(uid)) { |
| return true; |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| return false; |
| } |
| |
| public Object deepCopy(Object original) throws Exception { |
| ObjectInputStream ois = null; |
| ObjectOutputStream oos; |
| ByteArrayInputStream bis; |
| ByteArrayOutputStream bos; |
| Object copy; |
| try { |
| // serialize object to bytes |
| bos = new ByteArrayOutputStream(); |
| oos = new ObjectOutputStream(bos); |
| oos.writeObject(original); |
| oos.close(); |
| |
| // construct an object from the bytes |
| bis = new ByteArrayInputStream(bos.toByteArray()); |
| ois = new ObjectInputStream(bis); |
| copy = ois.readObject(); |
| return copy; |
| } catch (Exception e) { |
| throw e; |
| } finally { |
| if (ois != null) { |
| ois.close(); |
| } |
| } |
| } |
| |
| public RogueProcessReaper getRogueProcessReaper() { |
| return rogueProcessReaper; |
| } |
| |
/*
 * Called when an Agent receives a self-dispatched Ping message. (Handler disabled; kept for reference.)
 */
| // public void ping(AgentPingEvent agentPing) { |
| // nodeMonitor.nodeArrives(agentPing.getNode()); |
| // } |
| /* |
| * public boolean excludeUser(String userId ) { if ( configurationFactory.userExclusionList != |
| * null ) { // exclusion list contains comma separated user ids String[] excludedUsers = |
| * configurationFactory.userExclusionList.split(","); for ( String excludedUser : excludedUsers ) |
| * { if ( excludedUser.equals(userId)) { return true; } } } return false; } public boolean |
| * excludeProcess(String process ) { if ( configurationFactory.processExclusionList != null ) { // |
| * exclusion list contains comma separated user ids String[] excludedProcesses = |
| * configurationFactory.processExclusionList.split(","); for ( String excludedProcess : |
| * excludedProcesses ) { if ( excludedProcess.equals(process)) { return true; } } } return false; |
| * } |
| */ |
| public static void main(String[] args) { |
| try { |
| NodeIdentity node = new NodeIdentity(InetAddress.getLocalHost().getHostAddress(), |
| InetAddress.getLocalHost().getHostName()); |
| NodeAgent agent = new NodeAgent(node); |
| |
| List<DuccUserReservation> reserves = new ArrayList<DuccUserReservation>(); |
| |
| IDuccReservationMap reserveMap = new DuccReservationMap(); |
| IDuccReservationMap reserveMap2 = new DuccReservationMap(); |
| |
| NodeIdentity ni1 = node; |
| // new NodeIdentity(, name); |
| NodeIdentity ni2 = new NodeIdentity("111.111.111.111", "node100"); |
| NodeIdentity ni3 = node; |
| NodeIdentity ni4 = new NodeIdentity("222.222.222.222", "node102"); |
| |
| DuccId id1 = new DuccId(100); |
| DuccId id2 = new DuccId(101); |
| DuccId id3 = new DuccId(102); |
| DuccId id4 = new DuccId(103); |
| |
| IDuccReservation reservation1 = new DuccReservation(id1, ni1, 1); |
| reserveMap.addReservation(reservation1); |
| IDuccReservation reservation2 = new DuccReservation(id2, ni2, 1); |
| reserveMap.addReservation(reservation2); |
| IDuccReservation reservation4 = new DuccReservation(id4, ni4, 1); |
| reserveMap.addReservation(reservation4); |
| |
| IDuccReservation reservation3 = new DuccReservation(id3, ni3, 1); |
| reserveMap2.addReservation(reservation3); |
| |
| DuccUserReservation reserve = new DuccUserReservation("joe", new DuccId(500), reserveMap); |
| DuccUserReservation reserve2 = new DuccUserReservation("jane", new DuccId(500), reserveMap2); |
| reserves.add(reserve); |
| reserves.add(reserve2); |
| |
| agent.setReservations(reserves); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| private class NodeExclusionParser { |
| private boolean excludeNodeFromCGroups = false; |
| |
| private boolean excludeAP = false; |
| |
| public void parse(String exclFile) throws Exception { |
| // <node>=cgroup,ap |
| File exclusionFile = new File(exclFile); |
| if (!exclusionFile.exists()) { |
| return; |
| } |
| BufferedReader br = new BufferedReader(new FileReader(exclusionFile)); |
| String line; |
| NodeIdentity node = getIdentity(); |
| String nodeName = node.getShortName(); |
| |
| while ((line = br.readLine()) != null) { |
| if (line.startsWith(nodeName)) { |
| String exclusions = line.substring(line.indexOf("=") + 1); |
| String[] parsedExclusions = exclusions.split(","); |
| for (String exclusion : parsedExclusions) { |
| |
| if (exclusion.trim().equals("cgroup")) { |
| excludeNodeFromCGroups = true; |
| |
| } else if (exclusion.trim().equals("ap")) { |
| excludeAP = true; |
| |
| } |
| } |
| break; |
| } |
| } |
| br.close(); |
| } |
| |
| public boolean apExcluded() { |
| return excludeAP; |
| } |
| |
| public boolean cgroupsExcluded() { |
| return excludeNodeFromCGroups; |
| } |
| } |
| |
| public DuccLogger getLogger() { |
| return logger; |
| } |
| |
| private void handleQuiesceAndStopEvent(DuccAdminEventQuiesceAndStop event) { |
| logger.info("handleQuiesceAndStopEvent", null, "... Agent Received an Admin Request to Stop"); |
| try { |
| stop(true, -1); |
| } catch (Exception e) { |
| logger.info("handleQuiesceAndStopEvent", null, e); |
| } |
| |
| } |
| |
| private void handleStopEvent(DuccAdminEventStop event) { |
| logger.info("handleStopEvent", null, "... Agent Received an Admin Request to Stop"); |
| try { |
| |
| stop(false, event.getTimeout()); |
| } catch (Exception e) { |
| logger.info("handleStopEvent", null, e); |
| } |
| |
| } |
| |
| private void handleStopPublishingEvent(DuccAdminEventStopMetrics event) { |
| if (isThisTargetNode(getTargetNodes(event.getTargets()))) { |
| logger.info("handleStopPublishingEvent", null, |
| "... Agent Received an Admin Request to Stop Metrics Collection and Publishing"); |
| // Stop Camel route responsible for driving collection and publishing of metrics |
| configurationFactory.stopMetricsRoute(); |
| logger.info("handleStopPublishingEvent", null, |
| "... Agent Stopped Metrics Collection and Publishing"); |
| } |
| |
| } |
| |
| private String[] getTargetNodes(String targets) { |
| logger.info("getTargetNodes", null, " Targets for Admin Command:" + targets); |
| |
| return targets.split(","); |
| } |
| |
| private boolean isThisTargetNode(String[] nodes) { |
| for (String targetNode : nodes) { |
| if (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getCanonicalName())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean isTarget(String[] targets) { |
| for (String target : targets) { |
| String[] targetParts = target.trim().split("@"); |
| logger.info("isTarget", null, |
| " Targets for Admin Command:" + target + " This agent canonical identity:" |
| + getIdentity().getCanonicalName() + " short name:" |
| + getIdentity().getShortName()); |
| if ("agent".equals(targetParts[0])) { |
| if (Utils.isMachineNameMatch(targetParts[1].trim(), getIdentity().getShortName())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void handleAdminEvent(DuccAdminEvent event) throws Exception { |
| |
| Thread t = new Thread() { |
| public void run() { |
| if (event instanceof DuccAdminEventStop) { |
| if (isTarget(getTargetNodes(((DuccAdminEventStop) event).getTargets()))) { |
| handleStopEvent((DuccAdminEventStop) event); |
| } |
| } else if (event instanceof DuccAdminEventQuiesceAndStop) { |
| if (isTarget(getTargetNodes(((DuccAdminEventQuiesceAndStop) event).getTargets()))) { |
| logger.info("handleAdminEvent", null, "Node a target for quiesce"); |
| handleQuiesceAndStopEvent((DuccAdminEventQuiesceAndStop) event); |
| } |
| } else if (event instanceof DuccAdminEventStopMetrics) { |
| handleStopPublishingEvent((DuccAdminEventStopMetrics) event); |
| } else { |
| logger.info("handleAdminEvent", null, |
| "... Agent Received Unexpected Message of Type:" + event.getClass().getName()); |
| } |
| } |
| }; |
| t.start(); |
| |
| /* |
| * if (event instanceof DuccAdminEventStopMetrics) { // Get target machines from the message |
| * String[] nodes = ((DuccAdminEventStopMetrics) event).getTargetNodes().split(","); // Check if |
| * this message applies to this node for (String targetNode : nodes) { if |
| * (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getCanonicalName())) { |
| * logger.info("handleAdminEvent", null, |
| * "... Agent Received an Admin Request to Stop Metrics Collection and Publishing"); // Stop |
| * Camel route responsible for driving collection and publishing of metrics |
| * configurationFactory.stopMetricsRoute(); logger.info("handleAdminEvent", null, |
| * "... Agent Stopped Metrics Collection and Publishing"); break; } else { |
| * logger.info("handleAdminEvent", null, "... Agent Not Target For Message:" + |
| * event.getClass().getName()); } } } else { logger.info("handleAdminEvent", null, |
| * "... Agent Received Unexpected Message of Type:" + event.getClass().getName()); |
| * |
| * } |
| */ |
| } |
| |
| private class KillTimerTask extends TimerTask { |
| |
| private CountDownLatch completionLatch; |
| |
| private boolean killJustUimaAs; |
| |
| private boolean quiesce; |
| |
| public KillTimerTask(CountDownLatch completionLatch, boolean killOnlyUimaAs, boolean quiesce) { |
| this.completionLatch = completionLatch; |
| this.killJustUimaAs = killOnlyUimaAs; |
| this.quiesce = quiesce; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| // send kill -9 to any child process still running |
| killChildProcesses(killJustUimaAs, quiesce); |
| } finally { |
| completionLatch.countDown(); |
| } |
| } |
| |
| } |
| } |