// blob: 8af0c41a0801d4f3b5d8ed12e6db946b0757620c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.processors;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.camel.Exchange;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
public class LinuxProcessMetricsProcessor extends BaseProcessor implements
ProcessMetricsProcessor {
// Handle on the statm file opened read-only in the constructor. It is handed to
// ProcessResidentMemoryCollector when cgroups are not used and released in close().
private RandomAccessFile statmFile;
// private RandomAccessFile nodeStatFile;
// Handle on the process stat file opened read-only in the constructor. It is handed to
// ProcessCpuUsageCollector when cgroups are not used and released in close().
private RandomAccessFile processStatFile;
// CPU usage observed the first time process() saw the process in Running state;
// subtracted from later totals when the average CPU percentage is computed.
private long totalCpuInitUsage = 0;
// True until process() first handles a sample taken in Running state.
private boolean initializing = true;
// Thread pool on which the metric collectors are submitted; shut down by stop().
private final ExecutorService pool;
// The process whose state is consulted and on which collected metrics are published.
private IDuccProcess process;
// Collects garbage collection statistics; used for every process type except Pop.
private DuccGarbageStatsCollector gcStatsCollector;
// Page size used to convert the RSS page count into bytes/MB. The constructor
// replaces this value with agent.getOSPageSize().
private int blockSize = 4096; // default, OS specific
private DuccLogger logger;
private ManagedProcess managedProcess;
private NodeAgent agent;
// Can be overridden through the system property ducc.agent.share.size.fudge.factor.
// The resident-memory check in process() is only applied when this is > -1.
private int fudgeFactor = 5; // default is 5%
// True until the constructor completes and again once close() has been called;
// process() returns immediately while it is set.
private volatile boolean closed = true;
// System.currentTimeMillis() captured when the Running state was first observed.
private long clockAtStartOfRun = 0;
// Most recently computed average CPU percentage since the process entered Running.
private long percentCPU = 0;
public LinuxProcessMetricsProcessor(DuccLogger logger,
IDuccProcess process, NodeAgent agent, String statmFilePath,
String nodeStatFilePath, String processStatFilePath,
ManagedProcess managedProcess) throws FileNotFoundException {
this.logger = logger;
statmFile = new RandomAccessFile(statmFilePath, "r");
// nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r");
processStatFile = new RandomAccessFile(processStatFilePath, "r");
this.managedProcess = managedProcess;
this.agent = agent;
pool = Executors.newCachedThreadPool();
this.process = process;
gcStatsCollector = new DuccGarbageStatsCollector(logger, process);
// keep a refernce to this so that we can call close() when the process
// terminates. We need to
// close fds to stat and statm files
managedProcess.setMetricsProcessor(this);
blockSize = agent.getOSPageSize();
if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) {
try {
fudgeFactor = Integer.parseInt(System
.getProperty("ducc.agent.share.size.fudge.factor"));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
closed = false;
}
/**
 * Requests an orderly shutdown of the collector thread pool. Any exception
 * raised while doing so is logged and not propagated.
 */
public void stop() {
    if (pool == null) {
        return;
    }
    try {
        pool.shutdown();
    } catch (Exception shutdownFailure) {
        logger.error("LinuxProcessMetricsProcessor.stop()", null, shutdownFailure);
    }
}
/**
 * Marks this processor as closed, releases the stat and statm file handles
 * and shuts down the collector thread pool.
 *
 * Each resource is released independently: a failure to close one file does
 * not prevent the other file from being closed or the pool from being
 * stopped. Failures are logged and never propagated.
 */
public void close() {
    closed = true;
    try {
        closeStatFile(statmFile);
        closeStatFile(processStatFile);
    } finally {
        this.stop();
    }
}

/** Closes the given file if it is open, logging (not throwing) any failure. */
private void closeStatFile(RandomAccessFile file) {
    if (file == null) {
        return;
    }
    try {
        if (file.getFD().valid()) {
            file.close();
        }
    } catch (Exception e) {
        logger.warn("LinuxProcessMetricsProcessor.close", null, e);
    }
}
/**
 * Decides whether metrics should still be collected for a process in the
 * given state.
 *
 * The decision is made on the supplied snapshot only, so that all four
 * comparisons see the same value even if the process changes state
 * concurrently.
 *
 * @param state state of the process as read by the caller
 * @return false if the state is Stopped, Killed, Failed or Stopping;
 *         true otherwise
 */
private boolean collectStats(ProcessState state) {
    boolean stoppingOrDead = ProcessState.Stopped.equals(state)
            || ProcessState.Killed.equals(state)
            || ProcessState.Failed.equals(state)
            || ProcessState.Stopping.equals(state);
    return !stoppingOrDead; // dont collect stats once stopping or dead
}
/**
 * Collects one sample of resource usage for the managed process and
 * publishes it on the process object: swap usage, major faults, average and
 * current CPU, resident memory and (for non-Pop processes) GC statistics.
 *
 * Nothing is collected when this processor has been closed or when the
 * process is not in Initializing or Running state. When cgroups are in use
 * the values are summed over every pid in the process's cgroup; otherwise
 * only the process's own pid is sampled. When cgroups are not in use and
 * the fudge factor is > -1, a process whose resident memory exceeds its
 * assignment is killed and the sample is abandoned.
 *
 * Exceptions are logged, not propagated, and are not logged at all once the
 * process is stopping or dead.
 *
 * @param e the Camel exchange that triggered this invocation; it is not read
 */
public void process(Exchange e) {
    if (closed) { // files closed
        return;
    }
    // if process is stopping or already dead dont collect metrics. The
    // Camel route has just been stopped.
    if (!collectStats(process.getProcessState())) {
        return;
    }
    // metrics are only sampled while the process is Initializing or Running
    if (!process.getProcessState().equals(ProcessState.Initializing)
            && !process.getProcessState().equals(ProcessState.Running)) {
        return;
    }
    try {
        long totalSwapUsage = 0;
        long totalFaults = 0;
        long totalCpuUsage = 0;
        long totalRss = 0;
        int currentCpuUsage = 0;
        String[] cgroupPids = new String[0];
        try {
            // the swap usage script is defined in ducc.properties; it sums
            // up the swap used by a process
            String swapUsageScript = System
                    .getProperty("ducc.agent.swap.usage.script");
            if (agent.useCgroups) {
                String containerId = agent.cgroupsManager
                        .getContainerId(managedProcess);
                cgroupPids = agent.cgroupsManager
                        .getPidsInCgroup(containerId);
                for (String pid : cgroupPids) {
                    if (swapUsageScript != null) {
                        DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
                                pid, managedProcess.getOwner(),
                                swapUsageScript, logger);
                        totalSwapUsage += processSwapSpaceUsage
                                .getSwapUsage();
                    }
                    ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
                            logger, pid);
                    // if process is stopping or already dead dont
                    // collect metrics.
                    if (!collectStats(process.getProcessState())) {
                        return;
                    }
                    Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = pool
                            .submit(processMajorFaultUsageCollector);
                    totalFaults += processMajorFaultUsage.get()
                            .getMajorFaults();

                    RandomAccessFile raf = null;
                    try {
                        raf = new RandomAccessFile("/proc/" + pid + "/stat", "r");
                        ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(
                                logger, pid, raf, 42, 0);
                        if (!collectStats(process.getProcessState())) {
                            return;
                        }
                        Future<ProcessCpuUsage> processCpuUsage = pool
                                .submit(processCpuUsageCollector);
                        totalCpuUsage += cpuSecondsFromJiffies(processCpuUsage
                                .get().getTotalJiffies());
                    } catch (Exception ee) {
                        // a failure for one pid must not abort the whole sample
                        logger.warn(
                                "LinuxProcessMetricsProcessor.process",
                                null, ee);
                    } finally {
                        if (raf != null) {
                            raf.close();
                        }
                    }

                    currentCpuUsage += collectProcessCurrentCPU(pid);

                    RandomAccessFile rStatmFile = null;
                    try {
                        rStatmFile = new RandomAccessFile("/proc/"
                                + pid + "/statm", "r");
                    } catch (FileNotFoundException fnfe) {
                        logger.info(
                                "LinuxProcessMetricsProcessor.process",
                                null,
                                "Statm File:"
                                        + "/proc/"
                                        + pid
                                        + "/statm *Not Found*. Process must have already exited");
                        return;
                    }
                    // The file is closed on every exit path (normal
                    // completion, early return, exception) so that no
                    // descriptor is leaked per sampling cycle.
                    try {
                        ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
                                rStatmFile, 2, 0);
                        if (!collectStats(process.getProcessState())) {
                            return;
                        }
                        Future<ProcessResidentMemory> prm = pool
                                .submit(collector);
                        totalRss += prm.get().get();
                    } finally {
                        rStatmFile.close();
                    }
                }
            } else {
                if (swapUsageScript != null) {
                    DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage(
                            process.getPID(),
                            managedProcess.getOwner(), swapUsageScript,
                            logger);
                    totalSwapUsage = processSwapSpaceUsage
                            .getSwapUsage();
                }
                ProcessMajorFaultCollector processMajorFaultUsageCollector = new ProcessMajorFaultCollector(
                        logger, process.getPID());
                // if process is stopping or already dead dont collect
                // metrics.
                if (!collectStats(process.getProcessState())) {
                    return;
                }
                Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = pool
                        .submit(processMajorFaultUsageCollector);
                totalFaults = processMajorFaultUsage.get()
                        .getMajorFaults();

                ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(
                        logger, process.getPID(), processStatFile, 42,
                        0);
                if (!collectStats(process.getProcessState())) {
                    return;
                }
                Future<ProcessCpuUsage> processCpuUsage = pool
                        .submit(processCpuUsageCollector);
                totalCpuUsage = cpuSecondsFromJiffies(processCpuUsage.get()
                        .getTotalJiffies());

                currentCpuUsage = collectProcessCurrentCPU(process
                        .getPID());

                ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(
                        statmFile, 2, 0);
                if (!collectStats(process.getProcessState())) {
                    return;
                }
                Future<ProcessResidentMemory> prm = pool
                        .submit(collector);
                totalRss = prm.get().get();
            }
        } catch (Exception exc) {
            if (!collectStats(process.getProcessState())) {
                return;
            }
            // collection failed part way; whatever was gathered so far is
            // still published below
            logger.error("LinuxProcessMetricsProcessor.process", null,
                    exc);
        }

        // report cpu utilization while the process is running
        ProcessState reportedState = managedProcess.getDuccProcess()
                .getProcessState();
        if (reportedState.equals(ProcessState.Running)) {
            if (agent.cpuClockRate > 0) {
                // if the process just change state from Initializing to
                // Running ...
                if (initializing) {
                    initializing = false;
                    // cache how much cpu was used up during
                    // initialization of the process
                    totalCpuInitUsage = totalCpuUsage;
                    // capture time when process state changed to Running
                    clockAtStartOfRun = System.currentTimeMillis();
                }
                // normalize time in running state into seconds
                long timeSinceRunningInSeconds = (System
                        .currentTimeMillis() - clockAtStartOfRun) / 1000;
                if (timeSinceRunningInSeconds > 0) { // prevent division by zero
                    // normalize cpu % usage to report in seconds. Also
                    // subtract how much cpu was used during initialization
                    percentCPU = 100
                            * (totalCpuUsage - totalCpuInitUsage)
                            / timeSinceRunningInSeconds;
                }
                // Publish cumulative CPU usage
                process.setCpuTime(percentCPU);
            } else {
                process.setCpuTime(0);
                logger.info(
                        "process",
                        null,
                        "Agent is unable to determine Node's clock rate. Defaulting CPU Time to 0 For Process with PID:"
                                + process.getPID());
            }
        } else {
            // report 0 for CPU while the process is initializing, and for
            // any other non-Running state
            process.setCpuTime(0);
        }
        process.setCurrentCPU(currentCpuUsage);
        logger.info(
                "process",
                null,
                "----------- PID:" + process.getPID()
                        + " Average CPU Time:" + percentCPU
                        + "% Current CPU Time:"
                        + process.getCurrentCPU());

        // collects process Major faults (swap in memory)
        process.setMajorFaults(totalFaults);

        // NOTE: the timer starts after the swap usage has already been
        // collected above, so the elapsed time logged below does not include
        // the actual collection.
        long st = System.currentTimeMillis();
        // Current Process Swap Usage in bytes
        long processSwapUsage = totalSwapUsage * 1024;
        process.setSwapUsage(processSwapUsage);
        logger.info(
                "process",
                null,
                "----------- PID:" + process.getPID()
                        + " Major Faults:" + totalFaults
                        + " Process Swap Usage:" + processSwapUsage
                        + " Max Swap Usage Allowed:"
                        + managedProcess.getMaxSwapThreshold()
                        + " Time to Collect Swap Usage:"
                        + (System.currentTimeMillis() - st));

        // Killing a process that exceeds its own swap allocation is disabled:
        // per JIRA UIMA-3320 the agent monitors node-wide swap usage and kills
        // the processes that use most of the swap. A process over its swap
        // threshold is therefore only excluded from the memory guard below.
        boolean swapThresholdExceeded = processSwapUsage > 0
                && processSwapUsage > managedProcess
                        .getMaxSwapThreshold();

        // Use Memory Guard only if cgroups are disabled and fudge factor > -1
        if (!swapThresholdExceeded
                && !agent.useCgroups
                && fudgeFactor > -1
                && managedProcess.getProcessMemoryAssignment()
                        .getMaxMemoryWithFudge() > 0) {
            // RSS is in terms of pages(blocks) which size is system
            // dependent. Default 4096 bytes. Normalize RSS into MB.
            long rss = (totalRss * (blockSize / 1024)) / 1024;
            logger.trace(
                    "process",
                    null,
                    "*** Process with PID:"
                            + managedProcess.getPid()
                            + " Assigned Memory (MB): "
                            + managedProcess
                                    .getProcessMemoryAssignment()
                            + " MBs. Current RSS (MB):" + rss);
            // check if process resident memory exceeds its memory
            // assignment calculate in the PM
            if (rss > managedProcess.getProcessMemoryAssignment()
                    .getMaxMemoryWithFudge()) {
                logger.error(
                        "process",
                        null,
                        "\n\n********************************************************\n\tProcess with PID:"
                                + managedProcess.getPid()
                                + " Exceeded its max memory assignment (including a fudge factor) of "
                                + managedProcess
                                        .getProcessMemoryAssignment()
                                        .getMaxMemoryWithFudge()
                                + " MBs. This Process Resident Memory Size: "
                                + rss
                                + " MBs .Killing process ...\n********************************************************\n\n");
                try {
                    managedProcess.kill(); // mark it for death
                    process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize
                            .toString());
                    agent.stopProcess(process);
                    // NOTE(review): this branch is only entered when
                    // agent.useCgroups was false a few lines above, so the
                    // loop below looks unreachable unless the flag changes
                    // concurrently - confirm before removing.
                    if (agent.useCgroups) {
                        for (String pid : cgroupPids) {
                            // skip the main process that was just
                            // killed above. Only kill its child processes.
                            if (pid.equals(managedProcess
                                    .getDuccProcess().getPID())) {
                                continue;
                            }
                            killChildProcess(pid, "-15");
                        }
                    }
                } catch (Exception ee) {
                    if (!collectStats(process.getProcessState())) {
                        return;
                    }
                    logger.error("process", null, ee);
                }
                // the process is being killed: do not publish RSS / GC stats
                return;
            }
        }

        // Publish resident memory
        process.setResidentMemory((totalRss * blockSize));
        // dont collect GC metrics for POPs. May not be java or may not
        // be a jmx enabled java process
        if (!process.getProcessType().equals(ProcessType.Pop)) {
            ProcessGarbageCollectionStats gcStats = gcStatsCollector
                    .collect();
            process.setGarbageCollectionStats(gcStats);
            logger.info(
                    "process",
                    null,
                    "PID:" + process.getPID()
                            + " Total GC Collection Count :"
                            + gcStats.getCollectionCount()
                            + " Total GC Collection Time :"
                            + gcStats.getCollectionTime());
        }
    } catch (Exception ex) {
        // if the child process is not running dont log the exception.
        if (!collectStats(process.getProcessState())) {
            return;
        }
        logger.error("process", null, ex);
    }
}

