blob: 41725920479f180de06a2cb5e976cd1900745f66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.processors;
import java.io.FileNotFoundException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessSwapUsageCollector;
import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public class LinuxProcessMetricsProcessor extends BaseProcessor implements
ProcessMetricsProcessor {
// Total CPU usage (in millis) stored by the previous getCurrentCpu() call; 0 until the first positive reading.
private long previousCPUReadingInMillis = 0;
// System.currentTimeMillis() value stored by the previous getCurrentCpu() call; 0 until the first positive reading.
private long previousSnapshotTime = 0;
// Executor to which the metric collectors are submitted; shut down by stop().
private final ExecutorService pool;
// The process whose metrics are collected and on which the collected values are set.
private IDuccProcess process;
// Collector used by getGCStats() when the process exposes a JMX URL.
private DuccGarbageStatsCollector gcStatsCollector;
// Overwritten with agent.getOSPageSize() in the constructor; not read by any method in this class.
private int blockSize = 4096; // default, OS specific
private DuccLogger logger;
// Launcher-side handle of the same process (owner, memory assignment, swap threshold, kill()).
private ManagedProcess managedProcess;
private NodeAgent agent;
// Can be overridden with the ducc.agent.share.size.fudge.factor system property.
// The resident-memory check only runs when this value is greater than -1.
private int fudgeFactor = 5; // default is 5%
// true until the constructor completes and again after close(); process() collects nothing while true.
private volatile boolean closed = true;
// Last value computed by getCpuTime(); returned as-is once the process is no longer Running/Initializing.
private long percentCPU = 0;
public LinuxProcessMetricsProcessor(DuccLogger logger,
IDuccProcess process, NodeAgent agent, ManagedProcess managedProcess) throws FileNotFoundException {
this.logger = logger;
this.managedProcess = managedProcess;
this.agent = agent;
pool = Executors.newCachedThreadPool();
this.process = process;
gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
// keep a refernce to this so that we can call close() when the process
// terminates. We need to
// close fds to stat and statm files
managedProcess.setMetricsProcessor(this);
blockSize = agent.getOSPageSize();
if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
try {
fudgeFactor = Integer.parseInt(System
.getProperty("ducc.agent.share.size.fudge.factor"));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
closed = false;
}
/**
 * Shuts down the executor used for the metric collectors. A failure during
 * shutdown is logged and not propagated to the caller.
 */
public void stop() {
    try {
        if (pool == null) {
            return;
        }
        pool.shutdown();
    } catch (Exception shutdownFailure) {
        logger.error("LinuxProcessMetricsProcessor.stop()", null, shutdownFailure);
    }
}
/**
 * Marks this processor as closed, so that {@link #process(Exchange)} stops
 * collecting metrics, and then shuts down the collector executor via
 * {@link #stop()}.
 */
public void close() {
    closed = true;
    try {
        this.stop();
    } catch (Exception e) {
        // Same reporting channel as stop(): the agent log, not stderr.
        logger.error("LinuxProcessMetricsProcessor.close()", null, e);
    }
}
/**
 * Tells whether statistics should still be collected (and collection errors
 * reported) for a process in the given state.
 * <p>
 * The decision is based on the {@code state} argument alone, so it is made
 * on a single, consistent snapshot of the process state.
 *
 * @param state process state to evaluate; a {@code null} state is treated as
 *              "still collecting"
 * @return {@code false} when the state is Stopped, Killed, Failed or
 *         Stopping; {@code true} otherwise
 */
private boolean collectStats(ProcessState state) {
    boolean stoppingOrDead = ProcessState.Stopped.equals(state)
            || ProcessState.Killed.equals(state)
            || ProcessState.Failed.equals(state)
            || ProcessState.Stopping.equals(state);
    return !stoppingOrDead;
}
/**
 * Fetches the swap usage of the process's cgroup container by running a
 * {@code ProcessSwapUsageCollector} on the executor and waiting for it.
 *
 * @return the value reported by the collector, or -1 when the agent does not
 *         use cgroups
 * @throws Exception if the container lookup or the collector task fails
 */
private long getSwapUsage() throws Exception {
    if (!agent.useCgroups) {
        return -1;
    }
    String cgroupId = agent.cgroupsManager.getContainerId(managedProcess);
    ProcessSwapUsageCollector collector =
            new ProcessSwapUsageCollector(logger, agent.cgroupsManager, cgroupId);
    logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
            "Fetching Swap Usage PID:" + process.getPID());
    Future<ProcessSwapSpaceUsage> pending = pool.submit(collector);
    long swapUsage = pending.get().getSwapUsage();
    logger.debug("LinuxProcessMetricsProcessor.getSwapUsage", null,
            " Process Swap Usage:" + swapUsage);
    return swapUsage;
}
/**
 * Fetches the major page fault count of the process's cgroup container by
 * running a {@code ProcessMajorFaultCollector} on the executor and waiting
 * for it.
 *
 * @return the value reported by the collector, or -1 when the agent does not
 *         use cgroups
 * @throws Exception if the container lookup or the collector task fails
 */
private long getFaults() throws Exception {
    if (!agent.useCgroups) {
        return -1;
    }
    String cgroupId = agent.cgroupsManager.getContainerId(managedProcess);
    ProcessMajorFaultCollector collector =
            new ProcessMajorFaultCollector(logger, agent.cgroupsManager, cgroupId);
    logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
            "Fetching Page Faults PID:" + process.getPID());
    Future<ProcessMemoryPageLoadUsage> pending = pool.submit(collector);
    long majorFaults = pending.get().getMajorFaults();
    logger.debug("LinuxProcessMetricsProcessor.getFaults", null,
            " Process Faults (pgpgin):" + majorFaults);
    return majorFaults;
}
/**
 * Fetches the resident memory of the process's cgroup container by running a
 * {@code ProcessResidentMemoryCollector} on the executor and waiting for it.
 * Callers in this class treat the result as a byte count.
 *
 * @return the value reported by the collector, or -1 when the agent does not
 *         use cgroups
 * @throws Exception if the container lookup or the collector task fails
 */
