| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.agent; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileReader; |
| import java.io.InputStreamReader; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Route; |
| import org.apache.camel.builder.RouteBuilder; |
| import org.apache.commons.lang.SerializationUtils; |
| import org.apache.uima.ducc.agent.config.AgentConfiguration; |
| import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver; |
| import org.apache.uima.ducc.agent.launcher.CGroupsManager; |
| import org.apache.uima.ducc.agent.launcher.Launcher; |
| import org.apache.uima.ducc.agent.launcher.ManagedProcess; |
| import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector; |
| import org.apache.uima.ducc.common.Node; |
| import org.apache.uima.ducc.common.NodeIdentity; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEvent; |
| import org.apache.uima.ducc.common.admin.event.DuccAdminEventStopMetrics; |
| import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties; |
| import org.apache.uima.ducc.common.component.AbstractDuccComponent; |
| import org.apache.uima.ducc.common.main.DuccService; |
| import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo; |
| import org.apache.uima.ducc.common.utils.DuccLogger; |
| import org.apache.uima.ducc.common.utils.TimeStamp; |
| import org.apache.uima.ducc.common.utils.Utils; |
| import org.apache.uima.ducc.common.utils.id.DuccId; |
| import org.apache.uima.ducc.common.utils.id.IDuccId; |
| import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent; |
| import org.apache.uima.ducc.transport.agent.ProcessStateUpdate; |
| import org.apache.uima.ducc.transport.cmdline.ICommandLine; |
| import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine; |
| import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; |
| import org.apache.uima.ducc.transport.event.DuccEvent; |
| import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent; |
| import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent; |
| import org.apache.uima.ducc.transport.event.common.DuccProcess; |
| import org.apache.uima.ducc.transport.event.common.DuccReservation; |
| import org.apache.uima.ducc.transport.event.common.DuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.DuccUserReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservation; |
| import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; |
| import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; |
| import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; |
| import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState; |
| import org.apache.uima.ducc.transport.event.common.ITimeWindow; |
| import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment; |
| import org.apache.uima.ducc.transport.event.common.TimeWindow; |
| |
| public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver { |
| public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME); |
| |
| public static final String ProcessStateUpdatePort = "ducc.agent.process.state.update.port"; |
| public static int SIGKILL=9; |
| public static int SIGTERM=15; |
| |
| //for LinuxNodeMetrics logging |
| public static AtomicLong logCounter = new AtomicLong(); |
| public static String cgroupFailureReason; |
| // Map of known processes this agent is managing. This map is published |
| // at regular intervals as part of agent's inventory update. |
| private Map<DuccId, IDuccProcess> inventory = new HashMap<DuccId, IDuccProcess>(); |
| |
| // Semaphore controlling access to inventory Map |
| private Semaphore inventorySemaphore = new Semaphore(1); |
| |
| List<ManagedProcess> deployedProcesses = new ArrayList<ManagedProcess>(); |
| |
| // This agent's identity ( host name and IP address) |
| private NodeIdentity nodeIdentity; |
| |
| // Component to launch processes |
| private Launcher launcher; |
| |
| // Reference to the Agent's configuration factory component. This is where |
| // Agent |
| // dependencies are instantiated and injected via Agent's c'tor. |
| public AgentConfiguration configurationFactory; |
| |
| private static Semaphore agentLock = new Semaphore(1); |
| |
| private DuccEventDispatcher commonProcessDispatcher; |
| |
| private DuccEventDispatcher inventoryDispatcher; |
| |
| private Object monitor = new Object(); |
| |
| boolean duccLingExists = false; |
| |
| boolean runWithDuccLing = false; |
| |
| private List<DuccUserReservation> reservations = new ArrayList<DuccUserReservation>(); |
| |
| private Semaphore reservationsSemaphore = new Semaphore(1); |
| |
| // private AgentMonitor nodeMonitor; |
| |
| private volatile boolean stopping = false; |
| |
| private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10); |
| |
| public volatile boolean useCgroups = false; |
| |
| public CGroupsManager cgroupsManager = null; |
| |
| public Node node = null; |
| |
| public volatile boolean excludeAPs = false; |
| |
| public int shareQuantum; |
| |
| public boolean virtualAgent = false; |
| |
| public boolean pageSizeFetched = false; |
| |
| public int pageSize = 4096; // default |
| |
| public int cpuClockRate = 100; |
| |
| public int numProcessors=0; |
| |
| // indicates whether or not this agent received at least one publication |
| // from the PM. This flag is used to determine if the agent should use |
| // rogue process detector. The detector will be used if this flag is true. |
| public volatile boolean receivedDuccState = false; |
| /** |
| * Ctor used exclusively for black-box testing of this class. |
| */ |
| public NodeAgent() { |
| super(COMPONENT_NAME, null); |
| } |
| |
| public NodeAgent(NodeIdentity ni) { |
| this(); |
| this.nodeIdentity = ni; |
| Utils.findDuccHome(); // add DUCC_HOME to System.properties |
| } |
| |
| /** |
| * C'tor for dependecy injection |
| * |
| * @param nodeIdentity |
| * - this Agent's identity |
| * @param launcher |
| * - component to launch processes |
| * @param context |
| * - camel context |
| */ |
| public NodeAgent(NodeIdentity nodeIdentity, Launcher launcher, CamelContext context, |
| AgentConfiguration factory) throws Exception { |
| super(COMPONENT_NAME, context); |
| |
| Utils.findDuccHome(); // add DUCC_HOME to System.properties |
| |
| // Running a real agent |
| virtualAgent = System.getProperty("ducc.agent.virtual") == null ? false : true; |
| |
| this.nodeIdentity = nodeIdentity; |
| this.launcher = launcher; |
| this.configurationFactory = factory; |
| this.commonProcessDispatcher = factory.getCommonProcessDispatcher(context); |
| this.inventoryDispatcher = factory.getORDispatcher(context); |
| |
| // fetch Page Size from the OS and cache it |
| pageSize = getOSPageSize(); |
| |
| // begin publishing node metrics |
| factory.startNodeMetrics(this); |
| |
| numProcessors = getNodeProcessors(); |
| |
| logger.info("NodeAgent", null, "OS Page Size:" + pageSize); |
| |
| cpuClockRate = getOSClockRate(); |
| logger.info("NodeAgent", null, "OS Clock Rate:" + cpuClockRate); |
| |
| if (System.getProperty("ducc.rm.share.quantum") != null |
| && System.getProperty("ducc.rm.share.quantum").trim().length() > 0) { |
| shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim()); |
| } |
| /* Enable CGROUPS */ |
| String cgroups; |
| String cgUtilsPath=null; |
| boolean excludeNodeFromCGroups = false; |
| if (!virtualAgent |
| && (cgroups = System.getProperty("ducc.agent.launcher.cgroups.enable")) != null) { |
| if (cgroups.equalsIgnoreCase("true")) { |
| logger.info("nodeAgent", null,"ducc.properties [ducc.agent.launcher.cgroups.enable=true]"); |
| // Load exclusion file. Some nodes may be excluded from cgroups |
| String exclusionFile; |
| |
| // get the name of the exclusion file from ducc.properties |
| if ((exclusionFile = System.getProperty("ducc.agent.exclusion.file")) != null) { |
| logger.info("nodeAgent",null, "Ducc configured with cgroup node exclusion file - ducc.properties [ducc.agent.exclusion.file="+exclusionFile+"]"); |
| // Parse node exclusion file and determine if cgroups and AP |
| // deployment |
| // is allowed on this node |
| NodeExclusionParser exclusionParser = new NodeExclusionParser(); |
| exclusionParser.parse(exclusionFile); |
| excludeNodeFromCGroups = exclusionParser.cgroupsExcluded(); |
| excludeAPs = exclusionParser.apExcluded(); |
| if (excludeNodeFromCGroups) { |
| logger.info("nodeAgent", null, |
| "------- Node Explicitly Excluded From Using CGroups. Check File:" |
| + exclusionFile); |
| cgroupFailureReason = "------- Node Explicitly Excluded From Using CGroups. Check File:" |
| + exclusionFile; |
| } |
| System.out.println("excludeNodeFromCGroups=" + excludeNodeFromCGroups + " excludeAPs=" |
| + excludeAPs); |
| } else { |
| logger.info("nodeAgent",null,"Agent node *not* excluded from using cgroups"); |
| } |
| // node not in the exclusion list for cgroups |
| if (!excludeNodeFromCGroups) { |
| // fetch a list of paths the agent will search to find cgroups utils |
| // like cgexec. The default location is /usr/bin |
| logger.info("nodeAgent",null,"Testing cgroups to check if runtime utilities (cgexec) exist in expected locations in the filesystem"); |
| String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir"); |
| if (cgroupsUtilsDirs == null) { |
| cgUtilsPath = "/usr/bin"; // default |
| } else { |
| String[] paths = cgroupsUtilsDirs.split(","); |
| for( String path : paths ) { |
| File file = new File(path.trim()+"/cgexec"); |
| if ( file.exists() ) { |
| cgUtilsPath = path; |
| break; |
| } |
| } |
| } |
| String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts"); |
| |
| if ( cgUtilsPath == null ) { |
| useCgroups = false; |
| logger.info("nodeAgent", null, "------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties"); |
| } else if ( cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) { |
| useCgroups = false; |
| logger.info("nodeAgent", null, "------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts"); |
| |
| } else { |
| logger.info("nodeAgent",null,"Agent found cgroups runtime in "+cgUtilsPath+" cgroups base dir="+cgroupsBaseDir); |
| // get the top level cgroup folder from ducc.properties. If |
| // not defined, use /cgroup/ducc as default |
| //String cgroupsBaseDir = System.getProperty("ducc.agent.launcher.cgroups.basedir"); |
| // if (cgroupsBaseDir == null) { |
| // cgroupsBaseDir = "/cgroup/ducc"; |
| // } |
| // get the cgroup subsystems. If not defined, default to the |
| // memory and cpu subsystem |
| String cgroupsSubsystems = System.getProperty("ducc.agent.launcher.cgroups.subsystems"); |
| if (cgroupsSubsystems == null) { |
| cgroupsSubsystems = "memory,cpu"; |
| } |
| long maxTimeToWaitForProcessToStop = 60000; // default 1 minute |
| if (configurationFactory.processStopTimeout != null) { |
| maxTimeToWaitForProcessToStop = Long |
| .valueOf(configurationFactory.processStopTimeout); |
| } |
| |
| cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems, logger, maxTimeToWaitForProcessToStop); |
| cgroupsManager.configure(this); |
| // check if cgroups base directory exists in the filesystem |
| // which means that cgroups |
| // and cgroups convenience package are installed and the |
| // daemon is up and running. |
| if (cgroupsManager.cgroupExists(cgroupsBaseDir)) { |
| logger.info("nodeAgent",null,"Agent found cgroup base directory in "+cgroupsBaseDir); |
| try { |
| String containerId = "test"; |
| String uid = "ducc"; |
| // validate cgroups by creating a dummy cgroup. The code checks if cgroup actually got created by |
| // verifying existence of test cgroup file. The second step in verification is to check if |
| // CPU control is working. Configured in cgconfig.conf, the CPU control allows for setting |
| // cpu.shares. The code will attempt to set the shares and subsequently tries to read the |
| // value from cpu.shares file to make sure the values match. Any exception in the above steps |
| // will cause cgroups to be disabled. |
| // |
| cgroupsManager.validator(cgroupsBaseDir, containerId, System.getProperty("user.name"),false) |
| .cgcreate() |
| .cgset(100); // write cpu.shares=100 and validate |
| |
| // cleanup dummy cgroup |
| cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"), SIGKILL); |
| useCgroups = true; |
| } catch( CGroupsManager.CGroupsException ee) { |
| logger.info("nodeAgent", null, ee); |
| cgroupFailureReason = ee.getMessage(); |
| useCgroups = false; |
| } |
| |
| try { |
| // remove stale CGroups |
| cgroupsManager.cleanupOnStartup(); |
| } catch (Exception e) { |
| logger.error("nodeAgent", null, e); |
| |
| } |
| |
| } else { |
| logger.info("nodeAgent",null,"Agent failed to find cgroup base directory in "+cgroupsBaseDir+". Check if cgroups is installed on this node and the cgroup daemon is running"); |
| //logger.info("nodeAgent", null, "------- CGroups Not Installed on this Machine"); |
| cgroupFailureReason = "------- CGroups Not Installed on this Machine"; |
| } |
| } |
| } |
| } |
| } else { |
| logger.info("nodeAgent", null, "------- CGroups Not Enabled on this Machine"); |
| cgroupFailureReason = "------- CGroups Not Enabled on this Machine - check ducc.properties: ducc.agent.launcher.cgroups.enable "; |
| } |
| logger.info("nodeAgent", null, "CGroup Support=" + useCgroups + " excludeNodeFromCGroups=" |
| + excludeNodeFromCGroups + " excludeAPs=" + excludeAPs+" CGroups utils Dir:"+cgUtilsPath); |
| |
| String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn"); |
| if (useSpawn != null && useSpawn.toLowerCase().equals("true")) { |
| runWithDuccLing = true; |
| String c_launcher_path = Utils.resolvePlaceholderIfExists( |
| System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties()); |
| try { |
| File duccLing = new File(c_launcher_path); |
| if (duccLing.exists()) { |
| duccLingExists = true; |
| } |
| } catch (Exception e) { |
| logger.info("nodeAgent", null, |
| "------- Agent failed while checking for existence of ducc_ling", e); |
| } |
| |
| } |
| } |
| |
| private String fetchCgroupsBaseDir(String mounts) { |
| String cbaseDir=null; |
| BufferedReader br = null; |
| try { |
| FileInputStream fis = new FileInputStream(mounts); |
| //Construct BufferedReader from InputStreamReader |
| br = new BufferedReader(new InputStreamReader(fis)); |
| |
| String line = null; |
| while ((line = br.readLine()) != null) { |
| System.out.println(line); |
| if ( line.trim().startsWith("cgroup") ) { |
| String[] cgroupsInfo = line.split(" "); |
| if ( cgroupsInfo[1].trim().equals("/cgroup") ) { |
| cbaseDir = cgroupsInfo[1].trim(); |
| break; |
| } else if ( cgroupsInfo[1].trim().endsWith("/memory") ) { |
| // return the mount point minus the memory part |
| cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory") ); |
| break; |
| } |
| } |
| } // while |
| |
| } catch( Exception e) { |
| logger.info("nodeAgent", null, |
| "------- Agent failed while checking for existence of ducc_ling", e); |
| } finally { |
| if ( br != null ) { |
| try { |
| br.close(); |
| } catch( Exception ex ) {} |
| } |
| } |
| return cbaseDir; |
| } |
| public int getNodeProcessors() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "_NPROCESSORS_ONLN" }); |
| } |
| public int getOSPageSize() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "PAGESIZE" }); |
| } |
| public int getOSClockRate() { |
| return runOSCommand(new String[] { "/usr/bin/getconf", "CLK_TCK" }); |
| } |
| private int runOSCommand(String[] cmd) { |
| InputStreamReader in = null; |
| BufferedReader reader = null; |
| int retVal = 0; |
| try { |
| ProcessBuilder pb = new ProcessBuilder(); |
| pb.command(cmd); |
| pb.redirectErrorStream(true); |
| Process p = pb.start(); |
| in = new InputStreamReader(p.getInputStream()); |
| reader = new BufferedReader(in); |
| String line = null; |
| |
| while ((line = reader.readLine()) != null) { |
| retVal = Integer.parseInt(line.trim()); |
| } |
| } catch (Exception e) { |
| logger.error("runOSCommand", null, e); |
| } finally { |
| if (reader != null) { |
| try { |
| reader.close(); |
| } catch (Exception ex) { |
| } |
| } |
| } |
| return retVal; |
| } |
| |
| public void setNodeInfo(Node node) { |
| this.node = node; |
| } |
| |
| public Node getNodeInfo() { |
| return node; |
| } |
| |
| public int getNodeTotalNumberOfShares() { |
| int shareQuantum = 0; |
| int shares = 1; |
| if (System.getProperty("ducc.rm.share.quantum") != null |
| && System.getProperty("ducc.rm.share.quantum").trim().length() > 0) { |
| shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim()); |
| shares = (int) getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() / shareQuantum; // get |
| // number |
| // of |
| // shares |
| if ((getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() % shareQuantum) > 0) |
| shares++; // ciel |
| } |
| |
| return shares; |
| } |
| |
| public void start(DuccService service) throws Exception { |
| super.start(service, null); |
| String name = nodeIdentity.getName(); |
| String ip = nodeIdentity.getIp(); |
| String jmxUrl = getProcessJmxUrl(); |
| DuccDaemonRuntimeProperties.getInstance().bootAgent(name, ip, jmxUrl); |
| } |
| |
| public DuccEventDispatcher getEventDispatcherForRemoteProcess() { |
| return commonProcessDispatcher; |
| } |
| |
| public boolean duccLingExists() { |
| return duccLingExists; |
| } |
| |
| public void duccLingExists(boolean duccLingExists) { |
| this.duccLingExists = duccLingExists; |
| } |
| |
| public boolean runWithDuccLing() { |
| return runWithDuccLing; |
| } |
| |
| public void runWithDuccLing(boolean runWithDuccLing) { |
| this.runWithDuccLing = runWithDuccLing; |
| } |
| |
| /** |
| * Returns deep copy (by way of java serialization) of the Agents inventory. |
| */ |
| @SuppressWarnings("unchecked") |
| public HashMap<DuccId, IDuccProcess> getInventoryCopy() { |
| Object deepCopy = null; |
| try { |
| inventorySemaphore.acquire(); |
| deepCopy = SerializationUtils.clone((HashMap<DuccId, IDuccProcess>) inventory); |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| return (HashMap<DuccId, IDuccProcess>) deepCopy; |
| } |
| |
| /** |
| * Returns shallow copy of the Agent's inventory |
| */ |
| public HashMap<DuccId, IDuccProcess> getInventoryRef() { |
| return (HashMap<DuccId, IDuccProcess>) inventory; |
| } |
| |
| /* |
| * Check if both the command and its args are missing, |
| * since the command defaults to the DUCC JVM. |
| */ |
| private boolean invalidCommand(ICommandLine commandLine) { |
| if (commandLine != null) { |
| if (commandLine.getExecutable() != null && commandLine.getExecutable().length() > 0) return false; |
| if (commandLine.getCommandLine() != null && commandLine.getCommandLine().length > 0) return false; |
| } |
| return true; |
| } |
| |
| private boolean isProcessDeallocated(IDuccProcess process) { |
| return (process.getProcessState().equals(ProcessState.Undefined) && process.isDeallocated()); |
| } |
| |
| /** |
| * Stops any process that is in agent's inventory but not in provided job list sent by the PM. |
| * |
| * @param lifecycleController |
| * - instance implementing stopProcess() method |
| * @param jobDeploymentList |
| * - all DUCC jobs sent by PM |
| */ |
| public void takeDownProcessWithNoJob(ProcessLifecycleController lifecycleController, |
| List<IDuccJobDeployment> jobDeploymentList) { |
| String methodName = "takeDownProcessWithNoJob"; |
| try { |
| inventorySemaphore.acquire(); |
| List<IDuccProcess> purgeList = new ArrayList<IDuccProcess>(); |
| boolean hasAjob = false; |
| // Check if every process in agent's inventory is associated with a |
| // job in a given |
| // jobDeploymentList |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // if a job list is empty, take down all agent processes that |
| // are in the inventory |
| if (jobDeploymentList.isEmpty()) { |
| logger.info(methodName, null, "...Agent Process:" + processEntry.getValue().getDuccId() |
| + " Not in JobDeploymentList. Ducc Currently Has No Jobs Running"); |
| hasAjob = false; |
| } else { |
| // iterate over all jobs |
| for (IDuccJobDeployment job : jobDeploymentList) { |
| // check if current process is a JD |
| if (job.getJdProcess() != null) { |
| if (processEntry.getValue().getDuccId().equals(job.getJdProcess().getDuccId())) { |
| hasAjob = true; |
| break; |
| } |
| } |
| // check if current process is a JP |
| for (IDuccProcess jProcess : job.getJpProcessList()) { |
| if (processEntry.getValue().getDuccId().equals(jProcess.getDuccId())) { |
| hasAjob = true; |
| break; |
| } |
| } |
| if (hasAjob) { |
| break; |
| } |
| } |
| } |
| if (!hasAjob) { |
| // if a process in agent inventory has no job and is still |
| // alive, stop it |
| if (isAlive(processEntry.getValue())) { |
| logger.error(methodName, null, |
| "<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:" |
| + processEntry.getValue().getDuccId() + " PID:" |
| + processEntry.getValue().getPID()); |
| processEntry.getValue().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.JPHasNoActiveJob.toString()); |
| lifecycleController.stopProcess(processEntry.getValue()); |
| } else { |
| // add process to purge list |
| purgeList.add(processEntry.getValue()); |
| } |
| } else { |
| hasAjob = false; |
| } |
| } |
| for (IDuccProcess processToPurge : purgeList) { |
| logger.error(methodName, null, "XXXXXXXXXX Purging Process:" + processToPurge.getDuccId() |
| + " Process State:" + processToPurge.getProcessState() + " Process Resource State:" |
| + processToPurge.getResourceState()); |
| getInventoryRef().remove(processToPurge.getDuccId()); |
| } |
| } catch (Exception e) { |
| |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| /** |
| * Reconciles agent inventory with job processes sent by PM |
| * |
| * @param lifecycleController |
| * - instance implementing stopProcess and startProcess |
| * @param process |
| * - job process from a Job List |
| * @param commandLine |
| * - in case this process is not in agents inventory we need cmd line to start it |
| * @param info |
| * - DUCC common info including user log dir, user name, etc |
| * @param workDuccId |
| * - job id |
| */ |
| public void reconcileProcessStateAndTakeAction(ProcessLifecycleController lifecycleController, |
| IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| ProcessMemoryAssignment processMemoryAssignment, DuccId workDuccId) { |
| String methodName = "reconcileProcessStateAndTakeAction"; |
| try { |
| inventorySemaphore.acquire(); |
| // Check if process exists in agent's inventory |
| if (getInventoryRef().containsKey(process.getDuccId())) { |
| IDuccProcess agentManagedProcess = getInventoryRef().get(process.getDuccId()); |
| // check if process is Running, Initializing, or Starting |
| if (isAlive(agentManagedProcess)) { |
| // Stop the process if it has been deallocated |
| if (process.isDeallocated() ) { |
| agentManagedProcess.setResourceState(ResourceState.Deallocated); |
| logger.info( |
| methodName, |
| workDuccId, |
| "<<<<<<<< Agent Stopping Process:" + process.getDuccId() + " PID:" |
| + process.getPID() + " Reason: Ducc Deallocated the Process."); |
| lifecycleController.stopProcess(agentManagedProcess); |
| } |
| // else nothing to do. Process has been deallocated |
| } |
| } else { // Process not in agent's inventory |
| // Add this process to the inventory so that it gets published. |
| getInventoryRef().put(process.getDuccId(), process); |
| if (process.isDeallocated()) { |
| // process not in agent's inventory and it is marked as |
| // deallocated. This can happen when an agent is restarted |
| // while the rest of DUCC is running. |
| // markAsStopped(process); |
| process.setProcessState(ProcessState.Stopped); |
| } else if (process.getResourceState().equals(ResourceState.Allocated)) { |
| // check if OR thinks that this process is still running. If |
| // so, this agent was restarted while the OR was running and |
| // we need to mark the process as Failed. |
| if (process.getProcessState().equals(ProcessState.Initializing) |
| || process.getProcessState().equals(ProcessState.Running)) { |
| process.setProcessState(ProcessState.Failed); |
| } else { |
| // enforce presence of command line |
| if (invalidCommand(commandLine)) { |
| process.setProcessState(ProcessState.Failed); |
| logger.info(methodName, workDuccId, |
| "Rejecting Process Start Request. Command line not provided for Process ID:" |
| + process.getDuccId()); |
| } else { |
| process.setProcessState(ProcessState.Starting); |
| logger.info( |
| methodName, |
| workDuccId, |
| ">>>>>>> Agent Starting Process:" + process.getDuccId() + " Process State:" |
| + process.getProcessState() + " Process Resource State:" |
| + process.getResourceState()); |
| lifecycleController.startProcess(process, commandLine, info, workDuccId, |
| processMemoryAssignment); |
| } |
| } |
| } |
| } |
| } catch (Exception e) { |
| logger.error(methodName, workDuccId, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| public void doStartProcess(IDuccProcess process, ICommandLine commandLine, |
| IDuccStandardInfo info, DuccId workDuccId) { |
| String methodName = "doStartProcess"; |
| try { |
| inventorySemaphore.acquire(); |
| if (getInventoryRef().containsKey(process.getDuccId())) { |
| logger.error(methodName, null, "Rejecting Process Start Request. Process with a Ducc ID:" |
| + process.getDuccId() + " is already in agent's inventory."); |
| return; |
| } |
| startProcess(process, commandLine, info, workDuccId, new ProcessMemoryAssignment()); |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| private boolean isProcessRunning(IDuccProcess process) { |
| if (process.getProcessState().equals(ProcessState.Running) |
| || process.getProcessState().equals(ProcessState.Initializing)) { |
| return true; |
| } |
| return false; |
| } |
| private boolean isOverSwapLimit(IDuccProcess process ) { |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| // Check if this process exceeds its alloted max swap usage |
| if ( deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId()) && |
| process.getSwapUsage() > deployedProcess.getMaxSwapThreshold() |
| ) { |
| return true; |
| } |
| } |
| return false; |
| } |
| private long getSwapOverLimit(IDuccProcess process) { |
| long overLimit = 0; |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| if ( deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId()) ) { |
| overLimit = deployedProcess.getMaxSwapThreshold() - process.getSwapUsage(); |
| } |
| } |
| if ( overLimit < 0 ) { |
| overLimit = 0; |
| } |
| return overLimit; |
| } |
| /** |
| * Called when swap space on a node reached minimum as defined by ducc.node.min.swap.threshold in |
| * ducc.properties. The agent will find the biggest (in terms of memory) process in its inventory |
| * and stop it. |
| */ |
| public void killProcessDueToLowSwapSpace(long minSwapThreshold) { |
| String methodName = "killProcessDueToLowSwapSpace"; |
| IDuccProcess biggestProcess = null; |
| try { |
| inventorySemaphore.acquire(); |
| // find the fattest process in terms of absolute use of swap over the process limit |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| if (isProcessRunning(processEntry.getValue()) |
| && isOverSwapLimit(processEntry.getValue()) |
| && (biggestProcess == null |
| || getSwapOverLimit(biggestProcess) < getSwapOverLimit(processEntry.getValue()))) { |
| biggestProcess = processEntry.getValue(); |
| } |
| } |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| if (biggestProcess != null) { |
| biggestProcess.setReasonForStoppingProcess(ReasonForStoppingProcess.LowSwapSpace.toString()); |
| logger.info(methodName, null, "Stopping Process:" + biggestProcess.getDuccId() + " PID:" |
| + biggestProcess.getPID() |
| + " Due to a low swap space. Process' RSS exceeds configured swap threshold of " |
| + minSwapThreshold |
| + " Defined in ducc.properties. Check ducc.node.min.swap.threshold property"); |
| stopProcess(biggestProcess); |
| } |
| } |
| public void interruptThreadInWaitFor(String pid) throws Exception { |
| String methodName="interruptZombieProcess"; |
| synchronized (monitor) { |
| for (ManagedProcess dProcess : deployedProcesses) { |
| if ( dProcess.getPid() != null && dProcess.getPid().equals(pid) ) { |
| Future<?> future = dProcess.getFuture(); |
| if ( future != null && !future.isDone() && !future.isCancelled()) { |
| future.cancel(true); // interrupt the thread blocked on waitFor() |
| logger.info(methodName, dProcess.getDuccProcess().getDuccId(), "Interrupted Thread - Zombie Process with PID:"+dProcess.getPid()); |
| } |
| break; |
| } |
| } |
| } |
| } |
| /** |
| * Called when Agent receives request to start a process. |
| * |
| * @param process |
| * - IDuccProcess instance with identity (DuccId) |
| * @param commandLine |
| * - fully defined command line that will be used to exec the process. |
| * |
| */ |
| public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info, |
| DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) { |
| String methodName = "startProcess"; |
| |
| try { |
| // Add process to the Agent's inventory before it is started |
| getInventoryRef().put(process.getDuccId(), process); |
| // enforce presence of command line |
| if (invalidCommand(commandLine)) { |
| process.setProcessState(ProcessState.Failed); |
| logger.info(methodName, null, |
| "Rejecting Process Start Request. Command line not provided for Process ID:" |
| + process.getDuccId()); |
| } else if (isProcessDeallocated(process)) { |
| process.setProcessState(ProcessState.Stopped); |
| logger.info(methodName, null, |
| "Rejecting Process Start Request. Process ID:" + process.getDuccId() |
| + " hava already been deallocated due to Shrink"); |
| } else { |
| deployProcess(process, commandLine, info, workDuccId, processMemoryAssignment); |
| } |
| |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| |
| } |
| |
| private boolean isAlive(IDuccProcess invProcess) { |
| return invProcess.getProcessState().equals(ProcessState.Initializing) |
| || invProcess.getProcessState().equals(ProcessState.Running) |
| || invProcess.getProcessState().equals(ProcessState.Stopping) |
| || invProcess.getProcessState().equals(ProcessState.Starting); |
| } |
| |
| public void doStopProcess(IDuccProcess process) { |
| String methodName = "stopProcess"; |
| try { |
| inventorySemaphore.acquire(); |
| stopProcess(process); |
| } catch (InterruptedException e) { |
| logger.error(methodName, null, e); |
| } finally { |
| inventorySemaphore.release(); |
| } |
| |
| } |
| |
| /** |
| * Called when Agent receives request to stop a process. |
| * |
| * @param process |
| * - IDuccProcess instance with identity (DuccId) |
| * |
| */ |
| public void stopProcess(IDuccProcess process) { |
| String methodName = "stopProcess"; |
| try { |
| IDuccProcess invProcess = null; |
| if ((invProcess = getInventoryRef().get(process.getDuccId())) != null && isAlive(invProcess)) { |
| logger.info(methodName, null, "Undeploing Process with PID:" + process.getPID()); |
| undeployProcess(process); |
| } else if (invProcess == null) { // process not in inventory |
| logger.info( |
| methodName, |
| null, |
| "Agent received Stop request for a process which is not in the Agent's inventory. " |
| + "It looks like this Agent was killed along with its child processes. Adding stale process to the inventory. PID:" |
| + process.getPID() + " DuccId:" + process.getDuccId() + ""); |
| // Received a request to stop a process that this is not in the |
| // current |
| // inventory. Most likely this agent was killed while its |
| // processes were |
| // still running. Add the process to the agent's inventory so |
| // that its |
| // included in the next published inventory. This is done so |
| // that the |
| // orchestrator can cleanup its state. |
| if (process.getProcessState() != ProcessState.Stopped |
| && process.getProcessState() != ProcessState.Failed |
| && process.getProcessState() != ProcessState.InitializationTimeout |
| && process.getProcessState() != ProcessState.FailedInitialization) { |
| // Force the Stopped state if not already stopped or failed |
| process.setProcessState(ProcessState.Stopped); |
| } |
| // Add stale process to the inventory. This will eventually be |
| // cleaned up |
| // when the PM sends purge request. |
| getInventoryRef().put(process.getDuccId(), process); |
| } |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| /** |
| * Checks if process with a given PID has already registered memory collector Camel route. This |
| * route periodically fetches resident memory of a process with a given PID. Each process |
| * collector route is identified by process PID. |
| * |
| * @param pid |
| * - process PID also id of its route |
| * @return - true if memory collector route has already been created. False, otherwise |
| */ |
| private boolean addProcessMemoryCollector(String pid) { |
| // search all camel routes for one with a given id |
| for (Route route : super.getContext().getRoutes()) { |
| if (route.getId().equals(pid)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Remove given process from Agent's inventory |
| * |
| * @param process |
| * - process to purge from inventory |
| * @throws Exception |
| */ |
| public void purgeProcess(IDuccProcess process) throws Exception { |
| String methodName = "purgeProcess"; |
| DuccId key = null; |
| String pid = ""; |
| try { |
| inventorySemaphore.acquire(); |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // Check if process with a given unique DuccId exist in the |
| // local map |
| if (processEntry.getKey().equals(process.getDuccId())) { |
| key = processEntry.getKey(); |
| pid = processEntry.getValue().getPID(); |
| break; |
| } |
| } |
| if (key != null) { |
| getInventoryRef().remove(key); |
| logger.info(methodName, null, ">>>> Agent Purged Process with PID:" + pid); |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| // Find ManagedProcess instance the DuccProcess instance is |
| // associated with |
| if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) { |
| deployedProcesses.remove(deployedProcess); |
| break; |
| } |
| } |
| } |
| |
| private boolean changeState(ProcessState state) { |
| switch (state) { |
| case FailedInitialization: |
| case InitializationTimeout: |
| return false; |
| case Starting: |
| case Initializing: |
| case Running: |
| return true; |
| default: |
| break; |
| } |
| return false; |
| } |
| |
| /** |
| * Called when UIMA AS service wrapper sends an update when a process status changes. |
| * |
| * @param duccEvent |
| * - Ducc event object |
| * |
| * @throws Exception |
| */ |
| |
| public void updateProcessStatus(ProcessStateUpdateDuccEvent duccEvent) throws Exception { |
| String methodName = "updateProcessStatus"; |
| |
| try { |
| inventorySemaphore.acquire(); |
| for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) { |
| // Check if process with a given unique DuccId exist in the |
| // local map |
| if (processEntry.getKey().getUnique().equals(duccEvent.getDuccProcessId())) { |
| // found it. Update pid and state of the process |
| if (duccEvent.getPid() != null && processEntry.getValue().getPID() == null) { |
| processEntry.getValue().setPID(duccEvent.getPid()); |
| } |
| |
| if (duccEvent.getProcessJmxUrl() != null |
| && processEntry.getValue().getProcessJmxUrl() == null) { |
| processEntry.getValue().setProcessJmxUrl(duccEvent.getProcessJmxUrl()); |
| } |
| ITimeWindow tw = processEntry.getValue().getTimeWindowInit(); |
| if (tw.getEnd() == null ) { |
| if ( !duccEvent.getState().equals(ProcessState.Initializing)) { |
| // Mark the time the process ended initialization. It also |
| // covers a case when the process terminates while initializing |
| tw.setEnd(TimeStamp.getCurrentMillis()); |
| |
| if ( duccEvent.getState().equals(ProcessState.Running)) { |
| ITimeWindow twr = new TimeWindow(); |
| String millis; |
| millis = TimeStamp.getCurrentMillis(); |
| // Mark the time the process started running |
| processEntry.getValue().setTimeWindowRun(twr); |
| twr.setStart(millis); |
| } |
| } |
| } |
| ManagedProcess deployedProcess = null; |
| synchronized (monitor) { |
| for (ManagedProcess dProcess : deployedProcesses) { |
| // Find ManagedProcess instance the DuccProcess |
| // instance is associated with |
| if (dProcess.getDuccProcess().getDuccId().getUnique() |
| .equals(duccEvent.getDuccProcessId())) { |
| deployedProcess = dProcess; |
| break; |
| } |
| } |
| } |
| if (processEntry.getValue().getProcessState() != ProcessState.Running |
| && duccEvent.getState().equals(ProcessState.Running) && deployedProcess != null) { |
| // cancel process initialization timer. |
| deployedProcess.stopInitializationTimer(); |
| } |
| // if a JP process has been deallocated, ignore any updates |
| // from it. It's stopping. |
| if (processEntry.getValue().isDeallocated()) { |
| // stop collecting process stats from /proc/<pid>/statm |
| if (duccEvent.getPid() != null) { |
| try { |
| super.getContext().stopRoute(duccEvent.getPid()); |
| } catch( Exception e) { |
| logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid()); |
| } |
| } |
| return; |
| } |
| |
| logger.info(methodName, null, ">>>> Agent Handling Process Update - Ducc Id: " |
| + processEntry.getValue().getDuccId() + " PID:" + duccEvent.getPid() + " Status:" |
| + duccEvent.getState() + " Deallocated:" |
| + processEntry.getValue().isDeallocated()); |
| if (deployedProcess != null && deployedProcess.getSocketEndpoint() == null |
| && duccEvent.getServiceEdnpoint() != null) { |
| deployedProcess.setSocketEndpoint(duccEvent.getServiceEdnpoint()); |
| } |
| |
| // This is a delayed stop. Previously a request to stop the |
| // process was received |
| // but the PID was not available yet. Instead a flag was set |
| // to initiate a |
| // stop after the process reports the PID. |
| if (deployedProcess != null && deployedProcess.killAfterLaunch()) { |
| logger.info(methodName, null, ">>>> Process Ducc Id:" |
| + processEntry.getValue().getDuccId() |
| + " Was Previously Tagged for Kill While It Was Starting"); |
| undeployProcess(processEntry.getValue()); |
| } else if ( deployedProcess != null && deployedProcess.doKill() && |
| deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped) ) { |
| deployedProcess.getDuccProcess().setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString()); |
| } else if ( deployedProcess != null && ( deployedProcess.doKill() |
| || deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Failed) |
| || deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Killed)) ) { |
| // The process has already stopped, but managed to send |
| // the last update before dying. Ignore the update |
| return; |
| } else if (changeState(duccEvent.getState())) { |
| processEntry.getValue().setProcessState(duccEvent.getState()); |
| // if the process is Stopping, it must have hit an error threshold |
| } |
| // Check if MemoryCollector should be created for this |
| // process. It collects |
| // resident memory of the process at regular intervals. |
| // Should only be added |
| // once for each process. This route will have its id set to |
| // process PID. |
| if (addProcessMemoryCollector(duccEvent.getPid()) |
| && (duccEvent.getState().equals(ProcessState.Initializing) || duccEvent |
| .getState().equals(ProcessState.Running))) { |
| if ( duccEvent.getState().equals(ProcessState.Running) ) { |
| if ( processEntry.getValue().getUimaPipelineComponents() != null && |
| processEntry.getValue().getUimaPipelineComponents().size() > 0 ) { |
| processEntry.getValue().getUimaPipelineComponents().clear(); |
| if ( duccEvent.getUimaPipeline() != null ) { |
| duccEvent.getUimaPipeline().clear(); |
| } |
| } |
| } |
| /* |
| RouteBuilder rb = new ProcessMemoryUsageRoute(this, processEntry.getValue(), |
| deployedProcess); |
| super.getContext().addRoutes(rb); |
| //super.getContext().start(); |
| super.getContext().startRoute(duccEvent.getPid()); |
| logger.info( |
| methodName, |
| null, |
| "Started Process Metric Gathering Thread For PID:"+duccEvent.getPid()); |
| |
| |
| StringBuffer sb = new StringBuffer(); |
| if ( duccEvent.getState().equals(ProcessState.Running) ) { |
| if ( processEntry.getValue().getUimaPipelineComponents() != null && |
| processEntry.getValue().getUimaPipelineComponents().size() > 0 ) { |
| processEntry.getValue().getUimaPipelineComponents().clear(); |
| if ( duccEvent.getUimaPipeline() != null ) { |
| duccEvent.getUimaPipeline().clear(); |
| } |
| } |
| } |
| for ( Route route : super.getContext().getRoutes() ) { |
| sb.append("Camel Context - RouteId:"+route.getId()+"\n"); |
| } |
| logger.info( |
| methodName, |
| null, |
| sb.toString()); |
| |
| logger.info( |
| methodName, |
| null, |
| ">>>> Agent Added new Process Memory Collector Thread for Process:" |
| + duccEvent.getPid()); |
| */ |
| } else if (duccEvent.getState().equals(ProcessState.Stopped) |
| || duccEvent.getState().equals(ProcessState.Failed) |
| || duccEvent.getState().equals(ProcessState.Killed)) { |
| try { |
| // stop collecting process stats from /proc/<pid>/statm |
| super.getContext().stopRoute(duccEvent.getPid()); |
| } catch( Exception e) { |
| logger.error(methodName, null, "....Unable to stop Camel route for PID:" + duccEvent.getPid()); |
| } |
| |
| // super.getContext().stopRoute(duccEvent.getPid()); |
| |
| // remove route from context, otherwise the routes accumulate over time causing memory leak |
| super.getContext().removeRoute(duccEvent.getPid()); |
| StringBuffer sb = new StringBuffer(); |
| logger.info( |
| methodName, |
| null, |
| "Removed Camel Route from Context for PID:"+duccEvent.getPid()); |
| |
| for ( Route route : super.getContext().getRoutes() ) { |
| sb.append("Camel Context - RouteId:"+route.getId()+"\n"); |
| } |
| logger.info( |
| methodName, |
| null, |
| sb.toString()); |
| |
| |
| if ( deployedProcess.getMetricsProcessor() != null ) { |
| deployedProcess.getMetricsProcessor().close(); // close open fds (stat and statm files) |
| } |
| logger.info(methodName, null, |
| "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:" |
| + duccEvent.getPid()); |
| } else if (duccEvent.getState().equals(ProcessState.FailedInitialization)) { |
| logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:" |
| + duccEvent.getPid()); |
| deployedProcess.getDuccProcess().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.FailedInitialization.toString()); |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| deployedProcess.setStopping(); |
| /* |
| deployedProcess.kill(); |
| logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:" |
| + duccEvent.getPid() + " Killing Process"); |
| try { |
| super.getContext().stopRoute(duccEvent.getPid()); |
| } catch( Exception e) { |
| logger.error(methodName, null, "Unable to stop Camel route for PID:"+duccEvent.getPid()); |
| } |
| logger.info(methodName, null, |
| "----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:" |
| + duccEvent.getPid() + ". Process Failed Initialization"); |
| undeployProcess(processEntry.getValue()); |
| */ |
| } else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) { |
| deployedProcess.getDuccProcess().setReasonForStoppingProcess( |
| ReasonForStoppingProcess.InitializationTimeout.toString()); |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| //deployedProcess.setStopping(); |
| |
| // Mark process for death. Doesnt actually kill the process |
| |
| deployedProcess.kill(); |
| logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:" |
| + duccEvent.getPid() + " Killing Process"); |
| |
| undeployProcess(processEntry.getValue()); |
| |
| } |
| else if (duccEvent.getState().equals(ProcessState.Stopping)) { |
| if ( duccEvent.getMessage() != null && duccEvent.getMessage().equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) { |
| processEntry.getValue(). |
| setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededErrorThreshold.toString()); |
| } |
| if ( !deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped) && |
| !deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopping) ) { |
| deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping); |
| deployedProcess.setStopping(); |
| } |
| } |
| if (duccEvent.getUimaPipeline() != null) { |
| StringBuffer buffer = new StringBuffer("\t\tUima Pipeline -"); |
| for (IUimaPipelineAEComponent uimaAeState : duccEvent.getUimaPipeline()) { |
| buffer.append("\n\t\tAE:").append(uimaAeState.getAeName()).append(" state:") |
| .append(uimaAeState.getAeState()).append(" InitTime:") |
| .append(uimaAeState.getInitializationTime() / 1000).append(" secs. Thread:") |
| .append(uimaAeState.getAeThreadId()); |
| } |
| logger.info(methodName, null, buffer.toString()); |
| ((DuccProcess) processEntry.getValue()).setUimaPipelineComponents(duccEvent |
| .getUimaPipeline()); |
| } |
| return; // found it. Done |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| inventorySemaphore.release(); |
| } |
| } |
| |
| /** |
| * Deploys process using supplied command line |
| * |
| * @param process |
| * - Process with identity (DuccId) |
| * @param commandLine |
| * - fully defined command line that will be used to exec the process. |
| */ |
| private void deployProcess(IDuccProcess process, ICommandLine commandLine, |
| IDuccStandardInfo info, DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment) { |
| String methodName = "deployProcess"; |
| synchronized (monitor) { |
| boolean deployProcess = true; |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| // ignore duplicate start request for the same process |
| if (deployedProcess.getDuccId().equals(process.getDuccId())) { |
| deployProcess = false; |
| break; |
| } |
| } |
| if (deployProcess) { |
| try { |
| logger.info(methodName, workDuccId, "Agent [" + getIdentity().getIp() |
| + "] Deploying Process - DuccID:" + process.getDuccId().getFriendly() |
| + " Unique DuccID:" + process.getDuccId().getUnique() + " Node Assignment:" |
| + process.getNodeIdentity().getIp() + " Process Memory Assignment:" |
| + processMemoryAssignment + " MBs"); |
| TimeWindow tw = new TimeWindow(); |
| tw.setStart(TimeStamp.getCurrentMillis()); |
| tw.setEnd(null); |
| process.setTimeWindowInit(tw); |
| ManagedProcess managedProcess = new ManagedProcess(process, commandLine, this, logger, |
| processMemoryAssignment); |
| managedProcess.setProcessInfo(info); |
| managedProcess.setWorkDuccId(workDuccId); |
| |
| // enrich process spec with unique ducc id which will be |
| // used to correlate message |
| // exchanges |
| // between the agent and launched process |
| deployedProcesses.add(launcher.launchProcess(this, getIdentity(), process, commandLine, |
| this, managedProcess)); |
| |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } else { |
| logger.info(methodName, workDuccId, "Ignoring duplicate request to start process - DuccID:" |
| + process.getDuccId().getFriendly() + " Unique DuccID:" |
| + process.getDuccId().getUnique()); |
| } |
| } |
| } |
| |
| /** |
| * Kills a given process |
| * |
| * @param process |
| * - process to kill |
| */ |
| private void undeployProcess(IDuccProcess process) { |
| String methodName = "undeployProcess"; |
| synchronized (monitor) { |
| boolean processFound = false; |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| if (deployedProcess.getDuccId().equals(process.getDuccId())) { |
| String pid = deployedProcess.getDuccProcess().getPID(); |
| processFound = true; |
| if (deployedProcess.isStopping()) { |
| logger.info(methodName, null, "....Process Already Stopping PID:" + process.getPID()+" Returning"); |
| break; // this process is already in stopping state |
| } |
| logger.info(methodName, null, "....Undeploying Process - DuccId:" + process.getDuccId() |
| + " PID:" + pid); |
| if (pid != null) { |
| // try { |
| // // stop collecting process stats from /proc/<pid>/statm |
| // super.getContext().stopRoute(pid); |
| // logger.info(methodName, null, "Stopped Camel Route Collecting Metrics For PID:"+pid); |
| // } catch( Exception e) { |
| // logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid); |
| // } |
| // Mark the process as stopping. When the process exits, |
| // the agent can determine |
| // if the process died on its own (due to say, user code |
| // problem) or if it died |
| // due to Agent initiated stop. |
| deployedProcess.setStopping(); |
| // Agent will first send a stop request (via JMS) to the |
| // process. |
| // If the process doesnt stop within alloted window the |
| // agent |
| // will kill it hard |
| ICommandLine cmdLine; |
| try { |
| if (Utils.isWindows()) { |
| cmdLine = new NonJavaCommandLine("taskkill"); |
| cmdLine.addArgument("/PID"); |
| } else { |
| cmdLine = new NonJavaCommandLine("/bin/kill"); |
| cmdLine.addArgument("-9"); |
| } |
| cmdLine.addArgument(pid); |
| launcher.launchProcess(this, getIdentity(), process, cmdLine, this, deployedProcess); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } else if (!deployedProcess.getDuccProcess().getProcessState() |
| .equals(ProcessState.Stopped)) { // process |
| // not |
| // reported |
| // its |
| // PID |
| // yet |
| // when the process reports its PID, check if it should |
| // be killed. |
| logger.info( |
| methodName, |
| null, |
| ".... Process - Ducc ID:" |
| + deployedProcess.getDuccId() |
| + " Has Not Started Yet. PID Not Available. Tagging Process For Kill When It Reports Status"); |
| deployedProcess.killAfterLaunch(true); |
| } else { |
| logger.info(methodName, null, ".... Process Has Already Stopped."); |
| } |
| break; |
| } |
| } |
| if (!processFound) { |
| logger.info(methodName, null, ".... Process - DuccId:" + process.getDuccId() + " PID:" |
| + process.getPID() |
| + " Not in Agent's inventory. Adding to the inventory with state=Stopped"); |
| process.setProcessState(ProcessState.Stopped); |
| inventory.put(process.getDuccId(), process); |
| deployedProcesses.add(new ManagedProcess(process, null, this, logger, |
| new ProcessMemoryAssignment())); |
| } |
| } |
| } |
| |
| public NodeIdentity getIdentity() { |
| return nodeIdentity; |
| } |
| |
| /** |
| * Called when a process exits. |
| */ |
| public void onProcessExit(IDuccProcess process) { |
| String methodName = "onProcessExit"; |
| if ( process == null ) { |
| return; |
| } |
| try { |
| ProcessStateUpdate processStateUpdate = new ProcessStateUpdate(process.getProcessState(), |
| process.getPID(), process.getDuccId().getUnique()); |
| ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate); |
| // cleanup Camel route associated with a process that just stopped |
| if ( process.getPID() != null && super.getContext().getRoute(process.getPID()) != null ) { |
| // super.getContext().stopRoute(process.getPID()); |
| try { |
| // stop collecting process stats from /proc/<pid>/statm |
| super.getContext().stopRoute(process.getPID()); |
| } catch( Exception e) { |
| logger.error(methodName, null, "....Unable to stop Camel route for PID:" + process.getPID()); |
| } |
| // remove route from context, otherwise the routes accumulate over time causing memory leak |
| super.getContext().removeRoute(process.getPID()); |
| StringBuffer sb = new StringBuffer("\n"); |
| logger.info( |
| methodName, |
| null, |
| "Removed Camel Route from Context for PID:"+process.getPID()); |
| |
| for ( Route route : super.getContext().getRoutes() ) { |
| sb.append("Camel Context - RouteId:"+route.getId()+"\n"); |
| } |
| logger.info( |
| methodName, |
| null, |
| sb.toString()); |
| } |
| updateProcessStatus(event); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } finally { |
| |
| try { |
| synchronized (monitor) { |
| logger.info(methodName, null, "----------------- Deployed Process List Size:" |
| + deployedProcesses.size()); |
| |
| // reference to an object we need to remove from the list |
| // of deployed processes |
| ManagedProcess deployedProcessRef = null; |
| // Find a matching ManagedProcess for provided IDuccProcess |
| // so that we can remove it from the list |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| if (deployedProcess.getDuccProcess() != null |
| && deployedProcess.getDuccProcess().equals(process)) { |
| deployedProcessRef = deployedProcess; |
| break; |
| } |
| } |
| if (deployedProcessRef != null) { |
| logger.info(methodName, null, |
| "----------------- Removing Stopped Process from Deployed List"); |
| deployedProcesses.remove(deployedProcessRef); |
| logger.info(methodName, null, |
| "----------------- Deployed Process List Size After Remove:" |
| + deployedProcesses.size()); |
| } else { |
| logger.info(methodName, null, |
| "----------------- Process Exited but Not Found in List - Deployed Process List Size:" |
| + deployedProcesses.size()); |
| } |
| } |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| } |
| |
| public void onJPInitTimeout(IDuccProcess process, long timeout) { |
| String methodName = "onJPInitTimeout"; |
| try { |
| System.out.println("--------- Agent Timedout While Waiting For JP (PID:" + process.getPID() |
| + ") to initialize. The JP exceeded configured timeout of " + timeout/(60*1000) + " minutes"); |
| ProcessStateUpdate processStateUpdate = new ProcessStateUpdate( |
| ProcessState.InitializationTimeout, process.getPID(), process.getDuccId().getUnique()); |
| ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate); |
| updateProcessStatus(event); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| |
| public void shutdown(String reason) { |
| String methodName = "shutdown"; |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| try { |
| undeployProcess(deployedProcess.getDuccProcess()); |
| } catch (Exception e) { |
| logger.error(methodName, null, e); |
| } |
| } |
| } |
| |
| public static void lock() throws Exception { |
| agentLock.acquire(); |
| } |
| |
| public static void unlock() throws Exception { |
| agentLock.release(); |
| } |
| |
| public boolean isManagedProcess(Set<NodeUsersCollector.ProcessInfo> processList, |
| NodeUsersCollector.ProcessInfo cpi) { |
| synchronized (monitor) { |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| if (deployedProcess.getDuccProcess() != null) { |
| // Check if process has been deployed but has not yet |
| // reported its PID. |
| // This is normal. It takes a bit of time until the JP |
| // reports |
| // its PID to the Agent. If there is at least one process in |
| // Agent |
| // deploy list with no PID we assume it is the one. |
| String dppid = deployedProcess.getDuccProcess().getPID(); |
| if (dppid == null || dppid.equals(String.valueOf(cpi.getPid()))) { |
| return true; |
| } |
| // if (dppid != null && dppid.equals(cpi.getPid())) { |
| // return true; |
| // } |
| } |
| } |
| for (NodeUsersCollector.ProcessInfo pi : processList) { |
| if (pi.getPid() == cpi.getPPid() && pi.getChildren().size() > 0) { // is |
| // the |
| // current |
| // process |
| // a |
| // child |
| // of |
| // another |
| // java |
| // Process? |
| return isManagedProcess(pi.getChildren(), pi); |
| } |
| } |
| } |
| return false; |
| } |
| |
| public boolean isRogueProcess(String uid, Set<NodeUsersCollector.ProcessInfo> processList, |
| NodeUsersCollector.ProcessInfo cpi) throws Exception { |
| |
| synchronized (monitor) { |
| // if cgroups are enabled, check if a given PID (cpi) exists in any of |
| // the containers. If so, the process is not rogue. |
| if ( useCgroups ) { |
| if ( cgroupsManager.isPidInCGroup(String.valueOf(cpi.getPid())) ) { |
| return false; |
| } |
| } |
| // Agent adds a process to its inventory before launching it. So it |
| // is |
| // possible that the inventory contains a process with no PID. If |
| // there |
| // is such process in the inventory we cannot determine that a given |
| // pid is rogue yet. Eventually, the launched process reports its |
| // PID |
| boolean foundDeployedProcessWithNoPID = false; |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| if (deployedProcess.getDuccProcess() != null) { |
| // Check if process has been deployed but has not yet |
| // reported its PID. |
| // This is normal. It takes a bit of time until the JP |
| // reports |
| // its PID to the Agent. If there is at least one process in |
| // Agent |
| // deploy list with no PID we assume it is the one. |
| if (deployedProcess.getDuccProcess().getPID() == null) { |
| foundDeployedProcessWithNoPID = true; |
| break; |
| } |
| String dppid = deployedProcess.getDuccProcess().getPID(); |
| // process in inventory, not rogue |
| if (dppid != null && dppid.equals(String.valueOf(cpi.getPid()))) { |
| return false; |
| } |
| } |
| } |
| // not found |
| if (foundDeployedProcessWithNoPID) { |
| return false; |
| } else if ( cpi.getPPid() == 1 ) { // Any process owned by init is rogue |
| // interrupt agent's thread blocking in waitFor() awaiting process termination. |
| // This process is a zombie and there is no need to waste the thread. |
| interruptThreadInWaitFor(String.valueOf(cpi.getPid())); |
| return true; |
| } else { |
| return isParentProcessRogue(processList, cpi); |
| } |
| } |
| //return false; |
| } |
| |
| private boolean isParentProcessRogue(Set<NodeUsersCollector.ProcessInfo> processList, |
| NodeUsersCollector.ProcessInfo cpi) { |
| // boolean found = false; |
| for (NodeUsersCollector.ProcessInfo pi : processList) { |
| if (pi.getPid() == cpi.getPPid()) { // is the current process a |
| // child of another java |
| // Process? |
| // found = true; |
| if (pi.isRogue()) { // if parent is rogue, a child is rogue as |
| // well |
| return true; |
| } |
| return false; |
| } else { |
| if (pi.getChildren().size() > 0) { |
| return isParentProcessRogue(pi.getChildren(), cpi); |
| } |
| } |
| } |
| // if ( !found ) { |
| // return true; // rogue process |
| // } |
| // return false; |
| return true; |
| |
| } |
| |
| /** |
| * Process resident memory collector routes. Collects resident memory at fixed interval from the |
| * OS. |
| * |
| */ |
| public class ProcessMemoryUsageRoute extends RouteBuilder { |
| private NodeAgent agent; |
| |
| private IDuccProcess process; |
| |
| private ManagedProcess managedProcess; |
| |
| public ProcessMemoryUsageRoute(NodeAgent agent, IDuccProcess process, |
| ManagedProcess managedProcess) { |
| this.process = process; |
| this.managedProcess = managedProcess; |
| this.agent = agent; |
| } |
| |
| public void configure() throws Exception { |
| Processor nmp = configurationFactory.processMetricsProcessor(agent, process, managedProcess); |
| int fixedRate = configurationFactory.getNodeInventoryPublishDelay(); |
| from("timer:processMemPollingTimer?fixedRate=true&delay=100&period=" + fixedRate).routeId(process.getPID()) |
| .autoStartup(true) |
| .process(nmp); |
| } |
| } |
| |
| public void stop() throws Exception { |
| if (stopping) { |
| return; |
| } |
| stopping = true; |
| logger.info("stop", null, "Agent stopping managed processes"); |
| // Stop monitor thread watching for Agent pings |
| // if ( nodeMonitor != null ) { |
| // nodeMonitor.shutdown(); |
| // } |
| // if ( configurationFactory.getAgentPingDispatcher() != null ) { |
| // configurationFactory.getAgentPingDispatcher().stop(); |
| // } |
| synchronized (monitor) { |
| Iterator<ManagedProcess> it = deployedProcesses.iterator(); |
| while (it.hasNext()) { |
| ManagedProcess mp = it.next(); |
| // mp.kill(); |
| stopProcess(mp.getDuccProcess()); |
| } |
| } |
| logger.info("stop", null, "Agent dispatched STOP to all managed processes"); |
| |
| // wait until all JPs stop |
| while (true) { |
| try { |
| // break if no processes in the inventory |
| if (deployedProcesses.size() == 0) { |
| break; |
| } |
| boolean atLeastOneProcessStillRunning = false; |
| synchronized (monitor) { |
| // check state of each process. If there is a process that |
| // is not dead yet |
| // just wait a little while and check the state again. |
| Iterator<ManagedProcess> pit = deployedProcesses.iterator(); |
| // find at least one process that is not dead yet |
| while (pit.hasNext()) { |
| ManagedProcess mp = pit.next(); |
| // if the process is not dead, |
| if (!mp.getDuccProcess().getProcessState().equals(ProcessState.Stopped) |
| && !mp.getDuccProcess().getProcessState().equals(ProcessState.Failed) |
| && !mp.getDuccProcess().getProcessState().equals(ProcessState.Killed)) { |
| atLeastOneProcessStillRunning = true; |
| break; |
| } |
| } |
| } |
| // if there are no running processes just break from the |
| // 'while(true)' loop |
| if (atLeastOneProcessStillRunning == false) { |
| break; |
| } |
| synchronized (this) { |
| wait(100); |
| } |
| } catch (Exception e) { |
| } |
| } |
| logger.info("stop", null, "Agent managed processes have stopped"); |
| // Stop publishing inventory. Once the route is down the agent forces last publication |
| // sending an empty process map. |
| configurationFactory.stopInventoryRoute(); |
| |
| // Send an empty process map as the final inventory |
| HashMap<DuccId, IDuccProcess> emptyMap = |
| new HashMap<DuccId, IDuccProcess>(); |
| DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(emptyMap); |
| inventoryDispatcher.dispatch(duccEvent); |
| logger.info("stop", null, "Agent published final inventory"); |
| |
| // Delay this thread to make sure that at least one last node inventory publish occurs before Agent goes away. Add extra 30 secs |
| // to the delay to make sure the publish happens. |
| // synchronized (this) { |
| // long waittime = configurationFactory.getNodeInventoryPublishDelay() +30000; |
| // logger.info("stop", null, "Waiting", waittime, "ms to send final NodeInventory."); |
| // wait(waittime); |
| // } |
| super.stop(); |
| |
| } |
| |
| public Future<?> getDeployedJPFuture(IDuccId duccId) { |
| for (ManagedProcess deployedProcess : deployedProcesses) { |
| // ignore duplicate start request for the same process |
| if (deployedProcess.getDuccId().equals(duccId)) { |
| return deployedProcess.getFuture(); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Copies reservations sent by the PM. It copies reservations associated with this node. |
| * |
| * @param reserves |
| * - list of ALL reservations |
| * @throws Exception |
| */ |
| public void setReservations(List<DuccUserReservation> reserves) throws Exception { |
| try { |
| reservationsSemaphore.acquire(); |
| if (reserves != null) { |
| // clear old entries |
| reservations.clear(); |
| // Only copy reservations for this node |
| IDuccReservationMap reserveMap = new DuccReservationMap(); |
| for (DuccUserReservation r : reserves) { |
| reserveMap.clear(); |
| for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) { |
| if (Utils.isThisNode(getIdentity().getIp(), entry.getValue().getNodeIdentity().getIp())) { |
| reserveMap.addReservation(entry.getValue()); |
| } |
| } |
| if (reserveMap.getMap().size() > 0) { |
| DuccUserReservation reserve = new DuccUserReservation(r.getUserId(), r.getReserveID(), |
| reserveMap); |
| reservations.add(reserve); |
| } |
| } |
| } |
| |
| // this.reservations = reservations; |
| logger.info("setReservations", null, "+++++++++++ Copied User Reservations - List Size:" |
| + reservations.size()); |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| } |
| |
| public List<DuccId> getUserReservations(String uid) { |
| List<DuccId> reservationIds = new ArrayList<DuccId>(); |
| try { |
| reservationsSemaphore.acquire(); |
| if (reservations != null) { |
| for (DuccUserReservation r : reservations) { |
| if (r.getUserId().equals(uid)) { |
| for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) { |
| reservationIds.add(entry.getValue().getDuccId()); |
| } |
| break; |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| return reservationIds; |
| } |
| |
| public void copyAllUserReservations(TreeMap<String, NodeUsersInfo> map) { |
| try { |
| reservationsSemaphore.acquire(); |
| if (reservations != null) { |
| logger.info("copyAllUserReservations", null, |
| "+++++++++++ Copying User Reservations - List Size:" + reservations.size()); |
| for (DuccUserReservation r : reservations) { |
| if ("System".equals(r.getUserId())) { |
| continue; |
| } |
| NodeUsersInfo nui = null; |
| if (map.containsKey(r.getUserId())) { |
| nui = map.get(r.getUserId()); |
| } else { |
| nui = new NodeUsersInfo(r.getUserId()); |
| map.put(r.getUserId(), nui); |
| } |
| nui.addReservation(r.getReserveID()); |
| } |
| } else { |
| logger.info("copyAllUserReservations", null, " *********** No Reservations"); |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| |
| } |
| |
| public boolean userHasReservation(String uid) throws Exception { |
| try { |
| reservationsSemaphore.acquire(); |
| |
| for (DuccUserReservation r : reservations) { |
| if (r.getUserId().equals(uid)) { |
| return true; |
| } |
| } |
| } catch (InterruptedException e) { |
| } finally { |
| reservationsSemaphore.release(); |
| } |
| return false; |
| } |
| |
| public Object deepCopy(Object original) throws Exception { |
| ObjectInputStream ois = null; |
| ObjectOutputStream oos; |
| ByteArrayInputStream bis; |
| ByteArrayOutputStream bos; |
| Object copy; |
| try { |
| // serialize object to bytes |
| bos = new ByteArrayOutputStream(); |
| oos = new ObjectOutputStream(bos); |
| oos.writeObject(original); |
| oos.close(); |
| |
| // construct an object from the bytes |
| bis = new ByteArrayInputStream(bos.toByteArray()); |
| ois = new ObjectInputStream(bis); |
| copy = ois.readObject(); |
| return copy; |
| } catch (Exception e) { |
| throw e; |
| } finally { |
| if (ois != null) { |
| ois.close(); |
| } |
| } |
| } |
| |
| public RogueProcessReaper getRogueProcessReaper() { |
| return rogueProcessReaper; |
| } |
| |
| /** |
| * Called when an Agent receives self dispatched Ping message. |
| */ |
| // public void ping(AgentPingEvent agentPing) { |
| // nodeMonitor.nodeArrives(agentPing.getNode()); |
| // } |
| /* |
| * public boolean excludeUser(String userId ) { if ( configurationFactory.userExclusionList != |
| * null ) { // exclusion list contains comma separated user ids String[] excludedUsers = |
| * configurationFactory.userExclusionList.split(","); for ( String excludedUser : excludedUsers ) |
| * { if ( excludedUser.equals(userId)) { return true; } } } return false; } public boolean |
| * excludeProcess(String process ) { if ( configurationFactory.processExclusionList != null ) { // |
| * exclusion list contains comma separated user ids String[] excludedProcesses = |
| * configurationFactory.processExclusionList.split(","); for ( String excludedProcess : |
| * excludedProcesses ) { if ( excludedProcess.equals(process)) { return true; } } } return false; |
| * } |
| */ |
| public static void main(String[] args) { |
| try { |
| NodeIdentity node = new NodeIdentity(InetAddress.getLocalHost().getHostAddress(), InetAddress |
| .getLocalHost().getHostName()); |
| NodeAgent agent = new NodeAgent(node); |
| |
| List<DuccUserReservation> reserves = new ArrayList<DuccUserReservation>(); |
| |
| IDuccReservationMap reserveMap = new DuccReservationMap(); |
| IDuccReservationMap reserveMap2 = new DuccReservationMap(); |
| |
| NodeIdentity ni1 = node; |
| // new NodeIdentity(, name); |
| NodeIdentity ni2 = new NodeIdentity("111.111.111.111", "node100"); |
| NodeIdentity ni3 = node; |
| NodeIdentity ni4 = new NodeIdentity("222.222.222.222", "node102"); |
| |
| DuccId id1 = new DuccId(100); |
| DuccId id2 = new DuccId(101); |
| DuccId id3 = new DuccId(102); |
| DuccId id4 = new DuccId(103); |
| |
| IDuccReservation reservation1 = new DuccReservation(id1, ni1, 1); |
| reserveMap.addReservation(reservation1); |
| IDuccReservation reservation2 = new DuccReservation(id2, ni2, 1); |
| reserveMap.addReservation(reservation2); |
| IDuccReservation reservation4 = new DuccReservation(id4, ni4, 1); |
| reserveMap.addReservation(reservation4); |
| |
| IDuccReservation reservation3 = new DuccReservation(id3, ni3, 1); |
| reserveMap2.addReservation(reservation3); |
| |
| DuccUserReservation reserve = new DuccUserReservation("joe", new DuccId(500), reserveMap); |
| DuccUserReservation reserve2 = new DuccUserReservation("jane", new DuccId(500), reserveMap2); |
| reserves.add(reserve); |
| reserves.add(reserve2); |
| |
| agent.setReservations(reserves); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| private class NodeExclusionParser { |
| private boolean excludeNodeFromCGroups = false; |
| |
| private boolean excludeAP = false; |
| |
| public void parse(String exclFile) throws Exception { |
| // <node>=cgroup,ap |
| File exclusionFile = new File(exclFile); |
| if (!exclusionFile.exists()) { |
| return; |
| } |
| BufferedReader br = new BufferedReader(new FileReader(exclusionFile)); |
| String line; |
| NodeIdentity node = getIdentity(); |
| String nodeName = node.getName(); |
| if (nodeName.indexOf(".") > -1) { |
| nodeName = nodeName.substring(0, nodeName.indexOf(".")); |
| } |
| |
| while ((line = br.readLine()) != null) { |
| if (line.startsWith(nodeName)) { |
| String exclusions = line.substring(line.indexOf("=") + 1); |
| String[] parsedExclusions = exclusions.split(","); |
| for (String exclusion : parsedExclusions) { |
| |
| if (exclusion.trim().equals("cgroup")) { |
| excludeNodeFromCGroups = true; |
| |
| } else if (exclusion.trim().equals("ap")) { |
| excludeAP = true; |
| |
| } |
| } |
| break; |
| } |
| } |
| br.close(); |
| } |
| |
| public boolean apExcluded() { |
| return excludeAP; |
| } |
| |
| public boolean cgroupsExcluded() { |
| return excludeNodeFromCGroups; |
| } |
| } |
| |
| public DuccLogger getLogger() { |
| return logger; |
| } |
| |
| public void handleAdminEvent(DuccAdminEvent event) throws Exception { |
| if (event instanceof DuccAdminEventStopMetrics) { |
| // Get target machines from the message |
| String[] nodes = ((DuccAdminEventStopMetrics) event).getTargetNodes().split(","); |
| // Check if this message applies to this node |
| for (String targetNode : nodes) { |
| if (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getName())) { |
| logger.info("handleAdminEvent", null, |
| "... Agent Received an Admin Request to Stop Metrics Collection and Publishing"); |
| // Stop Camel route responsible for driving collection and publishing of metrics |
| configurationFactory.stopMetricsRoute(); |
| logger.info("handleAdminEvent", null, |
| "... Agent Stopped Metrics Collection and Publishing"); |
| break; |
| } else { |
| logger.info("handleAdminEvent", null, "... Agent Not Target For Message:" |
| + event.getClass().getName()); |
| } |
| } |
| } else { |
| logger.info("handleAdminEvent", null, "... Agent Received Unexpected Message of Type:" |
| + event.getClass().getName()); |
| |
| } |
| } |
| } |