/**
 * Converts a jiffies count into seconds using the agent's clock rate.
 * Returns 0 when the clock rate is not known (not greater than zero), so
 * that an unknown rate cannot raise a division-by-zero error in the middle
 * of a sample.
 */
private long cpuSecondsFromJiffies(long totalJiffies) {
    if (agent.cpuClockRate > 0) {
        return totalJiffies / agent.cpuClockRate;
    }
    return 0;
}
/**
 * Returns the current CPU usage of the given pid as reported by a single
 * batch-mode run of top, truncated to a whole number.
 *
 * The command is only run while the monitored process is Running or
 * Initializing. 0 is returned when the process is in any other state, when
 * the pid is not a plain decimal number (it is embedded in a shell command
 * line, so anything else is refused), or when the output of top cannot be
 * parsed as a number.
 *
 * @param pid pid to query
 * @return integer part of the CPU value printed by top, or 0
 * @throws Exception if the command cannot be started, its output cannot be
 *                   read, or the calling thread is interrupted while waiting
 */
private int collectProcessCurrentCPU(String pid) throws Exception {
    if (process == null) {
        return 0;
    }
    ProcessState state = process.getProcessState();
    if (!ProcessState.Running.equals(state)
            && !ProcessState.Initializing.equals(state)) {
        return 0;
    }
    String targetPid = (pid == null) ? "" : pid.trim();
    if (!targetPid.matches("\\d+")) {
        // never splice anything but digits into the sh -c command below
        return 0;
    }
    // run top in batch mode and filter just the CPU
    ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", "top -b -n 1 -p "
            + targetPid + " | tail -n 2 | head -n 1 | awk '{print $9}'");
    pb.redirectErrorStream(true);
    Process proc = pb.start();
    String cpuTime = "0";
    BufferedReader reader = new BufferedReader(new InputStreamReader(
            proc.getInputStream()));
    try {
        String line;
        // the last line of output wins
        while ((line = reader.readLine()) != null) {
            String[] tokens = line.split("\\s+");
            if (tokens.length > 0) {
                logger.info("collectProcessCurrentCPU", null, " PID:" + pid
                        + " " + line + " == CPUTIME:" + tokens[0]);
                cpuTime = tokens[0];
            }
        }
        proc.waitFor();
    } catch (InterruptedException ie) {
        // keep the interrupt visible to the caller's thread
        Thread.currentThread().interrupt();
        throw ie;
    } finally {
        // release the pipe and the child process on every exit path
        try {
            reader.close();
        } catch (java.io.IOException ignored) {
            // the value (if any) has already been read
        }
        proc.destroy();
    }
    // keep only the integer part; top may print either '.' or ',' as the
    // decimal separator depending on the locale
    int separator = cpuTime.indexOf('.');
    if (separator < 0) {
        separator = cpuTime.indexOf(',');
    }
    if (separator > -1) {
        cpuTime = cpuTime.substring(0, separator);
    }
    try {
        return Integer.parseInt(cpuTime);
    } catch (NumberFormatException e) {
        // output was not a number (e.g. the pid no longer exists)
        return 0;
    }
}
/**
 * Asynchronously sends a signal to the given pid using /bin/kill.
 *
 * A new thread is started which runs the kill command, either directly or,
 * when the system property ducc.agent.launcher.use.ducc_spawn is "true",
 * through the ducc_spawn launcher as the owner of the managed process. The
 * thread waits up to one minute for the kill command to finish and then
 * destroys the command's process. Note that no stronger signal is sent
 * afterwards if the target is still alive. Failures are logged.
 *
 * @param pid    pid of the process to signal
 * @param signal signal argument passed to kill, e.g. "-15"
 */