private long getRss() throws Exception {
    if (!agent.useCgroups) {
        return -1;
    }
    String cgroupId = agent.cgroupsManager.getContainerId(managedProcess);
    ProcessResidentMemoryCollector collector =
            new ProcessResidentMemoryCollector(logger, agent.cgroupsManager, cgroupId);
    logger.debug("LinuxProcessMetricsProcessor.getRss", null,
            "Fetching RSS Usage for PID:" + process.getPID());
    Future<ProcessResidentMemory> pending = pool.submit(collector);
    long residentMemory = pending.get().get();
    logger.debug("LinuxProcessMetricsProcessor.getRss", null,
            " Process RSS:" + residentMemory);
    return residentMemory;
}
/**
 * Fetches the total CPU usage of the process's cgroup container by running a
 * {@code ProcessCpuUsageCollector} on the executor, and converts the reading
 * from nanoseconds to milliseconds.
 * <p>
 * The conversion is a plain {@code long} division. Passing a {@code long} to
 * {@code Math.round} selects the {@code float} overload, which returns an
 * {@code int}: readings above 2^24 ms lose precision and the result
 * saturates at {@code Integer.MAX_VALUE}.
 *
 * @return total CPU usage in milliseconds (truncated), or -1 when the agent
 *         does not use cgroups or the collector reported a negative value
 * @throws Exception if the container lookup or the collector task fails
 */
private long getCpuUsage() throws Exception {
    long cpuUsage = -1;
    if (agent.useCgroups) {
        String containerId = agent.cgroupsManager.getContainerId(managedProcess);
        ProcessCpuUsageCollector processCpuUsageCollector =
                new ProcessCpuUsageCollector(logger, agent.cgroupsManager, containerId);
        logger.debug("LinuxProcessMetricsProcessor.getCpuUsage", null,
                "Fetching CPU Usage for PID:" + process.getPID());
        Future<ProcessCpuUsage> processCpuUsage = pool.submit(processCpuUsageCollector);
        long cpuUsageInNanos = processCpuUsage.get().getCpuUsage();
        if (cpuUsageInNanos >= 0) {
            // cpuUsage comes from cpuacct.usage and is in nanos; normalize into millis
            cpuUsage = cpuUsageInNanos / 1000000L;
        }
        logger.debug(
                "LinuxProcessMetricsProcessor.getCpuUsage", null,
                "CPU USAGE:" + cpuUsageInNanos + " CLOCK RATE:" + agent.cpuClockRate
                        + " Total CPU USAGE:" + cpuUsage);
    }
    return cpuUsage;
}
/**
 * Computes the process's overall CPU utilization as a percentage: total CPU
 * time divided by the elapsed time of the init and run time windows.
 * <p>
 * While the process is Running or Initializing the percentage is recomputed
 * and remembered in {@code percentCPU}. In any other state the last
 * remembered value is returned.
 *
 * @param totalCpuUsageInMillis total CPU time in milliseconds, or -1 when it
 *                              is not available
 * @return the CPU percentage, or -1 when the process is Running/Initializing
 *         but cgroups are not in use or no CPU reading is available
 * @throws Exception declared by the signature for the calls made on the
 *         process objects
 */
