| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.agent.processors; |
| |
| |
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.launcher.ManagedProcess;
import org.apache.uima.ducc.agent.metrics.collectors.DuccGarbageStatsCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessCpuUsageCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessMajorFaultCollector;
import org.apache.uima.ducc.agent.metrics.collectors.ProcessResidentMemoryCollector;
import org.apache.uima.ducc.common.agent.metrics.cpu.ProcessCpuUsage;
import org.apache.uima.ducc.common.agent.metrics.memory.ProcessResidentMemory;
import org.apache.uima.ducc.common.agent.metrics.swap.DuccProcessSwapSpaceUsage;
import org.apache.uima.ducc.common.agent.metrics.swap.ProcessMemoryPageLoadUsage;
import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcess.ReasonForStoppingProcess;
import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
| |
| public class LinuxProcessMetricsProcessor extends BaseProcessor implements ProcessMetricsProcessor { |
| private RandomAccessFile statmFile; |
| |
| // private RandomAccessFile nodeStatFile; |
| private RandomAccessFile processStatFile; |
| |
| private long totalCpuInitUsage = 0; |
| |
| private boolean initializing = true; |
| |
| private final ExecutorService pool; |
| |
| private IDuccProcess process; |
| |
| private DuccGarbageStatsCollector gcStatsCollector; |
| |
| private int blockSize = 4096; // default, OS specific |
| |
| private DuccLogger logger; |
| |
| private ManagedProcess managedProcess; |
| |
| private NodeAgent agent; |
| |
| private int fudgeFactor = 5; // default is 5% |
| |
| private volatile boolean closed = true; |
| |
| private long clockAtStartOfRun=0; |
| |
| private long percentCPU=0; |
| |
| // private int logCounter=0; |
| public LinuxProcessMetricsProcessor(DuccLogger logger, IDuccProcess process, NodeAgent agent, |
| String statmFilePath, String nodeStatFilePath, String processStatFilePath, |
| ManagedProcess managedProcess) throws FileNotFoundException { |
| this.logger = logger; |
| statmFile = new RandomAccessFile(statmFilePath, "r"); |
| // nodeStatFile = new RandomAccessFile(nodeStatFilePath, "r"); |
| processStatFile = new RandomAccessFile(processStatFilePath, "r"); |
| this.managedProcess = managedProcess; |
| this.agent = agent; |
| pool = Executors.newCachedThreadPool(); |
| this.process = process; |
| gcStatsCollector = new DuccGarbageStatsCollector(logger, process); |
| |
| // keep a refernce to this so that we can call close() when the process terminates. We need to |
| // close fds to stat and statm files |
| managedProcess.setMetricsProcessor(this); |
| |
| blockSize = agent.getOSPageSize(); |
| |
| if (System.getProperty("ducc.agent.share.size.fudge.factor") != null) { |
| try { |
| fudgeFactor = Integer.parseInt(System.getProperty("ducc.agent.share.size.fudge.factor")); |
| } catch (NumberFormatException e) { |
| e.printStackTrace(); |
| } |
| } |
| closed = false; |
| } |
| public void stop() { |
| try { |
| if ( pool != null ) { |
| pool.shutdown(); |
| } |
| } catch( Exception e) { |
| logger.error("LinuxProcessMetricsProcessor.stop()", null, e); |
| |
| } |
| } |
| public void close() { |
| closed = true; |
| try { |
| if (statmFile != null && statmFile.getFD().valid()) { |
| statmFile.close(); |
| } |
| if ( processStatFile != null && processStatFile.getFD().valid()) { |
| processStatFile.close(); |
| } |
| this.stop(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| public void process(Exchange e) { |
| if ( closed ) { // files closed |
| return; |
| } |
| if (process.getProcessState().equals(ProcessState.Initializing) |
| || process.getProcessState().equals(ProcessState.Running)) |
| try { |
| |
| String DUCC_HOME = Utils.findDuccHome(); |
| // executes script DUCC_HOME/admin/ducc_get_process_swap_usage.sh which sums up swap used by |
| // a process |
| long totalSwapUsage = 0; |
| long totalFaults = 0; |
| long totalCpuUsage = 0; |
| long totalRss = 0; |
| Future<ProcessMemoryPageLoadUsage> processMajorFaultUsage = null; |
| Future<ProcessCpuUsage> processCpuUsage = null; |
| String[] cgroupPids = new String[0]; |
| try { |
| if ( agent.useCgroups ) { |
| String containerId = agent.cgroupsManager.getContainerId(managedProcess); |
| cgroupPids = |
| agent.cgroupsManager.getPidsInCgroup(containerId); |
| |
| for( String pid : cgroupPids ) { |
| DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage( |
| pid, managedProcess.getOwner(), DUCC_HOME |
| + "/admin/ducc_get_process_swap_usage.sh", logger); |
| totalSwapUsage += processSwapSpaceUsage.getSwapUsage(); |
| |
| |
| ProcessMajorFaultCollector processMajorFaultUsageCollector = |
| new ProcessMajorFaultCollector(logger, pid); |
| |
| processMajorFaultUsage = pool |
| .submit(processMajorFaultUsageCollector); |
| totalFaults += processMajorFaultUsage.get().getMajorFaults(); |
| |
| ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger, |
| pid, processStatFile, 42, 0); |
| |
| processCpuUsage = pool.submit(processCpuUsageCollector); |
| totalCpuUsage += (processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate); |
| |
| RandomAccessFile rStatmFile = |
| new RandomAccessFile("/proc/" + pid + "/statm", "r"); |
| ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(rStatmFile, 2, |
| 0); |
| Future<ProcessResidentMemory> prm = pool.submit(collector); |
| |
| totalRss += prm.get().get(); |
| |
| rStatmFile.close(); |
| } |
| } else { |
| DuccProcessSwapSpaceUsage processSwapSpaceUsage = new DuccProcessSwapSpaceUsage( |
| process.getPID(), managedProcess.getOwner(), DUCC_HOME |
| + "/admin/ducc_get_process_swap_usage.sh", logger); |
| totalSwapUsage = processSwapSpaceUsage.getSwapUsage(); |
| |
| ProcessMajorFaultCollector processMajorFaultUsageCollector = |
| new ProcessMajorFaultCollector(logger, process.getPID()); |
| |
| processMajorFaultUsage = pool |
| .submit(processMajorFaultUsageCollector); |
| totalFaults = processMajorFaultUsage.get().getMajorFaults(); |
| |
| ProcessCpuUsageCollector processCpuUsageCollector = new ProcessCpuUsageCollector(logger, |
| process.getPID(), processStatFile, 42, 0); |
| |
| processCpuUsage = pool.submit(processCpuUsageCollector); |
| totalCpuUsage = processCpuUsage.get().getTotalJiffies()/ agent.cpuClockRate; |
| |
| ProcessResidentMemoryCollector collector = new ProcessResidentMemoryCollector(statmFile, 2, |
| 0); |
| Future<ProcessResidentMemory> prm = pool.submit(collector); |
| |
| totalRss = prm.get().get(); |
| } |
| |
| } catch( Exception exc) { |
| logger.error("LinuxProcessMetricsProcessor.process", null, exc); |
| } |
| |
| |
| // report cpu utilization while the process is running |
| if ( managedProcess.getDuccProcess().getProcessState().equals(ProcessState.Running)) { |
| if (agent.cpuClockRate > 0) { |
| // if the process just change state from Initializing to Running ... |
| if ( initializing ) { |
| initializing = false; |
| // cache how much cpu was used up during initialization of the process |
| totalCpuInitUsage = totalCpuUsage; |
| // capture time when process state changed to Running |
| clockAtStartOfRun = System.currentTimeMillis(); |
| } |
| // normalize time in running state into seconds |
| long timeSinceRunningInSeconds = (System.currentTimeMillis() - clockAtStartOfRun)/1000; |
| // normalize cpu % usage to report in seconds. Also subtract how much cpu was |
| // used during initialization |
| percentCPU = 100* ( totalCpuUsage - totalCpuInitUsage )/timeSinceRunningInSeconds; |
| |
| logger.info("process", null, "----------- PID:" + process.getPID() |
| + " CPU Time:" + percentCPU+"%"); |
| // Publish cumulative CPU usage |
| process.setCpuTime(percentCPU); |
| } else { |
| process.setCpuTime(0); |
| logger.info("process", null, |
| "Agent is unable to determine Node's clock rate. Defaulting CPU Time to 0 For Process with PID:" |
| + process.getPID()); |
| } |
| |
| } else if ( managedProcess.getDuccProcess().getProcessState().equals(ProcessState.Initializing)) { |
| // report 0 for CPU while the process is initializing |
| process.setCpuTime(0); |
| } |
| else { |
| // if process is not dead, report the last known percentCPU |
| process.setCpuTime(percentCPU); |
| } |
| // long majorFaults = processMajorFaultUsage.get().getMajorFaults(); |
| // collects process Major faults (swap in memory) |
| process.setMajorFaults(totalFaults); |
| // Current Process Swap Usage in bytes |
| long st = System.currentTimeMillis(); |
| // long processSwapUsage = processSwapSpaceUsage.getSwapUsage() * 1024; |
| long processSwapUsage = totalSwapUsage * 1024; |
| // collects swap usage from /proc/<PID>/smaps file via a script |
| // DUCC_HOME/admin/collect_process_swap_usage.sh |
| process.setSwapUsage(processSwapUsage); |
| // if ( (logCounter % 2 ) == 0 ) { |
| logger.info("process", null, "----------- PID:" + process.getPID() + " Major Faults:" |
| + totalFaults + " Process Swap Usage:" + processSwapUsage |
| + " Max Swap Usage Allowed:" + managedProcess.getMaxSwapThreshold() |
| + " Time to Collect Swap Usage:" + (System.currentTimeMillis() - st)); |
| // } |
| // logCounter++; |
| |
| if (processSwapUsage > 0 && processSwapUsage > managedProcess.getMaxSwapThreshold()) { |
| /* |
| // Disable code that kill a process if it exceeds its swap allocation. Per JIRA |
| // UIMA-3320, agent will monitor node-wide swap usage and will kill processes that |
| // use most of the swap. |
| logger.error( |
| "process", |
| null, |
| "\n\n********************************************************\n\tProcess with PID:" |
| + managedProcess.getPid() |
| + " Exceeded its Max Swap Usage Threshold of " |
| + (managedProcess.getMaxSwapThreshold() / 1024) |
| / 1024 |
| + " MBs. The Current Swap Usage is: " |
| + (processSwapUsage / 1024) |
| / 1024 |
| + " MBs .Killing process ...\n********************************************************\n\n"); |
| try { |
| managedProcess.kill(); // mark it for death |
| process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededSwapThreshold |
| .toString()); |
| agent.stopProcess(process); |
| |
| if ( agent.useCgroups ) { |
| for( String pid : cgroupPids ) { |
| // skip the main process that was just killed above. Only kill |
| // its child processes. |
| if ( pid.equals(managedProcess.getDuccProcess().getPID())) { |
| continue; |
| } |
| killChildProcess(pid,"-15"); |
| } |
| } |
| |
| } catch (Exception ee) { |
| logger.error("process", null, ee); |
| } |
| return; |
| */ |
| } else { |
| // Use Memory Guard only if cgroups are disabled and fudge factor > -1 |
| |
| if ( !agent.useCgroups && fudgeFactor > -1 |
| && managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() > 0) { |
| // RSS is in terms of pages(blocks) which size is system dependent. Default 4096 bytes |
| long rss = (totalRss * (blockSize / 1024)) / 1024; // normalize RSS into MB |
| logger.trace("process", null, "*** Process with PID:" + managedProcess.getPid() |
| + " Assigned Memory (MB): " + managedProcess.getProcessMemoryAssignment() |
| + " MBs. Current RSS (MB):" + rss); |
| // check if process resident memory exceeds its memory assignment calculate in the PM |
| if (rss > managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge()) { |
| logger.error( |
| "process", |
| null, |
| "\n\n********************************************************\n\tProcess with PID:" |
| + managedProcess.getPid() |
| + " Exceeded its max memory assignment (including a fudge factor) of " |
| + managedProcess.getProcessMemoryAssignment().getMaxMemoryWithFudge() |
| + " MBs. This Process Resident Memory Size: " |
| + rss |
| + " MBs .Killing process ...\n********************************************************\n\n"); |
| try { |
| managedProcess.kill(); // mark it for death |
| process.setReasonForStoppingProcess(ReasonForStoppingProcess.ExceededShareSize |
| .toString()); |
| agent.stopProcess(process); |
| |
| if ( agent.useCgroups ) { |
| for( String pid : cgroupPids ) { |
| // skip the main process that was just killed above. Only kill |
| // its child processes. |
| if ( pid.equals(managedProcess.getDuccProcess().getPID())) { |
| continue; |
| } |
| killChildProcess(pid,"-15"); |
| } |
| } |
| } catch (Exception ee) { |
| logger.error("process", null, ee); |
| } |
| return; |
| } |
| } |
| |
| } |
| // Publish resident memory |
| process.setResidentMemory((totalRss * blockSize)); |
| // dont collect GC metrics for POPs. May not be java or may not be a jmx enabled java process |
| if ( !process.getProcessType().equals(ProcessType.Pop)) { |
| ProcessGarbageCollectionStats gcStats = gcStatsCollector.collect(); |
| process.setGarbageCollectionStats(gcStats); |
| logger.info( |
| "process", |
| null, |
| "PID:" + process.getPID() + " Total GC Collection Count :" |
| + gcStats.getCollectionCount() + " Total GC Collection Time :" |
| + gcStats.getCollectionTime()); |
| } |
| |
| } catch (Exception ex) { |
| logger.error("process", null, e); |
| ex.printStackTrace(); |
| } |
| |
| } |
| private void killChildProcess(final String pid, final String signal) { |
| // spawn a thread that will do kill -15, wait for 1 minute and kill the process |
| // hard if it is still alive |
| (new Thread() { |
| public void run() { |
| String c_launcher_path = |
| Utils.resolvePlaceholderIfExists( |
| System.getProperty("ducc.agent.launcher.ducc_spawn_path"),System.getProperties()); |
| try { |
| String[] killCmd=null; |
| String useSpawn = System.getProperty("ducc.agent.launcher.use.ducc_spawn"); |
| if ( useSpawn != null && useSpawn.toLowerCase().equals("true")) { |
| killCmd = new String[] { c_launcher_path, |
| "-u", ((ManagedProcess)managedProcess).getOwner(), "--","/bin/kill",signal,((ManagedProcess) managedProcess).getDuccProcess().getPID() }; |
| } else { |
| killCmd = new String[] { "/bin/kill","-15",((ManagedProcess) managedProcess).getDuccProcess().getPID() }; |
| } |
| ProcessBuilder pb = new ProcessBuilder(killCmd); |
| Process p = pb.start(); |
| p.wait(1000 * 60); // wait for 1 minute and whack the process if still alive |
| p.destroy(); |
| } catch( Exception e) { |
| logger.error("killChildProcess", managedProcess.getWorkDuccId(), e); |
| } |
| } |
| }).start(); |
| |
| |
| } |
| |
| } |