private void killChildProcess(final String pid, final String signal) {
    (new Thread() {
        @Override
        public void run() {
            try {
                String[] killCmd;
                String useSpawn = System
                        .getProperty("ducc.agent.launcher.use.ducc_spawn");
                if (Boolean.parseBoolean(useSpawn)) {
                    String c_launcher_path = Utils
                            .resolvePlaceholderIfExists(
                                    System.getProperty("ducc.agent.launcher.ducc_spawn_path"),
                                    System.getProperties());
                    killCmd = new String[] { c_launcher_path, "-u",
                            managedProcess.getOwner(), "--", "/bin/kill",
                            signal, pid };
                } else {
                    killCmd = new String[] { "/bin/kill", signal, pid };
                }
                Process killer = new ProcessBuilder(killCmd).start();
                try {
                    // wait for up to 1 minute for the kill command itself
                    long deadline = System.currentTimeMillis() + 60 * 1000L;
                    while (!hasExited(killer)
                            && System.currentTimeMillis() < deadline) {
                        Thread.sleep(100);
                    }
                } finally {
                    // releases the command's streams; also terminates it
                    // if it is still running after the timeout
                    killer.destroy();
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error("killChildProcess",
                        managedProcess.getWorkDuccId(), e);
            }
        }
    }).start();
}

/** Returns true once the given OS process has terminated. */
private static boolean hasExited(Process p) {
    try {
        p.exitValue();
        return true;
    } catch (IllegalThreadStateException stillRunning) {
        // exitValue() signals "not yet terminated" this way
        return false;
    }
}
}