private long getCpuTime(long totalCpuUsageInMillis) throws Exception {
    ProcessState state = managedProcess.getDuccProcess().getProcessState();
    boolean active = state.equals(ProcessState.Running)
            || state.equals(ProcessState.Initializing);
    if (!active) {
        // Process is no longer accumulating CPU time; report the last value.
        return percentCPU;
    }
    if (!agent.useCgroups || totalCpuUsageInMillis == -1) {
        return -1;
    }
    long timeRunning = 1;
    if (process.getTimeWindowInit() != null) {
        timeRunning = process.getTimeWindowInit().getElapsedMillis();
    }
    if (process.getTimeWindowRun() != null) {
        timeRunning += process.getTimeWindowRun().getElapsedMillis();
    }
    // A zero (or negative) elapsed time would turn the division below into
    // Infinity/NaN or a negative percentage; treat it as one millisecond.
    if (timeRunning < 1) {
        timeRunning = 1;
    }
    // Both operands are in milliseconds, so the ratio is already unit-free.
    percentCPU = Math.round(100 * ((totalCpuUsageInMillis * 1.0) / (timeRunning * 1.0)));
    return percentCPU;
}
/**
 * Computes the CPU utilization (in percent) since the previous call, from the
 * change in total CPU time and the wall-clock time elapsed between the two
 * calls. The current reading and time are then stored as the new baseline.
 * <p>
 * On the first positive reading the baseline is 0 for both values, so the
 * interval spans the whole epoch and the result is effectively 0.
 *
 * @param totalCpuUsageInMillis total CPU time in milliseconds, or -1 when it
 *                              is not available
 * @return the utilization in percent, as follows:
 *         <ul>
 *         <li>0 when no wall-clock time elapsed since the previous call, or
 *         the clock moved backwards;</li>
 *         <li>0 when the reading is not positive but cgroups are in use and
 *         the reading is not -1;</li>
 *         <li>-1 otherwise.</li>
 *         </ul>
 */
private long getCurrentCpu(long totalCpuUsageInMillis) {
    if (totalCpuUsageInMillis <= 0) {
        return (agent.useCgroups && totalCpuUsageInMillis != -1) ? 0 : -1;
    }
    // Read the clock once so the interval and the stored baseline agree.
    long now = System.currentTimeMillis();
    long cpuDeltaMillis = totalCpuUsageInMillis - previousCPUReadingInMillis;
    long elapsedMillis = now - previousSnapshotTime;
    previousCPUReadingInMillis = totalCpuUsageInMillis;
    previousSnapshotTime = now;
    if (elapsedMillis <= 0) {
        // Two calls within the same millisecond, or the wall clock stepped
        // back: no rate can be computed, and dividing would yield
        // Infinity/NaN or a negative percentage.
        return 0;
    }
    return Math.round(100 * ((cpuDeltaMillis * 1.0) / (elapsedMillis * 1.0)));
}
/**
 * Kills the managed process, and signals the other pids in its cgroup, when
 * its resident memory exceeds the assigned maximum (including the fudge
 * factor).
 * <p>
 * Nothing is done when:
 * <ul>
 * <li>the agent does not use cgroups;</li>
 * <li>the process's swap usage is positive and above its swap threshold;</li>
 * <li>the fudge factor is not greater than -1, or the assigned maximum is
 * not positive.</li>
 * </ul>
 *
 * @throws Exception if the cgroup lookups fail
 */
