blob: f9220c2acc81a0589235c2db383e9fbbb30b445a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelContext;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.builder.RouteBuilder;
import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.uima.ducc.agent.config.AgentConfiguration;
import org.apache.uima.ducc.agent.event.AgentEventListener;
import org.apache.uima.ducc.agent.event.ProcessLifecycleObserver;
import org.apache.uima.ducc.agent.launcher.CGroupsManager;
import org.apache.uima.ducc.agent.launcher.DefunctProcessDetector;
import org.apache.uima.ducc.agent.launcher.ICommand;
import org.apache.uima.ducc.agent.launcher.Launcher;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.launcher.ManagedProcess.StopPriority;
import org.apache.uima.ducc.agent.launcher.SigKillCommand;
import org.apache.uima.ducc.agent.launcher.SigTermCommand;
import org.apache.uima.ducc.agent.metrics.collectors.NodeUsersCollector;
import org.apache.uima.ducc.agent.processors.DefaultNodeInventoryProcessor;
import org.apache.uima.ducc.common.Node;
import org.apache.uima.ducc.common.NodeIdentity;
import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventQuiesceAndStop;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventStop;
import org.apache.uima.ducc.common.admin.event.DuccAdminEventStopMetrics;
import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
import org.apache.uima.ducc.common.component.AbstractDuccComponent;
import org.apache.uima.ducc.common.main.DuccService;
import org.apache.uima.ducc.common.node.metrics.NodeUsersInfo;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.TimeStamp;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.common.utils.IDuccLoggerComponents.Daemon;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.common.utils.id.IDuccId;
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
import org.apache.uima.ducc.transport.cmdline.ICommandLine;
import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.AgentProcessLifecycleReportDuccEvent;
import org.apache.uima.ducc.transport.event.AgentProcessLifecycleReportDuccEvent.LifecycleEvent;
import org.apache.uima.ducc.transport.event.DuccEvent.EventType;
import org.apache.uima.ducc.transport.event.DaemonDuccEvent;
import org.apache.uima.ducc.transport.event.DuccEvent;
import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.ProcessStateUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.common.DuccProcess;
import org.apache.uima.ducc.transport.event.common.DuccReservation;
import org.apache.uima.ducc.transport.event.common.DuccReservationMap;
import org.apache.uima.ducc.transport.event.common.DuccUserReservation;
import org.apache.uima.ducc.transport.event.common.IDuccJobDeployment;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccReservation;
import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
import org.apache.uima.ducc.transport.event.common.ITimeWindow;
import org.apache.uima.ducc.transport.event.common.ProcessMemoryAssignment;
import org.apache.uima.ducc.transport.event.common.TimeWindow;
public class NodeAgent extends AbstractDuccComponent implements Agent, ProcessLifecycleObserver {
public static DuccLogger logger = DuccLogger.getLogger(NodeAgent.class, COMPONENT_NAME);
private static DuccId jobid = null;
// Replaced by duplicate in org.apache.uima.ducc.transport.agent.ProcessStateUpdate
// public static final String ProcessStateUpdatePort = "ducc.agent.process.state.update.port";
public static int SIGKILL = 9;
public static int SIGTERM = 15;
// for LinuxNodeMetrics logging
public static AtomicLong logCounter = new AtomicLong();
public static String cgroupFailureReason;
// Map of known processes this agent is managing. This map is published
// at regular intervals as part of agent's inventory update.
private Map<DuccId, IDuccProcess> inventory = new ConcurrentHashMap<>();
// Semaphore controlling access to inventory Map
private Semaphore inventorySemaphore = new Semaphore(1);
List<ManagedProcess> deployedProcesses = new ArrayList<ManagedProcess>();
// This agent's identity ( host name and IP address)
private NodeIdentity nodeIdentity;
// Component to launch processes
private Launcher launcher;
// Reference to the Agent's configuration factory component. This is where
// Agent
// dependencies are instantiated and injected via Agent's c'tor.
public AgentConfiguration configurationFactory;
private static Semaphore agentLock = new Semaphore(1);
private DuccEventDispatcher commonProcessDispatcher;
private DuccEventDispatcher ORDispatcher;
private Object monitor = new Object();
boolean duccLingExists = false;
boolean runWithDuccLing = false;
private List<DuccUserReservation> reservations = new ArrayList<DuccUserReservation>();
private Semaphore reservationsSemaphore = new Semaphore(1);
// private AgentMonitor nodeMonitor;
private volatile boolean stopping = false;
private Object stopLock = new Object();
private RogueProcessReaper rogueProcessReaper = new RogueProcessReaper(logger, 5, 10);
public volatile boolean useCgroups = false;
public CGroupsManager cgroupsManager = null;
public Node node = null;
public volatile boolean excludeAPs = false;
public int shareQuantum;
private boolean agentVirtual = System.getProperty("ducc.agent.virtual") == null ? false : true;
// This flag, when true, forces the agent to be "real"
// regardless of the value of ducc.agent.virtual.
// In the future, this flag and support for ducc.agent.virtual
// should be removed altogether.
// This is to support CGroups for all agents,
// which was previously disabled for virtual ones.
private boolean agentRealOnly = true;
public boolean pageSizeFetched = false;
public int pageSize = 4096; // default
public int cpuClockRate = 100;
public int numProcessors = 0;
ExecutorService defunctDetectorExecutor = Executors.newCachedThreadPool();
private AgentEventListener eventListener;
// indicates whether or not this agent received at least one publication
// from the PM. This flag is used to determine if the agent should use
// rogue process detector. The detector will be used if this flag is true.
public volatile boolean receivedDuccState = false;
private String stateChangeEndpoint;
long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
public void setStateChangeEndpoint(String stateChangeEndpoint) {
this.stateChangeEndpoint = stateChangeEndpoint;
}
/**
* Ctor used exclusively for black-box testing of this class.
*/
public NodeAgent() {
super(COMPONENT_NAME, null);
}
public NodeAgent(NodeIdentity ni) {
this();
this.nodeIdentity = ni;
Utils.findDuccHome(); // add DUCC_HOME to System.properties
if (configurationFactory.processStopTimeout != null) {
maxTimeToWaitForProcessToStop = Long.valueOf(configurationFactory.processStopTimeout);
}
}
public long getLastORSequence() {
long lastORSequence = 0;
if (eventListener != null) {
lastORSequence = eventListener.getLastSequence();
}
return lastORSequence;
}
public AgentEventListener getEventListener() {
return eventListener;
}
public void setAgentEventListener(AgentEventListener listener) {
eventListener = listener;
}
public boolean isVirtual() {
boolean retVal = agentVirtual;
if (agentRealOnly) {
retVal = false;
}
return retVal;
}
/**
* Tell Orchestrator about state change for recording into system-events.log
*/
private void stateChange(EventType eventType) {
String methodName = "stateChange";
try {
Daemon daemon = Daemon.Agent;
NodeIdentity nodeIdentity = new NodeIdentity();
DaemonDuccEvent ev = new DaemonDuccEvent(daemon, eventType, nodeIdentity);
ORDispatcher.dispatch(stateChangeEndpoint, ev, "");
logger.info(methodName, null, stateChangeEndpoint, eventType.name(),
nodeIdentity.getCanonicalName());
} catch (Exception e) {
logger.error(methodName, null, e);
}
}
/*
* Report process lifecycle events on same channel that inventory is reported
*/
public DuccEventDispatcher getProcessLifecycleReportDispatcher() {
return ORDispatcher;
}
/*
* Send process lifecycle event to interested listeners
*/
private void sendProcessLifecycleEventReport(IDuccProcess process,
LifecycleEvent lifecycleEvent) {
String location = "sendProcessLifecycleEventReport";
try {
NodeIdentity nodeIdentity = getIdentity();
AgentProcessLifecycleReportDuccEvent duccEvent = new AgentProcessLifecycleReportDuccEvent(
process, nodeIdentity, lifecycleEvent);
DuccEventDispatcher dispatcher = getProcessLifecycleReportDispatcher();
dispatcher.dispatch(duccEvent);
StringBuffer sb = new StringBuffer();
sb.append("id:" + process.getDuccId().toString() + " ");
sb.append("lifecycleEvent:" + lifecycleEvent.name() + " ");
String args = sb.toString().trim();
logger.info(location, jobid, args);
} catch (Exception e) {
logger.error(location, jobid, e);
}
}
/*
* Add ManagedProcess to map and send lifecycle event
*/
private void processDeploy(ManagedProcess mp) {
String location = "processDeploy";
if (mp != null) {
if (deployedProcesses.contains(mp)) {
String args = "mp:" + mp.getProcessId();
logger.error(location, jobid, args);
} else {
String args = "mp:" + mp.getProcessId();
logger.debug(location, jobid, args);
deployedProcesses.add(mp);
IDuccProcess process = mp.getDuccProcess();
sendProcessLifecycleEventReport(process, LifecycleEvent.Launch);
}
} else {
String args = "mp:" + mp;
logger.error(location, jobid, args);
}
}
/*
* Remove ManagedProcess from map and send lifecycle event
*/
private void processUndeploy(ManagedProcess mp, Iterator<ManagedProcess> it) {
String location = "processUndeploy";
if (mp != null) {
if (!deployedProcesses.contains(mp)) {
String args = "mp:" + mp.getProcessId();
logger.error(location, jobid, args);
} else {
String args = "mp:" + mp.getProcessId();
logger.debug(location, jobid, args);
// deployedProcesses.remove(mp);
// it.remove();
IDuccProcess process = mp.getDuccProcess();
sendProcessLifecycleEventReport(process, LifecycleEvent.Terminate);
}
} else {
String args = "mp:" + mp;
logger.error(location, jobid, args);
}
}
/**
* C'tor for dependecy injection
*
* @param nodeIdentity
* - this Agent's identity
* @param launcher
* - component to launch processes
* @param context
* - camel context
*/
public NodeAgent(NodeIdentity nodeIdentity, Launcher launcher, CamelContext context,
AgentConfiguration factory) throws Exception {
super(COMPONENT_NAME, context);
Utils.findDuccHome(); // add DUCC_HOME to System.properties
// Running a real agent
agentVirtual = System.getProperty("ducc.agent.virtual") == null ? false : true;
this.nodeIdentity = nodeIdentity;
this.launcher = launcher;
this.configurationFactory = factory;
this.commonProcessDispatcher = factory.getCommonProcessDispatcher(context);
this.ORDispatcher = factory.getORDispatcher(context);
// fetch Page Size from the OS and cache it
pageSize = getOSPageSize();
numProcessors = getNodeProcessors();
logger.info("NodeAgent", null, "OS Page Size:" + pageSize);
cpuClockRate = getOSClockRate();
logger.info("NodeAgent", null, "OS Clock Rate:" + cpuClockRate);
if (System.getProperty("ducc.rm.share.quantum") != null
&& System.getProperty("ducc.rm.share.quantum").trim().length() > 0) {
shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim());
}
/* Enable CGROUPS */
String cgroups;
String cgUtilsPath = null;
boolean excludeNodeFromCGroups = false;
if (!isVirtual()
&& (cgroups = System.getProperty("ducc.agent.launcher.cgroups.enable")) != null) {
if (cgroups.equalsIgnoreCase("true")) {
logger.info("nodeAgent", null, "ducc.properties [ducc.agent.launcher.cgroups.enable=true]");
// Load exclusion file. Some nodes may be excluded from cgroups
String exclusionFile;
// get the name of the exclusion file from ducc.properties
if ((exclusionFile = System.getProperty("ducc.agent.exclusion.file")) != null) {
logger.info("nodeAgent", null,
"Ducc configured with cgroup node exclusion file - ducc.properties [ducc.agent.exclusion.file="
+ exclusionFile + "]");
// Parse node exclusion file and determine if cgroups and AP
// deployment
// is allowed on this node
NodeExclusionParser exclusionParser = new NodeExclusionParser();
exclusionParser.parse(exclusionFile);
excludeNodeFromCGroups = exclusionParser.cgroupsExcluded();
excludeAPs = exclusionParser.apExcluded();
if (excludeNodeFromCGroups) {
logger.info("nodeAgent", null,
"------- Node Explicitly Excluded From Using CGroups. Check File:"
+ exclusionFile);
cgroupFailureReason = "------- Node Explicitly Excluded From Using CGroups. Check File:"
+ exclusionFile;
}
System.out.println(
"excludeNodeFromCGroups=" + excludeNodeFromCGroups + " excludeAPs=" + excludeAPs);
} else {
logger.info("nodeAgent", null, "Agent node *not* excluded from using cgroups");
}
// node not in the exclusion list for cgroups
if (!excludeNodeFromCGroups) {
// fetch a list of paths the agent will search to find cgroups utils
// like cgexec. The default location is /usr/bin
logger.info("nodeAgent", null,
"Testing cgroups to check if runtime utilities (cgexec) exist in expected locations in the filesystem");
String cgroupsUtilsDirs = System.getProperty("ducc.agent.launcher.cgroups.utils.dir");
if (cgroupsUtilsDirs == null) {
cgUtilsPath = "/usr/bin"; // default
} else {
String[] paths = cgroupsUtilsDirs.split(",");
for (String path : paths) {
File file = new File(path.trim() + "/cgexec");
if (file.exists()) {
cgUtilsPath = path;
break;
}
}
}
// scan /proc/mounts for base cgroup dir
String cgroupsBaseDir = fetchCgroupsBaseDir("/proc/mounts");
if (cgUtilsPath == null) {
useCgroups = false;
logger.info("nodeAgent", null,
"------- CGroups Disabled - Unable to Find Cgroups Utils Directory. Add/Modify ducc.agent.launcher.cgroups.utils.dir property in ducc.properties");
} else if (cgroupsBaseDir == null || cgroupsBaseDir.trim().length() == 0) {
useCgroups = false;
logger.info("nodeAgent", null,
"------- CGroups Disabled - Unable to Find Cgroups Root Directory in /proc/mounts");
} else {
logger.info("nodeAgent", null, "Agent found cgroups runtime in " + cgUtilsPath
+ " cgroups base dir=" + cgroupsBaseDir);
// if cpuacct is configured in cgroups, the subsystems list will be updated
String cgroupsSubsystems = "memory,cpu";
long maxTimeToWaitForProcessToStop = 60000; // default 1 minute
if (configurationFactory.processStopTimeout != null) {
maxTimeToWaitForProcessToStop = Long.valueOf(configurationFactory.processStopTimeout);
}
cgroupsManager = new CGroupsManager(cgUtilsPath, cgroupsBaseDir, cgroupsSubsystems,
logger, maxTimeToWaitForProcessToStop);
cgroupsManager.configure(this);
// check if cgroups base directory exists in the filesystem
// which means that cgroups
// and cgroups convenience package are installed and the
// daemon is up and running.
if (cgroupsManager.cgroupExists(cgroupsBaseDir)) {
logger.info("nodeAgent", null,
"Agent found cgroup base directory in " + cgroupsBaseDir);
try {
String containerId = "test";
// validate cgroups by creating a dummy cgroup. The code checks if cgroup actually
// got created by
// verifying existence of test cgroup file. The second step in verification is to
// check if
// CPU control is working. Configured in cgconfig.conf, the CPU control allows for
// setting
// cpu.shares. The code will attempt to set the shares and subsequently tries to
// read the
// value from cpu.shares file to make sure the values match. Any exception in the
// above steps
// will cause cgroups to be disabled.
//
cgroupsManager.validator(cgroupsBaseDir, containerId,
System.getProperty("user.name"), false).cgcreate().cgset(100); // write
// cpu.shares=100
// and
// validate
// cleanup dummy cgroup
cgroupsManager.destroyContainer(containerId, System.getProperty("user.name"),
SIGKILL);
useCgroups = true;
} catch (CGroupsManager.CGroupsException ee) {
logger.info("nodeAgent", null, ee);
cgroupFailureReason = ee.getMessage();
useCgroups = false;
}
if (useCgroups) {
try {
// remove stale CGroups
cgroupsManager.cleanup();
} catch (Exception e) {
logger.error("nodeAgent", null, e);
useCgroups = false;
logger.info("nodeAgent", null,
"Agent cgroup cleanup failed on this machine base directory in "
+ cgroupsBaseDir
+ ". Check if cgroups is installed on this node, Agent has correct permissions (consistent with cgconfig.conf), and the cgroup daemon is running");
cgroupFailureReason = "------- CGroups Not Working on this Machine";
}
} else {
logger.info("nodeAgent", null,
"Agent cgroup test failed on this machine base directory in "
+ cgroupsBaseDir
+ ". Check if cgroups is installed on this node, Agent has correct permissions (consistent with cgconfig.conf), and the cgroup daemon is running");
cgroupFailureReason = "------- CGroups Not Working on this Machine";
}
} else {
logger.info("nodeAgent", null, "Agent failed to find cgroup base directory in "
+ cgroupsBaseDir
+ ". Check if cgroups is installed on this node and the cgroup daemon is running");
// logger.info("nodeAgent", null, "------- CGroups Not Installed on this Machine");
cgroupFailureReason = "------- CGroups Not Installed on this Machine";
}
}
}
}
} else {
logger.info("nodeAgent", null, "------- CGroups Not Enabled on this Machine");
cgroupFailureReason = "------- CGroups Not Enabled on this Machine - check ducc.properties: ducc.agent.launcher.cgroups.enable ";
}
// begin publishing node metrics
factory.startNodeMetrics(this);
logger.info("nodeAgent", null,
"CGroup Support=" + useCgroups + " excludeNodeFromCGroups=" + excludeNodeFromCGroups
+ " excludeAPs=" + excludeAPs + " CGroups utils Dir:" + cgUtilsPath);
String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn");
if (useSpawn != null && useSpawn.toLowerCase().equals("true")) {
runWithDuccLing = true;
String c_launcher_path = Utils.resolvePlaceholderIfExists(
System.getProperty("ducc.agent.launcher.ducc_spawn_path"), System.getProperties());
try {
File duccLing = new File(c_launcher_path);
if (duccLing.exists()) {
duccLingExists = true;
}
} catch (Exception e) {
logger.info("nodeAgent", null,
"------- Agent failed while checking for existence of ducc_ling", e);
}
}
}
private String fetchCgroupsBaseDir(String mounts) {
String cbaseDir = null;
BufferedReader br = null;
try {
FileInputStream fis = new FileInputStream(mounts);
// Construct BufferedReader from InputStreamReader
br = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
if (line.trim().startsWith("cgroup")) {
String[] cgroupsInfo = line.split(" ");
if (cgroupsInfo[1].trim().equals("/cgroup")) {
cbaseDir = cgroupsInfo[1].trim();
break;
} else if (cgroupsInfo[1].trim().endsWith("/memory")) {
// return the mount point minus the memory part
cbaseDir = cgroupsInfo[1].substring(0, cgroupsInfo[1].indexOf("/memory"));
break;
}
}
} // while
} catch (Exception e) {
logger.info("nodeAgent", null,
"------- Agent failed while checking for existence of CGroups", e.getMessage());
} finally {
if (br != null) {
try {
br.close();
} catch (Exception ex) {
}
}
}
return cbaseDir;
}
public int getNodeProcessors() {
return runOSCommand(new String[] { "/usr/bin/getconf", "_NPROCESSORS_ONLN" });
}
public int getOSPageSize() {
return runOSCommand(new String[] { "/usr/bin/getconf", "PAGESIZE" });
}
public int getOSClockRate() {
return runOSCommand(new String[] { "/usr/bin/getconf", "CLK_TCK" });
}
private int runOSCommand(String[] cmd) {
InputStreamReader in = null;
BufferedReader reader = null;
int retVal = 0;
try {
ProcessBuilder pb = new ProcessBuilder();
pb.command(cmd);
pb.redirectErrorStream(true);
Process p = pb.start();
in = new InputStreamReader(p.getInputStream());
reader = new BufferedReader(in);
String line = null;
while ((line = reader.readLine()) != null) {
retVal = Integer.parseInt(line.trim());
}
} catch (Exception e) {
logger.error("runOSCommand", null, e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (Exception ex) {
}
}
}
return retVal;
}
public void setNodeInfo(Node node) {
this.node = node;
}
public Node getNodeInfo() {
return node;
}
public int getNodeTotalNumberOfShares() {
int shareQuantum = 0;
int shares = 1;
if (System.getProperty("ducc.rm.share.quantum") != null
&& System.getProperty("ducc.rm.share.quantum").trim().length() > 0) {
shareQuantum = Integer.parseInt(System.getProperty("ducc.rm.share.quantum").trim());
shares = (int) getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() / shareQuantum; // get
// number
// of
// shares
if ((getNodeInfo().getNodeMetrics().getNodeMemory().getMemTotal() % shareQuantum) > 0)
shares++; // ciel
}
return shares;
}
public void start(DuccService service) throws Exception {
super.start(service, null);
String methodName = "start";
String name = nodeIdentity.getShortName();
String ip = nodeIdentity.getIp();
String jmxUrl = getProcessJmxUrl();
DuccDaemonRuntimeProperties.getInstance().bootAgent(name, ip, jmxUrl);
String key = "ducc.broker.url";
String value = System.getProperty(key);
logger.info(methodName, null, key + "=" + value);
stateChange(EventType.BOOT);
}
public DuccEventDispatcher getEventDispatcherForRemoteProcess() {
return commonProcessDispatcher;
}
public boolean duccLingExists() {
return duccLingExists;
}
public void duccLingExists(boolean duccLingExists) {
this.duccLingExists = duccLingExists;
}
public boolean runWithDuccLing() {
return runWithDuccLing;
}
public void runWithDuccLing(boolean runWithDuccLing) {
this.runWithDuccLing = runWithDuccLing;
}
/**
* Returns deep copy (by way of java serialization) of the Agents inventory.
*/
@SuppressWarnings("unchecked")
public Map<DuccId, IDuccProcess> getInventoryCopy() {
Object deepCopy = null;
try {
inventorySemaphore.acquire();
deepCopy = SerializationUtils.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);
} catch (InterruptedException e) {
} finally {
inventorySemaphore.release();
}
return (Map<DuccId, IDuccProcess>) deepCopy;
}
/**
* Returns shallow copy of the Agent's inventory
*/
public Map<DuccId, IDuccProcess> getInventoryRef() {
return inventory;
}
/*
* Check if both the command and its args are missing, since the command defaults to the DUCC JVM.
*/
private boolean invalidCommand(ICommandLine commandLine) {
if (commandLine != null) {
if (commandLine.getExecutable() != null && commandLine.getExecutable().length() > 0)
return false;
if (commandLine.getCommandLine() != null && commandLine.getCommandLine().length > 0)
return false;
}
return true;
}
private boolean isProcessDeallocated(IDuccProcess process) {
return (process.getProcessState().equals(ProcessState.Undefined) && process.isDeallocated());
}
/**
* Stops any process that is in agent's inventory but not in provided job list sent by the PM.
*
* @param lifecycleController
* - instance implementing stopProcess() method
* @param jobDeploymentList
* - all DUCC jobs sent by PM
*/
public void takeDownProcessWithNoJob(ProcessLifecycleController lifecycleController,
List<IDuccJobDeployment> jobDeploymentList) {
String methodName = "takeDownProcessWithNoJob";
try {
inventorySemaphore.acquire();
List<IDuccProcess> purgeList = new ArrayList<IDuccProcess>();
boolean hasAjob = false;
// Check if every process in agent's inventory is associated with a
// job in a given
// jobDeploymentList
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
// if a job list is empty, take down all agent processes that
// are in the inventory
if (jobDeploymentList.isEmpty()) {
logger.info(methodName, null, "...Agent Process:" + processEntry.getValue().getDuccId()
+ " Not in JobDeploymentList. Ducc Currently Has No Jobs Running");
hasAjob = false;
} else {
// iterate over all jobs
for (IDuccJobDeployment job : jobDeploymentList) {
// check if current process is a JD
if (job.getJdProcess() != null) {
if (processEntry.getValue().getDuccId().equals(job.getJdProcess().getDuccId())) {
hasAjob = true;
break;
}
}
// check if current process is a JP
for (IDuccProcess jProcess : job.getJpProcessList()) {
if (processEntry.getValue().getDuccId().equals(jProcess.getDuccId())) {
hasAjob = true;
break;
}
}
if (hasAjob) {
break;
}
}
}
if (!hasAjob) {
// if a process in agent inventory has no job and is still
// alive, stop it
if (isAlive(processEntry.getValue())) {
logger.error(methodName, null,
"<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:"
+ processEntry.getValue().getDuccId() + " PID:"
+ processEntry.getValue().getPID());
processEntry.getValue().setReasonForStoppingProcess(
ReasonForStoppingProcess.JPHasNoActiveJob.toString());
lifecycleController.stopProcess(processEntry.getValue());
} else {
// add process to purge list
purgeList.add(processEntry.getValue());
}
} else {
hasAjob = false;
}
}
for (IDuccProcess processToPurge : purgeList) {
logger.error(methodName, null,
"XXXXXXXXXX Purging Process:" + processToPurge.getDuccId() + " Process State:"
+ processToPurge.getProcessState() + " Process Resource State:"
+ processToPurge.getResourceState());
getInventoryRef().remove(processToPurge.getDuccId());
}
} catch (Exception e) {
} finally {
inventorySemaphore.release();
}
}
private void stopProcessIfAlive(IDuccProcess process,
ProcessLifecycleController lifecycleController) {
String methodName = "stopProcessIfAlive";
if (isAlive(process)) {
logger.error(methodName, null,
"<<<<<<<<< Stopping Process with no Job Assignement (Ghost Process) - DuccId:"
+ process.getDuccId() + " PID:" + process.getPID());
process.setReasonForStoppingProcess(ReasonForStoppingProcess.JPHasNoActiveJob.toString());
lifecycleController.stopProcess(process);
} else {
logger.error(methodName, null,
"XXXXXXXXXX Purging Process:" + process.getDuccId() + " Process State:"
+ process.getProcessState() + " Process Resource State:"
+ process.getResourceState());
getInventoryRef().remove(process.getDuccId());
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) {
it.remove();
break;
}
}
logger.info(methodName, null, "After Purge Inventory size:" + inventory.size()
+ " deployedProcesses size:" + deployedProcesses.size());
}
}
/*
* Valid process exists in agent inventory and in an incoming OR state. If process exists in agent
* inventory but not in OR state than such process is invalid
*/
private boolean validProcess(IDuccProcess process, List<IDuccJobDeployment> jobDeploymentList) {
// iterate over all jobs
for (IDuccJobDeployment job : jobDeploymentList) {
// check if current process is a JD
if ((job.getJdProcess() != null
&& process.getDuccId().equals(job.getJdProcess().getDuccId()))) {
return true;
} else {
// check if current process is a JP
for (IDuccProcess jProcess : job.getJpProcessList()) {
if (process.getDuccId().equals(jProcess.getDuccId())) {
return true;
}
}
}
}
return false;
}
public void takeDownProcessWithNoJobV2(ProcessLifecycleController lifecycleController,
List<IDuccJobDeployment> jobDeploymentList) {
String methodName = "takeDownProcessWithNoJobV2";
try {
inventorySemaphore.acquire();
// iterate over all processes in agent inventory
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
// if OR deployment list is empty, take down all agent processes that
// are in the inventory
if (jobDeploymentList.isEmpty()
|| !validProcess(processEntry.getValue(), jobDeploymentList)) {
logger.info(methodName, null, "...Agent Process:" + processEntry.getValue().getDuccId()
+ " Not in OR JobDeploymentList");
stopProcessIfAlive(processEntry.getValue(), lifecycleController);
}
}
} catch (Exception e) {
logger.error(methodName, null, e);
} finally {
inventorySemaphore.release();
}
}
/**
* Reconciles agent inventory with job processes sent by PM
*
* @param lifecycleController
* - instance implementing stopProcess and startProcess
* @param process
* - job process from a Job List
* @param commandLine
* - in case this process is not in agents inventory we need cmd line to start it
* @param info
* - DUCC common info including user log dir, user name, etc
* @param workDuccId
* - job id
*/
public void reconcileProcessStateAndTakeAction(ProcessLifecycleController lifecycleController,
IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
ProcessMemoryAssignment processMemoryAssignment, DuccId workDuccId,
boolean isPreemptable) {
String methodName = "reconcileProcessStateAndTakeAction";
try {
inventorySemaphore.acquire();
// Check if process exists in agent's inventory
if (getInventoryRef().containsKey(process.getDuccId())) {
IDuccProcess agentManagedProcess = getInventoryRef().get(process.getDuccId());
// check if process is Running, Initializing, Started, or Starting
if (isAlive(agentManagedProcess)) {
// Stop the process if it has been deallocated
if (process.isDeallocated()) {
// if agent is in stopping state, it will try to stop
// its processes.
if (stopping && isPreemptable) {
logger.info(methodName, workDuccId, ">>>>>>>> Agent is stopping - process with PID:"
+ process.getPID() + " is stopping");
return; // agent is stopping. All processes are stopping
} else {
agentManagedProcess.setResourceState(ResourceState.Deallocated);
logger.info(methodName, workDuccId,
"<<<<<<<< Agent Stopping Process:" + process.getDuccId() + " PID:"
+ process.getPID() + " Reason: Ducc Deallocated the Process.");
lifecycleController.stopProcess(agentManagedProcess);
}
}
// else nothing to do. Process has been deallocated
}
} else { // Process not in agent's inventory
// Add this process to the inventory so that it gets published.
getInventoryRef().put(process.getDuccId(), process);
if (process.isFailed()) {
// When a process scheduling class is invalid, the AgentEventListener will
// tag it as FAILED
process.setReasonForStoppingProcess(
IDuccProcess.ReasonForStoppingProcess.InvalidSchedulingClass.name());
ITimeWindow twr = new TimeWindow();
process.setTimeWindowRun(twr);
twr.setStartLong(0);
twr.setEndLong(0);
ITimeWindow twi = new TimeWindow();
process.setTimeWindowInit(twi);
twi.setStartLong(0);
twi.setEndLong(0);
} else if (process.isDeallocated()) {
// process not in agent's inventory and it is marked as
// deallocated. This can happen when an agent is restarted
// while the rest of DUCC is running.
// markAsStopped(process);
process.setProcessState(ProcessState.Stopped);
} else if (process.getResourceState().equals(ResourceState.Allocated)) {
// check if OR thinks that this process is still running. If
// so, this agent was restarted while the OR was running and
// we need to mark the process as Failed.
if (process.getProcessState().equals(ProcessState.Initializing)
|| process.getProcessState().equals(ProcessState.Running)) {
process.setProcessState(ProcessState.Failed);
} else {
// enforce presence of command line
if (invalidCommand(commandLine)) {
process.setProcessState(ProcessState.Failed);
logger.info(methodName, workDuccId,
"Rejecting Process Start Request. Command line not provided for Process ID:"
+ process.getDuccId());
process.setReasonForStoppingProcess(
IDuccProcess.ReasonForStoppingProcess.CommandLineMissing.name());
} else {
if (stopping) {
process.setProcessState(ProcessState.Rejected);
logger.info(methodName, workDuccId, ">>>>>>> Agent Rejected Process:"
+ process.getDuccId() + " Start Request - Agent is stopping");
return;
}
process.setProcessState(ProcessState.Starting);
logger.info(methodName, workDuccId,
">>>>>>> Agent Starting Process:" + process.getDuccId() + " Process State:"
+ process.getProcessState() + " Process Resource State:"
+ process.getResourceState());
lifecycleController.startProcess(process, commandLine, info, workDuccId,
processMemoryAssignment, isPreemptable);
}
}
}
}
} catch (Exception e) {
logger.error(methodName, workDuccId, e);
} finally {
inventorySemaphore.release();
}
}
public void doStartProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
DuccId workDuccId, boolean isPreemptable) {
String methodName = "doStartProcess";
try {
inventorySemaphore.acquire();
if (getInventoryRef().containsKey(process.getDuccId())) {
logger.error(methodName, null, "Rejecting Process Start Request. Process with a Ducc ID:"
+ process.getDuccId() + " is already in agent's inventory.");
return;
}
startProcess(process, commandLine, info, workDuccId, new ProcessMemoryAssignment(),
isPreemptable);
} catch (InterruptedException e) {
logger.error(methodName, null, e);
} finally {
inventorySemaphore.release();
}
}
private boolean isProcessRunning(IDuccProcess process) {
if (process.getProcessState().equals(ProcessState.Running)
|| process.getProcessState().equals(ProcessState.Initializing)) {
return true;
}
return false;
}
private boolean isOverSwapLimit(IDuccProcess process) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
// for (ManagedProcess deployedProcess : deployedProcesses) {
// Check if this process exceeds its alloted max swap usage
if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())
&& process.getSwapUsage() > deployedProcess.getMaxSwapThreshold()) {
return true;
}
}
return false;
}
private long getSwapOverLimit(IDuccProcess process) {
long overLimit = 0;
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// for (ManagedProcess deployedProcess : deployedProcesses) {
ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) {
overLimit = deployedProcess.getMaxSwapThreshold() - process.getSwapUsage();
}
}
if (overLimit < 0) {
overLimit = 0;
}
return overLimit;
}
/**
* Called when swap space on a node reached minimum as defined by ducc.node.min.swap.threshold in
* ducc.properties. The agent will find the biggest (in terms of memory) process in its inventory
* and stop it.
*/
public void killProcessDueToLowSwapSpace(long minSwapThreshold) {
String methodName = "killProcessDueToLowSwapSpace";
IDuccProcess biggestProcess = null;
try {
inventorySemaphore.acquire();
// find the fattest process in terms of absolute use of swap over the process limit
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
if (isProcessRunning(processEntry.getValue()) && isOverSwapLimit(processEntry.getValue())
&& (biggestProcess == null || getSwapOverLimit(biggestProcess) < getSwapOverLimit(
processEntry.getValue()))) {
biggestProcess = processEntry.getValue();
}
}
} catch (InterruptedException e) {
logger.error(methodName, null, e);
} finally {
inventorySemaphore.release();
}
if (biggestProcess != null) {
biggestProcess.setReasonForStoppingProcess(ReasonForStoppingProcess.LowSwapSpace.toString());
logger.info(methodName, null, "Stopping Process:" + biggestProcess.getDuccId() + " PID:"
+ biggestProcess.getPID()
+ " Due to a low swap space. Process' RSS exceeds configured swap threshold of "
+ minSwapThreshold
+ " Defined in ducc.properties. Check ducc.node.min.swap.threshold property");
stopProcess(biggestProcess);
}
}
public void interruptThreadInWaitFor(String pid) throws Exception {
String methodName = "interruptZombieProcess";
synchronized (monitor) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// for (ManagedProcess dProcess : deployedProcesses) {
ManagedProcess dProcess = it.next();
if (dProcess.getPid() != null && dProcess.getPid().equals(pid)) {
Future<?> future = dProcess.getFuture();
if (future != null && !future.isDone() && !future.isCancelled()) {
future.cancel(true); // interrupt the thread blocked on waitFor()
logger.info(methodName, dProcess.getDuccProcess().getDuccId(),
"Interrupted Thread - Zombie Process with PID:" + dProcess.getPid());
}
break;
}
}
}
}
/**
* Called when Agent receives request to start a process.
*
* @param process
* - IDuccProcess instance with identity (DuccId)
* @param commandLine
* - fully defined command line that will be used to exec the process.
*
*/
public void startProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment,
boolean isPreemptable) {
String methodName = "startProcess";
try {
// Add process to the Agent's inventory before it is started
getInventoryRef().put(process.getDuccId(), process);
// enforce presence of command line
if (invalidCommand(commandLine)) {
process.setProcessState(ProcessState.Failed);
logger.info(methodName, null,
"Rejecting Process Start Request. Command line not provided for Process ID:"
+ process.getDuccId());
} else if (isProcessDeallocated(process)) {
process.setProcessState(ProcessState.Stopped);
logger.info(methodName, null, "Rejecting Process Start Request. Process ID:"
+ process.getDuccId() + " hava already been deallocated due to Shrink");
} else {
deployProcess(process, commandLine, info, workDuccId, processMemoryAssignment,
isPreemptable);
}
} catch (Exception e) {
logger.error(methodName, null, e);
}
}
public boolean isAlive(IDuccProcess invProcess) {
return invProcess.getProcessState().equals(ProcessState.Initializing)
|| invProcess.getProcessState().equals(ProcessState.Running)
|| invProcess.getProcessState().equals(ProcessState.Stopping)
|| invProcess.getProcessState().equals(ProcessState.Starting)
|| invProcess.getProcessState().equals(ProcessState.Started);
}
public void doStopProcess(IDuccProcess process) {
String methodName = "stopProcess";
try {
inventorySemaphore.acquire();
stopProcess(process);
} catch (InterruptedException e) {
logger.error(methodName, null, e);
} finally {
inventorySemaphore.release();
}
}
/**
* Called when Agent receives request to stop a process.
*
* @param process
* - IDuccProcess instance with identity (DuccId)
*
*/
public void stopProcess(IDuccProcess process) {
String methodName = "stopProcess";
try {
IDuccProcess invProcess = null;
if ((invProcess = getInventoryRef().get(process.getDuccId())) != null
&& isAlive(invProcess)) {
logger.info(methodName, null, "Undeploing Process with PID:" + process.getPID());
undeployProcess(process);
} else if (invProcess == null) { // process not in inventory
logger.info(methodName, null,
"Agent received Stop request for a process which is not in the Agent's inventory. "
+ "It looks like this Agent was killed along with its child processes. Adding stale process to the inventory. PID:"
+ process.getPID() + " DuccId:" + process.getDuccId() + "");
// Received a request to stop a process that this is not in the
// current
// inventory. Most likely this agent was killed while its
// processes were
// still running. Add the process to the agent's inventory so
// that its
// included in the next published inventory. This is done so
// that the
// orchestrator can cleanup its state.
if (process.getProcessState() != ProcessState.Stopped
&& process.getProcessState() != ProcessState.Failed
&& process.getProcessState() != ProcessState.InitializationTimeout
&& process.getProcessState() != ProcessState.FailedInitialization) {
// Force the Stopped state if not already stopped or failed
process.setProcessState(ProcessState.Stopped);
}
// Add stale process to the inventory. This will eventually be
// cleaned up
// when the PM sends purge request.
getInventoryRef().put(process.getDuccId(), process);
}
} catch (Exception e) {
logger.error(methodName, null, e);
}
}
/**
* Checks if process with a given PID has already registered memory collector Camel route. This
* route periodically fetches resident memory of a process with a given PID. Each process
* collector route is identified by process PID.
*
* @param pid
* - process PID also id of its route
* @return - true if memory collector route has already been created. False, otherwise
*/
private boolean addProcessMemoryCollector(String pid) {
// search all camel routes for one with a given id
for (Route route : super.getContext().getRoutes()) {
if (route.getId().equals(pid)) {
return false;
}
}
return true;
}
/**
* Remove given process from Agent's inventory
*
* @param process
* - process to purge from inventory
* @throws Exception
*/
public void purgeProcess(IDuccProcess process) throws Exception {
String methodName = "purgeProcess";
DuccId key = null;
String pid = "";
try {
inventorySemaphore.acquire();
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
// Check if process with a given unique DuccId exist in the
// local map
if (processEntry.getKey().equals(process.getDuccId())) {
key = processEntry.getKey();
pid = processEntry.getValue().getPID();
break;
}
}
if (key != null) {
getInventoryRef().remove(key);
logger.info(methodName, null, ">>>> Agent Purged Process with PID:" + pid);
}
} catch (InterruptedException e) {
} finally {
inventorySemaphore.release();
}
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// for (ManagedProcess deployedProcess : deployedProcesses) {
ManagedProcess deployedProcess = it.next();
// Find ManagedProcess instance the DuccProcess instance is
// associated with
if (deployedProcess.getDuccProcess().getDuccId().equals(process.getDuccId())) {
processUndeploy(deployedProcess, it);
break;
}
}
}
private boolean changeState(ProcessState state) {
switch (state) {
case FailedInitialization:
case InitializationTimeout:
case Stopped:
case Stopping:
return false;
case Starting:
case Started:
case Initializing:
case Running:
return true;
default:
break;
}
return false;
}
/**
* Called when a service wrapper sends status update
*
* @param duccEvent
* - Ducc event object
*
* @throws Exception
*/
public void updateProcessStatus(ProcessStateUpdateDuccEvent duccEvent) throws Exception {
String methodName = "updateProcessStatus";
try {
inventorySemaphore.acquire();
for (Entry<DuccId, IDuccProcess> processEntry : getInventoryRef().entrySet()) {
// Check if process with a given unique DuccId exist in the
// local map
if (processEntry.getKey().getUnique().equals(duccEvent.getDuccProcessId())) {
// found it. Update pid and state of the process
if (duccEvent.getPid() != null && processEntry.getValue().getPID() == null) {
processEntry.getValue().setPID(duccEvent.getPid());
}
if (duccEvent.getProcessJmxUrl() != null
&& processEntry.getValue().getProcessJmxUrl() == null) {
processEntry.getValue().setProcessJmxUrl(duccEvent.getProcessJmxUrl());
}
ITimeWindow tw = processEntry.getValue().getTimeWindowInit();
if (tw != null) {
if (!duccEvent.getState().equals(ProcessState.Initializing)) {
// Mark the time the process ended initialization. It also
// covers a case when the process terminates while initializing
tw.setEnd(TimeStamp.getCurrentMillis());
if (duccEvent.getState().equals(ProcessState.Running)) {
ITimeWindow twr = new TimeWindow();
String millis;
millis = TimeStamp.getCurrentMillis();
// Mark the time the process started running
processEntry.getValue().setTimeWindowRun(twr);
twr.setStart(millis);
}
}
} else {
logger.info(methodName, null,
"++++++++++++ Agent Init TimeWindow not available - tw==null");
}
ManagedProcess deployedProcess = null;
synchronized (monitor) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// Find ManagedProcess instance the DuccProcess
// instance is associated with
ManagedProcess dProcess = it.next();
if (dProcess.getDuccProcess().getDuccId().getUnique()
.equals(duccEvent.getDuccProcessId())) {
deployedProcess = dProcess;
break;
}
}
}
if (processEntry.getValue().getProcessState() != ProcessState.Running
&& duccEvent.getState().equals(ProcessState.Running) && deployedProcess != null) {
// cancel process initialization timer.
deployedProcess.stopInitializationTimer();
}
logger.info(methodName, null,
">>>> Agent Handling Process Update - Ducc Id: "
+ processEntry.getValue().getDuccId() + " PID:"
+ processEntry.getValue().getPID() + " Status:" + duccEvent.getState()
+ " Deallocated:" + processEntry.getValue().isDeallocated());
if (deployedProcess != null && deployedProcess.getSocketEndpoint() == null
&& duccEvent.getServiceEdnpoint() != null) {
deployedProcess.setSocketEndpoint(duccEvent.getServiceEdnpoint());
}
// This is a delayed stop. Previously a request to stop the
// process was received
// but the PID was not available yet. Instead a flag was set
// to initiate a
// stop after the process reports the PID.
if (deployedProcess != null && deployedProcess.killAfterLaunch()) {
logger.info(methodName, null,
">>>> Process Ducc Id:" + processEntry.getValue().getDuccId()
+ " Was Previously Tagged for Kill While It Was Starting");
undeployProcess(processEntry.getValue());
} else if (deployedProcess != null && deployedProcess.doKill() && deployedProcess
.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
deployedProcess.getDuccProcess()
.setReasonForStoppingProcess(ReasonForStoppingProcess.KilledByDucc.toString());
} else if (deployedProcess != null && (deployedProcess.doKill()
|| deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Failed)
|| deployedProcess.getDuccProcess().getProcessState()
.equals(ProcessState.Killed))) {
// The process has already stopped, but managed to send
// the last update before dying. Ignore the update
return;
} else if (changeState(processEntry.getValue().getProcessState())) {
logger.info(methodName, null, "=============== PID:" + processEntry.getValue().getPID()
+ " Changing State - current state:" + processEntry.getValue().getProcessState()
+ " New State:" + duccEvent.getState());
processEntry.getValue().setProcessState(duccEvent.getState());
DuccEventDispatcher dispatcher = configurationFactory
.getORDispatcher(super.getContext());
try {
DefaultNodeInventoryProcessor processor = configurationFactory
.nodeInventoryProcessor(this);
Map<DuccId, IDuccProcess> inventoryCopy = (Map<DuccId, IDuccProcess>) SerializationUtils
.clone((ConcurrentHashMap<DuccId, IDuccProcess>) inventory);
processor.dispatchInventoryUpdate(dispatcher,
configurationFactory.getInventoryUpdateEndpoint(), inventoryCopy);
logger.info(methodName, null, "Sent Node Inventory Update to the OR - process PID:"
+ processEntry.getValue().getPID());
} catch (Exception e) {
logger.warn("", null, e);
}
// if the process is Stopping, it must have hit an error threshold
}
// Check if MemoryCollector should be created for this
// process. It collects
// resident memory of the process at regular intervals.
// Should only be added
// once for each process. This route will have its id set to
// process PID.
if (addProcessMemoryCollector(duccEvent.getPid())
&& (duccEvent.getState().equals(ProcessState.Initializing)
|| duccEvent.getState().equals(ProcessState.Running))) {
if (duccEvent.getState().equals(ProcessState.Running)) {
if (processEntry.getValue().getUimaPipelineComponents() != null
&& processEntry.getValue().getUimaPipelineComponents().size() > 0) {
processEntry.getValue().getUimaPipelineComponents().clear();
if (duccEvent.getUimaPipeline() != null) {
duccEvent.getUimaPipeline().clear();
}
}
}
} else if (duccEvent.getState().equals(ProcessState.Stopped)
|| duccEvent.getState().equals(ProcessState.Failed)
|| duccEvent.getState().equals(ProcessState.Killed)) {
if (deployedProcess.getMetricsProcessor() != null) {
deployedProcess.getMetricsProcessor().close(); // close open fds (stat and statm
// files)
}
logger.info(methodName, null,
"----------- Agent Stopped ProcessMemoryUsagePollingRouter for Process:"
+ duccEvent.getPid());
} else if (duccEvent.getState().equals(ProcessState.FailedInitialization)) {
logger.info(methodName, null,
">>>> Agent Handling Process FailedInitialization. PID:" + duccEvent.getPid());
deployedProcess.getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.FailedInitialization.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
deployedProcess.kill();
logger.info(methodName, null, ">>>> Agent Handling Process FailedInitialization. PID:"
+ duccEvent.getPid() + " Killing Process");
undeployProcess(processEntry.getValue());
} else if (duccEvent.getState().equals(ProcessState.InitializationTimeout)) {
deployedProcess.getDuccProcess().setReasonForStoppingProcess(
ReasonForStoppingProcess.InitializationTimeout.toString());
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
// Mark process for death. Doesnt actually kill the process
deployedProcess.kill();
logger.info(methodName, null, ">>>> Agent Handling Process InitializationTimeout. PID:"
+ duccEvent.getPid() + " Killing Process");
undeployProcess(processEntry.getValue());
} else if (duccEvent.getState().equals(ProcessState.Stopping)) {
if (duccEvent.getMessage() != null && duccEvent.getMessage()
.equals(ReasonForStoppingProcess.ExceededErrorThreshold.toString())) {
processEntry.getValue().setReasonForStoppingProcess(
ReasonForStoppingProcess.ExceededErrorThreshold.toString());
}
if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)
&& !deployedProcess.getDuccProcess().getProcessState()
.equals(ProcessState.Stopping)) {
deployedProcess.getDuccProcess().setProcessState(ProcessState.Stopping);
deployedProcess.setStopping();
}
}
if (duccEvent.getUimaPipeline() != null) {
StringBuffer buffer = new StringBuffer("\t\tUima Pipeline -");
for (IUimaPipelineAEComponent uimaAeState : duccEvent.getUimaPipeline()) {
buffer.append("\n\t\tAE:").append(uimaAeState.getAeName()).append(" state:")
.append(uimaAeState.getAeState()).append(" InitTime:")
.append(uimaAeState.getInitializationTime() / 1000).append(" secs. Thread:")
.append(uimaAeState.getAeThreadId());
}
logger.info(methodName, null, buffer.toString());
((DuccProcess) processEntry.getValue())
.setUimaPipelineComponents(duccEvent.getUimaPipeline());
}
return; // found it. Done
}
}
} catch (InterruptedException e) {
} finally {
inventorySemaphore.release();
}
}
/**
* Deploys process using supplied command line
*
* @param process
* - Process with identity (DuccId)
* @param commandLine
* - fully defined command line that will be used to exec the process.
*/
private void deployProcess(IDuccProcess process, ICommandLine commandLine, IDuccStandardInfo info,
DuccId workDuccId, ProcessMemoryAssignment processMemoryAssignment, boolean preemptable) {
String methodName = "deployProcess";
synchronized (monitor) {
boolean deployProcess = true;
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// for (ManagedProcess deployedProcess : deployedProcesses) {
ManagedProcess deployedProcess = it.next();
// ignore duplicate start request for the same process
if (deployedProcess.getDuccId().equals(process.getDuccId())) {
deployProcess = false;
break;
}
}
if (deployProcess) {
try {
logger.info(methodName, workDuccId,
"Agent [" + getIdentity().getIp() + "] Deploying Process - DuccID:"
+ process.getDuccId().getFriendly() + " Unique DuccID:"
+ process.getDuccId().getUnique() + " Node Assignment:"
+ process.getNodeIdentity().getIp() + " Process Memory Assignment:"
+ processMemoryAssignment + " MBs");
TimeWindow tw = new TimeWindow();
tw.setStart(TimeStamp.getCurrentMillis());
tw.setEnd(null);
process.setTimeWindowInit(tw);
ManagedProcess managedProcess = new ManagedProcess(process, commandLine, this, logger,
processMemoryAssignment, preemptable);
managedProcess.setProcessInfo(info);
managedProcess.setWorkDuccId(workDuccId);
// enrich process spec with unique ducc id which will be
// used to correlate message
// exchanges
// between the agent and launched process
ManagedProcess deployedProcess = launcher.launchProcess(this, getIdentity(), process,
commandLine, this, managedProcess);
processDeploy(deployedProcess);
} catch (Exception e) {
logger.error(methodName, null, e);
}
} else {
logger.info(methodName, workDuccId,
"Ignoring duplicate request to start process - DuccID:"
+ process.getDuccId().getFriendly() + " Unique DuccID:"
+ process.getDuccId().getUnique());
}
}
}
class AgentStreamConsumer implements Runnable {
private InputStream theStream;
AgentStreamConsumer(InputStream is) {
theStream = is;
}
public void run() {
String methodName = "AgentStreamConsumer.run";
BufferedReader bufferedReader = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(theStream));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
StringBuffer outputBuffer = new StringBuffer();
outputBuffer.append(line + "\n");
}
} catch (Throwable t) {
logger.warn(methodName, null, t);
t.printStackTrace();
} finally {
try {
bufferedReader.close();
} catch (Exception e) {
}
}
}
}
enum SIGNAL {
SIGTERM("-15"), SIGKILL("-9");
String signal = "";
SIGNAL(String kind) {
signal = kind;
}
public String get() {
return signal;
}
};
class ProcessRunner implements Runnable {
ManagedProcess deployedProcess;
public ProcessRunner(final ManagedProcess deployedProcess) {// final String pid, SIGNAL signal )
// {
this.deployedProcess = deployedProcess;
}
public void run() {
stopProcess(deployedProcess.getDuccProcess());
}
}
private boolean runnable(ManagedProcess process) {
return (process.getDuccProcess().getProcessState().equals(ProcessState.Initializing)
|| process.getDuccProcess().getProcessState().equals(ProcessState.Starting)
|| process.getDuccProcess().getProcessState().equals(ProcessState.Started)
|| process.getDuccProcess().getProcessState().equals(ProcessState.Running));
}
/**
* This method is called when an agent receives a STOP request. It sends SIGTERM to all
* non-preemptable child processes and starts a timer. If the timer pops and child processes are
* still running, the agent takes itself out via halt()
*/
private boolean stopChildProcesses(boolean quiesceMode) {
String methodName = "stopNow";
boolean wait = false;
try {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
String pid = deployedProcess.getDuccProcess().getPID();
logger.info(methodName, null, "....Process:" + pid + " is JD=" + deployedProcess.isJd()
+ " Preemptable:" + deployedProcess.isPreemptable());
// dont send SIGTERM to non-preemptable processes in quiesce mode
if ((quiesceMode && !deployedProcess.isPreemptable()) || deployedProcess.isStopping()
|| pid == null || pid.trim().length() == 0 || !runnable(deployedProcess)) {
continue;
}
logger.info(methodName, null,
"....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+ " PID:" + pid + " Sending SIGTERM Process State:"
+ deployedProcess.getDuccProcess().getProcessState().toString()
+ " Process Type:" + deployedProcess.getDuccProcess().getProcessType()
+ " Uima AS:" + deployedProcess.isUimaAs() + " Preemtable:"
+ deployedProcess.isPreemptable());
wait = true;
deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
// Stop each child process in its own thread to parallelize SIGTERM requests
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new ProcessRunner(deployedProcess));
}
} catch (Exception e) {
logger.warn(methodName, null, e);
}
return wait;
}
private void killChildProcesses(boolean killOnlyUimaAs, boolean quiesce) {
String methodName = "killChildProcesses";
try {
if (useCgroups) {
logger.info(methodName, null, "CgroupsManager.cleanup() before ");
if (killOnlyUimaAs) {
Set<String> pidsToKill = new HashSet<>();
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess p = it.next();
if (!p.isPreemptable() && p.getPid() != null && p.isUimaAs()) {
pidsToKill.add(p.getPid());
}
}
if (!pidsToKill.isEmpty()) {
logger.info(methodName, null, ">>>>>>>> Found " + pidsToKill.size()
+ " UIMA-AS processes still running - killing all non-preemptables via kill -9");
// Since SIGTERM may not be enough to take down a process, use cgroups to find
// any UIMA-AS process still standing and do hard kill
cgroupsManager.cleanupPids(pidsToKill);
}
} else {
Set<String> pidsToKill = new HashSet<>();
Iterator<ManagedProcess> it = deployedProcesses.iterator();
// Since SIGTERM may not be enough to take down a process, use cgroups to find
// any process still standing and do hard kill
while (it.hasNext()) {
ManagedProcess p = it.next();
if ((!quiesce && !p.isPreemptable()) && p.getPid() != null && p.isUimaAs()) {
pidsToKill.add(p.getPid());
}
}
// cgroupsManager.cleanup();
cgroupsManager.cleanupPids(pidsToKill);
}
logger.info(methodName, null, "CgroupsManager.cleanup() after ");
} else {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
String pid = deployedProcess.getDuccProcess().getPID();
if ((quiesce && !deployedProcess.isPreemptable()) || pid == null
|| pid.trim().length() == 0 || !runnable(deployedProcess)) {
continue;
}
logger.info(methodName, null,
"....Stopping Process - DuccId:" + deployedProcess.getDuccProcess().getDuccId()
+ " PID:" + pid + " Sending SIGKILL Process State:"
+ deployedProcess.getDuccProcess().getProcessState().toString());
ICommandLine cmdLine;
if (Utils.isWindows()) {
cmdLine = new NonJavaCommandLine("taskkill");
cmdLine.addArgument("/PID");
} else {
cmdLine = new NonJavaCommandLine("/bin/kill");
cmdLine.addArgument("-9");
}
cmdLine.addArgument(pid);
deployedProcess.setStopping();
deployedProcess.setStopPriority(StopPriority.DONT_WAIT);
launcher.launchProcess(this, getIdentity(), deployedProcess.getDuccProcess(), cmdLine,
this, deployedProcess);
}
}
} catch (Exception e) {
logger.warn(methodName, null, e);
}
}
private void handleSigTermTimeout(ManagedProcess deployedProcess) {
String methodName = "handleSigTermTimeout";
if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
logger.info(methodName, deployedProcess.getDuccId(),
"------------ Agent Timed-out Waiting for Process with PID:"
+ deployedProcess.getDuccProcess().getPID() + " to Stop. Process State:"
+ deployedProcess.getDuccProcess().getProcessState()
+ " .Process did not stop in allotted time of "
+ maxTimeToWaitForProcessToStop + " millis");
logger.info(methodName, deployedProcess.getDuccId(),
">>>>>>>>>>>>>>> Killing Process:" + deployedProcess.getDuccProcess().getPID()
+ " .Process State:" + deployedProcess.getDuccProcess().getProcessState());
}
ICommand sigKillCommand = new SigKillCommand(deployedProcess, logger);
launcher.launchOSCommand(sigKillCommand);
}
/**
* Kills a given process
*
* @param process
* - process to kill
*/
private void undeployProcess(IDuccProcess process) {
String methodName = "undeployProcess";
synchronized (monitor) {
ManagedProcess deployedProcess = getDeployedProcess(process.getDuccId());
// Given process does not exist in agent's inventory
if (Objects.isNull(deployedProcess)) {
logger.info(methodName, null,
".... Process - DuccId:" + process.getDuccId() + " PID:" + process.getPID()
+ " Not in Agent's inventory. Adding to the inventory with state=Stopped");
process.setProcessState(ProcessState.Stopped);
inventory.put(process.getDuccId(), process);
processDeploy(new ManagedProcess(process, null, this, logger, new ProcessMemoryAssignment(),
true));
return;
}
if (Objects.isNull(deployedProcess.getPid())) {
if (!deployedProcess.getDuccProcess().getProcessState().equals(ProcessState.Stopped)) {
// process not reported its PID yet when the process
// reports its PID, check if it should be killed.
logger.info(methodName, deployedProcess.getDuccId(), ".... Process - Ducc ID:"
+ deployedProcess.getDuccId()
+ " Has Not Started Yet. PID Not Available. Tagging Process For Kill When It Reports Status");
deployedProcess.killAfterLaunch(true);
}
return;
}
ICommand sigTermCommand = new SigTermCommand(deployedProcess, logger);
// Mark the process as stopping. When the process exits,
// the agent can determine if the process died on its own
// (due to say, user code problem) or if it died
// due to Agent initiated stop.
deployedProcess.setStopping();
try {
logger.info(methodName, deployedProcess.getDuccId(),
"------------ Agent Starting Killer Timer Task For Process with PID:"
+ deployedProcess.getDuccProcess().getPID() + " Process State: "
+ deployedProcess.getDuccProcess().getProcessState());
// launch SIGTERM
launcher.launchOSCommand(sigTermCommand);
// fetch deployed process Future object which
// will be used to determine if its still
// running
Future<?> future = deployedProcess.getFuture();
// future.get() blocks until process stops or timeout occurs
future.get(maxTimeToWaitForProcessToStop, TimeUnit.MILLISECONDS);
logger.info(methodName, deployedProcess.getDuccId(), ">>>>>>>>>>>>> Process with PID:"
+ deployedProcess.getDuccProcess().getPID() + " Terminated");
} catch (TimeoutException e) {
logger.info(methodName, deployedProcess.getDuccId(),
">>>>>>>>> Timed Out Waiting For Process To Terminate After SIGTERM Was Sent");
handleSigTermTimeout(deployedProcess);
} catch (Exception e) {
logger.error(methodName, deployedProcess.getDuccId(), e);
}
try {
monitor.wait(500);
} catch (InterruptedException ee) {
}
// check if defunct process
if (isProcessRunning(deployedProcess.getDuccProcess())) {
// wait for a short while before running DefunctProcessDetector
// Sometimes when a process is killed via kill -9 it shows as
// defunct in ps output. Not sure why this is so.
// spin a thread where we check if the process is defunct. If true,
// the process state is changed to Stopped and reason set to 'defunct'.
// Next inventory publication will include this new state and the OR
// can terminate a job.
defunctDetectorExecutor.execute(new DefunctProcessDetector(deployedProcess, logger));
}
logger.info(methodName, deployedProcess.getDuccId(), "Inventory size:" + inventory.size()
+ " deployedProcesses size:" + deployedProcesses.size());
}
}
public NodeIdentity getIdentity() {
return nodeIdentity;
}
/**
* Called when a process exits.
*/
public void onProcessExit(IDuccProcess process) {
String methodName = "onProcessExit";
if (process == null) {
return;
}
try {
ProcessStateUpdate processStateUpdate = new ProcessStateUpdate(process.getProcessState(),
process.getPID(), process.getDuccId().getUnique());
ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate);
// cleanup Camel route associated with a process that just stopped
if (process.getPID() != null && super.getContext().getRoute(process.getPID()) != null) {
try {
// stop collecting process stats from /proc/<pid>/statm
super.getContext().stopRoute(process.getPID());
} catch (Exception e) {
logger.error(methodName, null,
"....Unable to stop Camel route for PID:" + process.getPID());
}
// remove route from context, otherwise the routes accumulate over time causing memory leak
super.getContext().removeRoute(process.getPID());
StringBuilder sb = new StringBuilder("\n");
logger.info(methodName, null,
"Removed Camel Route from Context for PID:" + process.getPID());
for (Route route : super.getContext().getRoutes()) {
sb.append("Camel Context - RouteId:" + route.getId() + "\n");
}
logger.info(methodName, null, sb.toString());
}
updateProcessStatus(event);
} catch (Exception e) {
logger.error(methodName, null, e);
} finally {
}
}
public void onJPInitTimeout(IDuccProcess process, long timeout) {
String methodName = "onJPInitTimeout";
try {
System.out.println("--------- Agent Timedout While Waiting For JP (PID:" + process.getPID()
+ ") to initialize. The JP exceeded configured timeout of " + timeout / (60 * 1000)
+ " minutes");
ProcessStateUpdate processStateUpdate = new ProcessStateUpdate(
ProcessState.InitializationTimeout, process.getPID(),
process.getDuccId().getUnique());
ProcessStateUpdateDuccEvent event = new ProcessStateUpdateDuccEvent(processStateUpdate);
updateProcessStatus(event);
} catch (Exception e) {
logger.error(methodName, null, e);
}
}
public void shutdown(String reason) {
String methodName = "shutdown";
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
try {
undeployProcess(deployedProcess.getDuccProcess());
} catch (Exception e) {
logger.error(methodName, null, e);
}
}
}
public static void lock() throws Exception {
agentLock.acquire();
}
public static void unlock() throws Exception {
agentLock.release();
}
public boolean isManagedProcess(Set<NodeUsersCollector.ProcessInfo> processList,
NodeUsersCollector.ProcessInfo cpi) {
synchronized (monitor) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess() != null) {
// Check if process has been deployed but has not yet
// reported its PID.
// This is normal. It takes a bit of time until the JP
// reports
// its PID to the Agent. If there is at least one process in
// Agent
// deploy list with no PID we assume it is the one.
String dppid = deployedProcess.getDuccProcess().getPID();
if (dppid == null || dppid.equals(String.valueOf(cpi.getPid()))) {
return true;
}
}
}
for (NodeUsersCollector.ProcessInfo pi : processList) {
if (pi.getPid() == cpi.getPPid() && pi.getChildren().size() > 0) { // is
// the current process a child of another java
// Process?
return isManagedProcess(pi.getChildren(), pi);
}
}
}
return false;
}
public boolean isRogueProcess(String uid, Set<NodeUsersCollector.ProcessInfo> processList,
NodeUsersCollector.ProcessInfo cpi) throws Exception {
synchronized (monitor) {
// if cgroups are enabled, check if a given PID (cpi) exists in any of
// the containers. If so, the process is not rogue.
if (useCgroups) {
if (cgroupsManager.isPidInCGroup(String.valueOf(cpi.getPid()))) {
return false;
}
}
// Agent adds a process to its inventory before launching it. So it
// is
// possible that the inventory contains a process with no PID. If
// there
// is such process in the inventory we cannot determine that a given
// pid is rogue yet. Eventually, the launched process reports its
// PID
boolean foundDeployedProcessWithNoPID = false;
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
if (deployedProcess.getDuccProcess() != null) {
// Check if process has been deployed but has not yet
// reported its PID.
// This is normal. It takes a bit of time until the JP
// reports
// its PID to the Agent. If there is at least one process in
// Agent
// deploy list with no PID we assume it is the one.
if (deployedProcess.getDuccProcess().getPID() == null) {
foundDeployedProcessWithNoPID = true;
break;
}
String dppid = deployedProcess.getDuccProcess().getPID();
// process in inventory, not rogue
if (dppid != null && dppid.equals(String.valueOf(cpi.getPid()))) {
return false;
}
}
}
// not found
if (foundDeployedProcessWithNoPID) {
return false;
} else if (cpi.getPPid() == 1) { // Any process owned by init is rogue
// interrupt agent's thread blocking in waitFor() awaiting process termination.
// This process is a zombie and there is no need to waste the thread.
interruptThreadInWaitFor(String.valueOf(cpi.getPid()));
return true;
} else {
return isParentProcessRogue(processList, cpi);
}
}
}
private boolean isParentProcessRogue(Set<NodeUsersCollector.ProcessInfo> processList,
NodeUsersCollector.ProcessInfo cpi) {
// boolean found = false;
for (NodeUsersCollector.ProcessInfo pi : processList) {
if (pi.getPid() == cpi.getPPid()) { // is the current process a
if (pi.isRogue()) { // if parent is rogue, a child is rogue as
// well
return true;
}
return false;
} else {
if (pi.getChildren().size() > 0) {
return isParentProcessRogue(pi.getChildren(), cpi);
}
}
}
return true;
}
/**
* Process resident memory collector routes. Collects resident memory at fixed interval from the
* OS.
*
*/
public class ProcessMemoryUsageRoute extends RouteBuilder {
private NodeAgent agent;
private IDuccProcess process;
private ManagedProcess managedProcess;
public ProcessMemoryUsageRoute(NodeAgent agent, IDuccProcess process,
ManagedProcess managedProcess) {
this.process = process;
this.managedProcess = managedProcess;
this.agent = agent;
}
public void configure() throws Exception {
Processor nmp = configurationFactory.processMetricsProcessor(agent, process, managedProcess);
int fixedRate = configurationFactory.getNodeInventoryPublishDelay();
from("timer:processMemPollingTimer?fixedRate=true&delay=100&period=" + fixedRate)
.routeId(process.getPID()).autoStartup(true).process(nmp);
}
}
@Override
public boolean isStopping() {
return stopping;
}
@Override
public void quiesceAndStop() throws Exception {
stop(true, -1);
}
private void stop(boolean quiesce, long waitTimeInSecs) throws Exception {
synchronized (stopLock) {
logger.info("stop", null, "Agent stop() - quiesce:" + quiesce);
if (stopping) {
return;
}
stopping = true;
stateChange(EventType.SHUTDOWN);
// Dispatch SIGTERM to all processes. If this is quiesce mode we dont try to stop
// non-preemptable processes
boolean wait = stopChildProcesses(quiesce);
if (quiesce) {
logger.info("stop", null, "Agent stopping managed processes");
long waitTime = 60; // default
try {
waitTime = Long.valueOf(configurationFactory.processStopTimeout);
// Normalize. The configurationFactory.processStopTimeout from
// ducc.properties is in millis. The code below expects secs.
waitTime = (waitTime / 1000);
} catch (Exception e) {
}
// Version 2.10.2 of UIMA-AS is not supporting quiesce and stop
// so we need to implement wait than kill -9 strategy.
waitForChildProcessesToTerminateAndKill(wait, waitTime, true, quiesce);
logger.info("stop", null,
">>>>>>>>>>>> stop() waitForChildProcessesToTerminateAndKill() completed");
// wait for JD processes to terminate. Return only when all non-preemptables
// terminate.
waitForChildProcessesToTerminate(false);
logger.info("stop", null,
">>>>>>>>>>>> stop() waitForChildProcessesToTerminate() completed");
} else {
logger.info("stop", null, "Agent stopping managed processes with reaper delay of "
+ waitTimeInSecs + " secs");
// wait for 60 secs and sends SIGKILL to any process still standing
waitForChildProcessesToTerminateAndKill(wait, waitTimeInSecs, false, quiesce);
}
// Send an empty process map as the final inventory
DuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory, getLastORSequence(),
getIdentity());
ORDispatcher.dispatch(duccEvent);
logger.info("stop", null, "Agent published final inventory");
logger.info("stop", null, "Stopping Publishing Metrics and Inventory");
configurationFactory.stop();
logger.info("stop", null, "Reaper thread finished - calling super.stop()");
super.stop();
}
}
@Override
public void stop() throws Exception {
// not quiesce, 60=default timeout before kill -9 is used
stop(false, 60);
}
private void waitForChildProcessesToTerminateAndKill(boolean wait, long waitTimeInSecs,
boolean killJustUimaAs, boolean quiesce) throws Exception {
if (wait && !deployedProcesses.isEmpty()) {
logger.info("waitForChildProcessesToTerminateAndKill", null,
"Agent Sent SIGTERM to ALL Non-Preemptable Child Processes - Number of Deployed Processes:"
+ deployedProcesses.size());
Timer timer = new Timer(true);
logger.info("waitForChildProcessesToTerminateAndKill", null, "Waiting", waitTimeInSecs,
" secs before sending kill -9 to all ***non-preemptable*** child processes still running");
CountDownLatch completionLatch = new CountDownLatch(1);
// start a timer task which when triggered kills processes via kill -9
timer.schedule(new KillTimerTask(completionLatch, killJustUimaAs, quiesce),
waitTimeInSecs * 1000);
// block this thread until killer task finishes its work
completionLatch.await();
}
stopLock.wait(1000);
logger.info("waitForChildProcessesToTerminateAndKill", null, "Done");
}
private void waitForChildProcessesToTerminate(boolean quiesceMode) throws Exception {
logger.info("waitForChildProcessesToTerminate", null,
"Agent Sent SIGTERM to Child Processes - Waiting for them to Quiesce - Number of Deployed Processes:"
+ deployedProcesses.size());
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess p = it.next();
// dont wait for JDs to stop. In quiesce mode we keep them
// running until all JPs terminate and only than we stop them
// if ( quiesceMode && ( p.isJd() || p.isUimaAs() ) ) {
// in quiesce mode, skip UIMA-AS processes since currently
// there is no support for quiesce there. So only wait for
// POPs (JD), UIMA based JPs, and Services
if (quiesceMode && p.isUimaAs()) {
continue;
}
// block waiting for process to terminate
p.getFuture().get();
}
}
public Future<?> getDeployedJPFuture(IDuccId duccId) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
// for (ManagedProcess deployedProcess : deployedProcesses) {
ManagedProcess deployedProcess = it.next();
// ignore duplicate start request for the same process
if (deployedProcess.getDuccId().equals(duccId)) {
return deployedProcess.getFuture();
}
}
return null;
}
public ManagedProcess getDeployedProcess(IDuccId duccId) {
Iterator<ManagedProcess> it = deployedProcesses.iterator();
while (it.hasNext()) {
ManagedProcess deployedProcess = it.next();
// ignore duplicate start request for the same process
if (deployedProcess.getDuccId().equals(duccId)) {
return deployedProcess;
}
}
return null;
}
/**
* Copies reservations sent by the PM. It copies reservations associated with this node.
*
* @param reserves
* - list of ALL reservations
* @throws Exception
*/
public void setReservations(List<DuccUserReservation> reserves) throws Exception {
try {
reservationsSemaphore.acquire();
if (reserves != null) {
// clear old entries
reservations.clear();
// Only copy reservations for this node
IDuccReservationMap reserveMap = new DuccReservationMap();
for (DuccUserReservation r : reserves) {
reserveMap.clear();
for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) {
if (Utils.isThisNode(getIdentity().getIp(),
entry.getValue().getNodeIdentity().getIp())) {
reserveMap.addReservation(entry.getValue());
}
}
if (reserveMap.getMap().size() > 0) {
DuccUserReservation reserve = new DuccUserReservation(r.getUserId(), r.getReserveID(),
reserveMap);
reservations.add(reserve);
}
}
}
// this.reservations = reservations;
logger.debug("setReservations", null,
"+++++++++++ Copied User Reservations - List Size:" + reservations.size());
} catch (InterruptedException e) {
} finally {
reservationsSemaphore.release();
}
}
public List<DuccId> getUserReservations(String uid) {
List<DuccId> reservationIds = new ArrayList<DuccId>();
try {
reservationsSemaphore.acquire();
if (reservations != null) {
for (DuccUserReservation r : reservations) {
if (r.getUserId().equals(uid)) {
for (Map.Entry<DuccId, IDuccReservation> entry : r.getUserReservations().entrySet()) {
reservationIds.add(entry.getValue().getDuccId());
}
break;
}
}
}
} catch (InterruptedException e) {
} finally {
reservationsSemaphore.release();
}
return reservationIds;
}
public void copyAllUserReservations(TreeMap<String, NodeUsersInfo> map) {
try {
reservationsSemaphore.acquire();
if (reservations != null) {
logger.debug("copyAllUserReservations", null,
"+++++++++++ Copying User Reservations - List Size:" + reservations.size());
for (DuccUserReservation r : reservations) {
if ("System".equals(r.getUserId())) {
continue;
}
NodeUsersInfo nui = null;
if (map.containsKey(r.getUserId())) {
nui = map.get(r.getUserId());
} else {
nui = new NodeUsersInfo(r.getUserId());
map.put(r.getUserId(), nui);
}
nui.addReservation(r.getReserveID());
}
} else {
logger.debug("copyAllUserReservations", null, " *********** No Reservations");
}
} catch (InterruptedException e) {
} finally {
reservationsSemaphore.release();
}
}
public boolean userHasReservation(String uid) throws Exception {
try {
reservationsSemaphore.acquire();
for (DuccUserReservation r : reservations) {
if (r.getUserId().equals(uid)) {
return true;
}
}
} catch (InterruptedException e) {
} finally {
reservationsSemaphore.release();
}
return false;
}
public Object deepCopy(Object original) throws Exception {
ObjectInputStream ois = null;
ObjectOutputStream oos;
ByteArrayInputStream bis;
ByteArrayOutputStream bos;
Object copy;
try {
// serialize object to bytes
bos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(bos);
oos.writeObject(original);
oos.close();
// construct an object from the bytes
bis = new ByteArrayInputStream(bos.toByteArray());
ois = new ObjectInputStream(bis);
copy = ois.readObject();
return copy;
} catch (Exception e) {
throw e;
} finally {
if (ois != null) {
ois.close();
}
}
}
public RogueProcessReaper getRogueProcessReaper() {
return rogueProcessReaper;
}
/**
* Called when an Agent receives self dispatched Ping message.
*/
// public void ping(AgentPingEvent agentPing) {
// nodeMonitor.nodeArrives(agentPing.getNode());
// }
/*
* public boolean excludeUser(String userId ) { if ( configurationFactory.userExclusionList !=
* null ) { // exclusion list contains comma separated user ids String[] excludedUsers =
* configurationFactory.userExclusionList.split(","); for ( String excludedUser : excludedUsers )
* { if ( excludedUser.equals(userId)) { return true; } } } return false; } public boolean
* excludeProcess(String process ) { if ( configurationFactory.processExclusionList != null ) { //
* exclusion list contains comma separated user ids String[] excludedProcesses =
* configurationFactory.processExclusionList.split(","); for ( String excludedProcess :
* excludedProcesses ) { if ( excludedProcess.equals(process)) { return true; } } } return false;
* }
*/
public static void main(String[] args) {
try {
NodeIdentity node = new NodeIdentity(InetAddress.getLocalHost().getHostAddress(),
InetAddress.getLocalHost().getHostName());
NodeAgent agent = new NodeAgent(node);
List<DuccUserReservation> reserves = new ArrayList<DuccUserReservation>();
IDuccReservationMap reserveMap = new DuccReservationMap();
IDuccReservationMap reserveMap2 = new DuccReservationMap();
NodeIdentity ni1 = node;
// new NodeIdentity(, name);
NodeIdentity ni2 = new NodeIdentity("111.111.111.111", "node100");
NodeIdentity ni3 = node;
NodeIdentity ni4 = new NodeIdentity("222.222.222.222", "node102");
DuccId id1 = new DuccId(100);
DuccId id2 = new DuccId(101);
DuccId id3 = new DuccId(102);
DuccId id4 = new DuccId(103);
IDuccReservation reservation1 = new DuccReservation(id1, ni1, 1);
reserveMap.addReservation(reservation1);
IDuccReservation reservation2 = new DuccReservation(id2, ni2, 1);
reserveMap.addReservation(reservation2);
IDuccReservation reservation4 = new DuccReservation(id4, ni4, 1);
reserveMap.addReservation(reservation4);
IDuccReservation reservation3 = new DuccReservation(id3, ni3, 1);
reserveMap2.addReservation(reservation3);
DuccUserReservation reserve = new DuccUserReservation("joe", new DuccId(500), reserveMap);
DuccUserReservation reserve2 = new DuccUserReservation("jane", new DuccId(500), reserveMap2);
reserves.add(reserve);
reserves.add(reserve2);
agent.setReservations(reserves);
} catch (Exception e) {
e.printStackTrace();
}
}
private class NodeExclusionParser {
private boolean excludeNodeFromCGroups = false;
private boolean excludeAP = false;
public void parse(String exclFile) throws Exception {
// <node>=cgroup,ap
File exclusionFile = new File(exclFile);
if (!exclusionFile.exists()) {
return;
}
BufferedReader br = new BufferedReader(new FileReader(exclusionFile));
String line;
NodeIdentity node = getIdentity();
String nodeName = node.getShortName();
while ((line = br.readLine()) != null) {
if (line.startsWith(nodeName)) {
String exclusions = line.substring(line.indexOf("=") + 1);
String[] parsedExclusions = exclusions.split(",");
for (String exclusion : parsedExclusions) {
if (exclusion.trim().equals("cgroup")) {
excludeNodeFromCGroups = true;
} else if (exclusion.trim().equals("ap")) {
excludeAP = true;
}
}
break;
}
}
br.close();
}
public boolean apExcluded() {
return excludeAP;
}
public boolean cgroupsExcluded() {
return excludeNodeFromCGroups;
}
}
public DuccLogger getLogger() {
return logger;
}
private void handleQuiesceAndStopEvent(DuccAdminEventQuiesceAndStop event) {
logger.info("handleQuiesceAndStopEvent", null, "... Agent Received an Admin Request to Stop");
try {
stop(true, -1);
} catch (Exception e) {
logger.info("handleQuiesceAndStopEvent", null, e);
}
}
private void handleStopEvent(DuccAdminEventStop event) {
logger.info("handleStopEvent", null, "... Agent Received an Admin Request to Stop");
try {
stop(false, event.getTimeout());
} catch (Exception e) {
logger.info("handleStopEvent", null, e);
}
}
private void handleStopPublishingEvent(DuccAdminEventStopMetrics event) {
if (isThisTargetNode(getTargetNodes(event.getTargets()))) {
logger.info("handleStopPublishingEvent", null,
"... Agent Received an Admin Request to Stop Metrics Collection and Publishing");
// Stop Camel route responsible for driving collection and publishing of metrics
configurationFactory.stopMetricsRoute();
logger.info("handleStopPublishingEvent", null,
"... Agent Stopped Metrics Collection and Publishing");
}
}
private String[] getTargetNodes(String targets) {
logger.info("getTargetNodes", null, " Targets for Admin Command:" + targets);
return targets.split(",");
}
private boolean isThisTargetNode(String[] nodes) {
for (String targetNode : nodes) {
if (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getCanonicalName())) {
return true;
}
}
return false;
}
private boolean isTarget(String[] targets) {
for (String target : targets) {
String[] targetParts = target.trim().split("@");
logger.info("isTarget", null,
" Targets for Admin Command:" + target + " This agent canonical identity:"
+ getIdentity().getCanonicalName() + " short name:"
+ getIdentity().getShortName());
if ("agent".equals(targetParts[0])) {
if (Utils.isMachineNameMatch(targetParts[1].trim(), getIdentity().getShortName())) {
return true;
}
}
}
return false;
}
@Override
public void handleAdminEvent(DuccAdminEvent event) throws Exception {
Thread t = new Thread() {
public void run() {
if (event instanceof DuccAdminEventStop) {
if (isTarget(getTargetNodes(((DuccAdminEventStop) event).getTargets()))) {
handleStopEvent((DuccAdminEventStop) event);
}
} else if (event instanceof DuccAdminEventQuiesceAndStop) {
if (isTarget(getTargetNodes(((DuccAdminEventQuiesceAndStop) event).getTargets()))) {
logger.info("handleAdminEvent", null, "Node a target for quiesce");
handleQuiesceAndStopEvent((DuccAdminEventQuiesceAndStop) event);
}
} else if (event instanceof DuccAdminEventStopMetrics) {
handleStopPublishingEvent((DuccAdminEventStopMetrics) event);
} else {
logger.info("handleAdminEvent", null,
"... Agent Received Unexpected Message of Type:" + event.getClass().getName());
}
}
};
t.start();
/*
* if (event instanceof DuccAdminEventStopMetrics) { // Get target machines from the message
* String[] nodes = ((DuccAdminEventStopMetrics) event).getTargetNodes().split(","); // Check if
* this message applies to this node for (String targetNode : nodes) { if
* (Utils.isMachineNameMatch(targetNode.trim(), getIdentity().getCanonicalName())) {
* logger.info("handleAdminEvent", null,
* "... Agent Received an Admin Request to Stop Metrics Collection and Publishing"); // Stop
* Camel route responsible for driving collection and publishing of metrics
* configurationFactory.stopMetricsRoute(); logger.info("handleAdminEvent", null,
* "... Agent Stopped Metrics Collection and Publishing"); break; } else {
* logger.info("handleAdminEvent", null, "... Agent Not Target For Message:" +
* event.getClass().getName()); } } } else { logger.info("handleAdminEvent", null,
* "... Agent Received Unexpected Message of Type:" + event.getClass().getName());
*
* }
*/
}
private class KillTimerTask extends TimerTask {
private CountDownLatch completionLatch;
private boolean killJustUimaAs;
private boolean quiesce;
public KillTimerTask(CountDownLatch completionLatch, boolean killOnlyUimaAs, boolean quiesce) {
this.completionLatch = completionLatch;
this.killJustUimaAs = killOnlyUimaAs;
this.quiesce = quiesce;
}
@Override
public void run() {
try {
// send kill -9 to any child process still running
killChildProcesses(killJustUimaAs, quiesce);
} finally {
completionLatch.countDown();
}
}
}
}