private void killProcsIfExceedingMemoryThreshold() throws Exception {
    if (!agent.useCgroups) {
        return;
    }
    boolean swapOverThreshold = process.getSwapUsage() > 0
            && process.getSwapUsage() > managedProcess.getMaxSwapThreshold();
    if (swapOverThreshold) {
        // No action is taken in this method for a process over its swap threshold.
        return;
    }
    String cgroupId = agent.cgroupsManager.getContainerId(managedProcess);
    String[] pidsInCgroup = agent.cgroupsManager.getPidsInCgroup(cgroupId);
    logger.debug("LinuxProcessMetricsProcessor.process", null,
            "Container ID:" + cgroupId + " cgroup pids " + pidsInCgroup.length);

    // The memory guard is active only with a fudge factor > -1 and a
    // positive memory assignment.
    boolean memoryGuardEnabled = fudgeFactor > -1
            && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0;
    if (!memoryGuardEnabled) {
        return;
    }

    // normalize RSS into MB
    long rssInMegabytes = (process.getResidentMemory() / 1024) / 1024;
    logger.trace("process", null,
            "*** Process with PID:" + managedProcess.getPid()
                    + " Assigned Memory (MB): "
                    + managedProcess.getProcessMemoryAssignment()
                    + " MBs. Current RSS (MB):" + rssInMegabytes);

    // Compare the resident memory with the assignment calculated in the PM.
    boolean overAssignment = rssInMegabytes > managedProcess
            .getProcessMemoryAssignment().getMaxMemoryWithFudge();
    if (!overAssignment) {
        return;
    }

    logger.error("process", null,
            "\n\n********************************************************\n\tProcess with PID:"
                    + managedProcess.getPid()
                    + " Exceeded its max memory assignment (including a fudge factor) of "
                    + managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()
                    + " MBs. This Process Resident Memory Size: "
                    + rssInMegabytes
                    + " MBs .Killing process ...\n********************************************************\n\n");
    try {
        // mark it for death
        managedProcess.kill();
        process.setReasonForStoppingProcess(
                ReasonForStoppingProcess.ExceededShareSize.toString());
        agent.stopProcess(process);
        if (agent.useCgroups) {
            for (String cgroupPid : pidsInCgroup) {
                // The main process was just stopped above; only the other
                // pids in the cgroup are signalled here.
                if (!cgroupPid.equals(managedProcess.getDuccProcess().getPID())) {
                    killChildProcess(cgroupPid, "-15");
                }
            }
        }
    } catch (Exception killFailure) {
        // Errors are only reported while the process is still in a state
        // for which stats are collected.
        if (collectStats(process.getProcessState())) {
            logger.error("process", null, killFailure);
        }
    }
}
/**
 * Collects garbage collection statistics for the process when it exposes a
 * non-blank JMX URL.
 *
 * @return the stats gathered by the GC collector, or a freshly constructed
 *         {@code ProcessGarbageCollectionStats} when the process has no JMX
 *         URL
 * @throws Exception if the collector fails
 */
private ProcessGarbageCollectionStats getGCStats() throws Exception {
    String jmxUrl = process.getProcessJmxUrl();
    boolean jmxAvailable = jmxUrl != null && jmxUrl.trim().length() > 0;
    if (!jmxAvailable) {
        return new ProcessGarbageCollectionStats();
    }
    logger.debug("LinuxProcessMetricsProcessor.getGCStats", null, "Collecting GC Stats");
    return gcStatsCollector.collect();
}
/**
 * Tells whether the process is in one of the states in which metrics are
 * collected.
 *
 * @return {@code true} when the process state is Starting, Started,
 *         Initializing or Running
 */
public boolean processIsActive() {
    final ProcessState current = process.getProcessState();
    if (current.equals(ProcessState.Starting)) {
        return true;
    }
    if (current.equals(ProcessState.Started)) {
        return true;
    }
    if (current.equals(ProcessState.Initializing)) {
        return true;
    }
    return current.equals(ProcessState.Running);
}
/**
 * Stops and removes the Camel route whose id is the process PID, then logs
 * the ids of the routes that remain in the context.
 * <p>
 * Does nothing when the process has no PID or the context has no route with
 * that id. A failure to stop the route is logged together with its cause.
 * The outcome of the removal is logged according to the value returned by
 * the context, rather than assumed to be a success.
 *
 * @throws Exception if removing the route or listing the routes fails
 */
private void stopRoute() throws Exception {
    String methodName = "stopRoute";
    String pid = process.getPID();
    if (pid == null || agent.getContext().getRoute(pid) == null) {
        return;
    }
    try {
        // stop collecting process stats
        agent.getContext().stopRoute(pid);
    } catch (Exception e) {
        logger.error(methodName, null, "....Unable to stop Camel route for PID:" + pid);
        // Keep the cause in the log; the message alone does not say why.
        logger.error(methodName, null, e);
    }
    // Remove the route from the context, otherwise the routes accumulate
    // over time causing a memory leak.
    if (agent.getContext().removeRoute(pid)) {
        logger.info(methodName, null, "Removed Camel Route from Context for PID:" + pid);
    } else {
        logger.error(methodName, null,
                "Unable to remove Camel Route from Context for PID:" + pid);
    }
    StringBuilder sb = new StringBuilder("\n");
    for (Route route : agent.getContext().getRoutes()) {
        sb.append("Camel Context - RouteId:").append(route.getId()).append("\n");
    }
    logger.info(methodName, null, sb.toString());
}
/**
 * Collects one round of metrics for the process and stores them on it: swap
 * usage, major faults, resident memory, CPU percentages and GC stats. It
 * then runs the memory threshold check.
 * <p>
 * When this processor is closed or the process is no longer active, no
 * metrics are collected. The Camel route is instead stopped and removed on a
 * newly started thread.
 * <p>
 * A failure during collection is logged only while the process is still in a
 * state for which stats are collected.
 *
 * @param e the Camel exchange; not used by this method
 */
public void process(Exchange e) {
    if (closed || !processIsActive()) {
        logger.info("LinuxProcessMetricsProcessor.process", null,"Process with PID:"+process.getPID() +" not in Running or Initializing state. Returning");
        Runnable routeStopper = new Runnable() {
            public void run() {
                try {
                    stopRoute();
                } catch (Exception ex) {
                    logger.error("process", null, ex);
                }
            }
        };
        new Thread(routeStopper).start();
        return;
    }
    try {
        process.setSwapUsage(getSwapUsage());
        process.setMajorFaults(getFaults());

        long rssInBytes = getRss();
        process.setResidentMemory(rssInBytes);

        long totalCpuUsageInMillis = getCpuUsage();
        // CPU time is published as a percentage
        process.setCpuTime(getCpuTime(totalCpuUsageInMillis));
        process.setCurrentCPU(getCurrentCpu(totalCpuUsageInMillis));

        ProcessGarbageCollectionStats gcStats = getGCStats();
        process.setGarbageCollectionStats(gcStats);

        String rssText;
        if (rssInBytes > -1) {
            rssText = (rssInBytes / (1024 * 1024)) + " MB";
        } else {
            rssText = "-1";
        }
        String summary = "----------- PID:" + process.getPID()
                + " RSS:" + rssText
                + " Total CPU Time (%):" + process.getCpuTime()
                + " Delta CPU Time (%):" + process.getCurrentCPU()
                + " Major Faults:" + process.getMajorFaults()
                + " Process Swap Usage:" + process.getSwapUsage()
                + " Max Swap Usage Allowed:" + managedProcess.getMaxSwapThreshold()
                + " Total GC Collection Count :" + gcStats.getCollectionCount()
                + " Total GC Collection Time :" + gcStats.getCollectionTime();
        logger.info("process", null, summary);

        killProcsIfExceedingMemoryThreshold();
    } catch (Exception exc) {
        if (collectStats(process.getProcessState())) {
            logger.error("LinuxProcessMetricsProcessor.process", null, exc);
        }
    }
}
/**
 * Sends a signal to one process by running {@code /bin/kill} on a separate
 * thread. The thread waits for the kill command to finish and then releases
 * the command's resources.
 * <p>
 * When the system property {@code ducc.agent.launcher.use.ducc_spawn} is
 * "true" (case-insensitive) the command is run through the launcher found at
 * {@code ducc.agent.launcher.ducc_spawn_path}, as the owner of the managed
 * process. Otherwise {@code /bin/kill} is run directly.
 * <p>
 * Only the requested signal is sent; there is no follow-up "hard" kill.
 *
 * @param pid    pid of the process to signal
 * @param signal signal argument passed to {@code /bin/kill}, e.g. "-15"
 */
private void killChildProcess(final String pid, final String signal) {
    (new Thread() {
        public void run() {
            Process p = null;
            try {
                String[] killCmd;
                String useSpawn = System
                        .getProperty("ducc.agent.launcher.use.ducc_spawn");
                if (useSpawn != null && useSpawn.equalsIgnoreCase("true")) {
                    String c_launcher_path = Utils.resolvePlaceholderIfExists(
                            System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
                            System.getProperties());
                    killCmd = new String[] {
                            c_launcher_path,
                            "-u",
                            managedProcess.getOwner(),
                            "--",
                            "/bin/kill",
                            signal,
                            // target the pid that was asked for, not the main process
                            pid };
                } else {
                    killCmd = new String[] { "/bin/kill", signal, pid };
                }
                ProcessBuilder pb = new ProcessBuilder(killCmd);
                p = pb.start();
                // Process.waitFor() blocks until the kill command exits.
                // Object.wait() must not be used here: called without owning
                // the monitor it throws IllegalMonitorStateException.
                p.waitFor();
            } catch (InterruptedException e) {
                // restore the interrupt flag for whoever owns this thread
                Thread.currentThread().interrupt();
                logger.error("killChildProcess",
                        managedProcess.getWorkDuccId(), e);
            } catch (Exception e) {
                logger.error("killChildProcess",
                        managedProcess.getWorkDuccId(), e);
            } finally {
                if (p != null) {
                    // release the streams/handles of the kill command
                    p.destroy();
                }
            }
        }
    }).start();
}
}