| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; |
| |
| import com.codahale.metrics.Histogram; |
| import com.codahale.metrics.Snapshot; |
| import com.codahale.metrics.UniformReservoir; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.metrics2.MetricsCollector; |
| import org.apache.hadoop.metrics2.MetricsInfo; |
| import org.apache.hadoop.metrics2.MetricsSource; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.hadoop.metrics2.annotation.Metric; |
| import org.apache.hadoop.metrics2.annotation.Metrics; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.metrics2.lib.MetricsRegistry; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeInt; |
| import org.apache.hadoop.metrics2.lib.MutableGaugeLong; |
| import org.apache.hadoop.metrics2.lib.MutableQuantiles; |
| import org.apache.hadoop.metrics2.lib.MutableStat; |
| import org.apache.hadoop.metrics2.util.Quantile; |
| import org.apache.hadoop.metrics2.util.QuantileEstimator; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.TreeMap; |
| |
| import static org.apache.hadoop.metrics2.lib.Interns.info; |
| |
| @InterfaceAudience.Private |
| @Metrics(context="container") |
| public class ContainerMetrics implements MetricsSource { |
| |
| public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimitMBs"; |
| public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimitMBs"; |
| public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit"; |
| public static final String PMEM_USAGE_METRIC_NAME = "pMemUsageMBs"; |
| public static final String PMEM_USAGE_QUANTILES_NAME = "pMemUsageMBHistogram"; |
| public static final String LAUNCH_DURATION_METRIC_NAME = "launchDurationMs"; |
| public static final String LOCALIZATION_DURATION_METRIC_NAME = |
| "localizationDurationMs"; |
| private static final String PHY_CPU_USAGE_METRIC_NAME = "pCpuUsagePercent"; |
| private static final String PHY_CPU_USAGE_QUANTILES_NAME = |
| "pCpuUsagePercentHistogram"; |
| |
| // Use a multiplier of 1000 to avoid losing too much precision when |
| // converting to integers |
| private static final String VCORE_USAGE_METRIC_NAME = "milliVcoreUsage"; |
| |
| @Metric |
| public MutableStat pMemMBsStat; |
| |
| @Metric |
| public MutableQuantiles pMemMBQuantiles; |
| |
| // This tracks overall CPU percentage of the machine in terms of percentage |
| // of 1 core similar to top |
| // Thus if you use 2 cores completely out of 4 available cores this value |
| // will be 200 |
| @Metric |
| public MutableStat cpuCoreUsagePercent; |
| |
| @Metric |
| public MutableQuantiles cpuCoreUsagePercentQuantiles; |
| |
| @Metric |
| public MutableStat milliVcoresUsed; |
| |
| @Metric |
| public MutableGaugeInt pMemLimitMbs; |
| |
| @Metric |
| public MutableGaugeInt vMemLimitMbs; |
| |
| @Metric |
| public MutableGaugeInt cpuVcoreLimit; |
| |
| @Metric |
| public MutableGaugeLong launchDurationMs; |
| |
| @Metric |
| public MutableGaugeLong localizationDurationMs; |
| |
| @Metric |
| public MutableGaugeLong startTime; |
| |
| @Metric |
| public MutableGaugeLong finishTime; |
| |
| @Metric |
| public MutableGaugeInt exitCode; |
| |
| static final MetricsInfo RECORD_INFO = |
| info("ContainerResource", "Resource limit and usage by container"); |
| |
| public static final MetricsInfo PROCESSID_INFO = |
| info("ContainerPid", "Container Process Id"); |
| |
| final MetricsInfo recordInfo; |
| final MetricsRegistry registry; |
| final ContainerId containerId; |
| final MetricsSystem metricsSystem; |
| |
| // Metrics publishing status |
| private final long flushPeriodMs; |
| private final long unregisterDelayMs; |
| private boolean flushOnPeriod = false; // true if period elapsed |
| private boolean finished = false; // true if container finished |
| private Timer timer; // lazily initialized |
| |
| /** |
| * Simple metrics cache to help prevent re-registrations. |
| */ |
| protected final static Map<ContainerId, ContainerMetrics> |
| usageMetrics = new HashMap<>(); |
| // Create a timer to unregister container metrics, |
| // whose associated thread run as a daemon. |
| private final static Timer unregisterContainerMetricsTimer = |
| new Timer("Container metrics unregistration", true); |
| |
| ContainerMetrics( |
| MetricsSystem ms, ContainerId containerId, long flushPeriodMs, |
| long delayMs) { |
| this.recordInfo = |
| info(sourceName(containerId), RECORD_INFO.description()); |
| this.registry = new MetricsRegistry(recordInfo); |
| this.metricsSystem = ms; |
| this.containerId = containerId; |
| this.flushPeriodMs = flushPeriodMs; |
| this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs; |
| scheduleTimerTaskIfRequired(); |
| |
| this.pMemMBsStat = registry.newStat( |
| PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true); |
| this.pMemMBQuantiles = registry |
| .newQuantiles(PMEM_USAGE_QUANTILES_NAME, "Physical memory quantiles", |
| "Usage", "MBs", 1); |
| ContainerMetricsQuantiles memEstimator = |
| new ContainerMetricsQuantiles(MutableQuantiles.quantiles); |
| pMemMBQuantiles.setEstimator(memEstimator); |
| |
| this.cpuCoreUsagePercent = registry.newStat( |
| PHY_CPU_USAGE_METRIC_NAME, "Physical Cpu core percent usage stats", |
| "Usage", "Percents", true); |
| this.cpuCoreUsagePercentQuantiles = registry |
| .newQuantiles(PHY_CPU_USAGE_QUANTILES_NAME, |
| "Physical Cpu core percent usage quantiles", "Usage", "Percents", |
| 1); |
| ContainerMetricsQuantiles cpuEstimator = |
| new ContainerMetricsQuantiles(MutableQuantiles.quantiles); |
| cpuCoreUsagePercentQuantiles.setEstimator(cpuEstimator); |
| this.milliVcoresUsed = registry.newStat( |
| VCORE_USAGE_METRIC_NAME, "1000 times Vcore usage", "Usage", |
| "MilliVcores", true); |
| this.pMemLimitMbs = registry.newGauge( |
| PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0); |
| this.vMemLimitMbs = registry.newGauge( |
| VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0); |
| this.cpuVcoreLimit = registry.newGauge( |
| VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0); |
| this.launchDurationMs = registry.newGauge( |
| LAUNCH_DURATION_METRIC_NAME, "Launch duration in MS", 0L); |
| this.localizationDurationMs = registry.newGauge( |
| LOCALIZATION_DURATION_METRIC_NAME, "Localization duration in MS", 0L); |
| } |
| |
| ContainerMetrics tag(MetricsInfo info, ContainerId containerId) { |
| registry.tag(info, containerId.toString()); |
| return this; |
| } |
| |
| static String sourceName(ContainerId containerId) { |
| return RECORD_INFO.name() + "_" + containerId.toString(); |
| } |
| |
| public static ContainerMetrics forContainer( |
| ContainerId containerId, long flushPeriodMs, long delayMs) { |
| return forContainer( |
| DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs); |
| } |
| |
| public synchronized static ContainerMetrics getContainerMetrics( |
| ContainerId containerId) { |
| // could be null |
| return usageMetrics.get(containerId); |
| } |
| |
| synchronized static ContainerMetrics forContainer( |
| MetricsSystem ms, ContainerId containerId, long flushPeriodMs, |
| long delayMs) { |
| ContainerMetrics metrics = usageMetrics.get(containerId); |
| if (metrics == null) { |
| metrics = new ContainerMetrics(ms, containerId, flushPeriodMs, |
| delayMs).tag(RECORD_INFO, containerId); |
| |
| // Register with the MetricsSystems |
| if (ms != null) { |
| metrics = |
| ms.register(sourceName(containerId), |
| "Metrics for container: " + containerId, metrics); |
| } |
| usageMetrics.put(containerId, metrics); |
| } |
| |
| return metrics; |
| } |
| |
| synchronized static void unregisterContainerMetrics(ContainerMetrics cm) { |
| cm.metricsSystem.unregisterSource(cm.recordInfo.name()); |
| usageMetrics.remove(cm.containerId); |
| } |
| |
| @Override |
| public synchronized void getMetrics(MetricsCollector collector, boolean all) { |
| //Container goes through registered -> finished -> unregistered. |
| if (finished || flushOnPeriod) { |
| registry.snapshot(collector.addRecord(registry.info()), all); |
| } |
| |
| if (!finished && flushOnPeriod) { |
| flushOnPeriod = false; |
| scheduleTimerTaskIfRequired(); |
| } |
| } |
| |
| public synchronized void finished() { |
| if (!finished) { |
| this.finished = true; |
| if (timer != null) { |
| timer.cancel(); |
| timer = null; |
| } |
| scheduleTimerTaskForUnregistration(); |
| this.pMemMBQuantiles.stop(); |
| this.cpuCoreUsagePercentQuantiles.stop(); |
| } |
| } |
| |
| public void recordMemoryUsage(int memoryMBs) { |
| if (memoryMBs >= 0) { |
| this.pMemMBsStat.add(memoryMBs); |
| this.pMemMBQuantiles.add(memoryMBs); |
| } |
| } |
| |
| public void recordCpuUsage( |
| int totalPhysicalCpuPercent, int milliVcoresUsed) { |
| if (totalPhysicalCpuPercent >=0) { |
| this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent); |
| this.cpuCoreUsagePercentQuantiles.add(totalPhysicalCpuPercent); |
| } |
| if (milliVcoresUsed >= 0) { |
| this.milliVcoresUsed.add(milliVcoresUsed); |
| } |
| } |
| |
| public void recordProcessId(String processId) { |
| registry.tag(PROCESSID_INFO, processId); |
| } |
| |
| public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) { |
| this.vMemLimitMbs.set(vmemLimit); |
| this.pMemLimitMbs.set(pmemLimit); |
| this.cpuVcoreLimit.set(cpuVcores); |
| } |
| |
| public void recordStateChangeDurations(long launchDuration, |
| long localizationDuration) { |
| this.launchDurationMs.set(launchDuration); |
| this.localizationDurationMs.set(localizationDuration); |
| } |
| |
| public void recordStartTime(long startTime) { |
| this.startTime.set(startTime); |
| } |
| |
| public void recordFinishTimeAndExitCode(long finishTime, int exitCode) { |
| this.finishTime.set(finishTime); |
| this.exitCode.set(exitCode); |
| } |
| |
| private synchronized void scheduleTimerTaskIfRequired() { |
| if (flushPeriodMs > 0) { |
| // Lazily initialize timer |
| if (timer == null) { |
| this.timer = new Timer("Metrics flush checker", true); |
| } |
| |
| TimerTask timerTask = new TimerTask() { |
| @Override |
| public void run() { |
| synchronized (ContainerMetrics.this) { |
| if (!finished) { |
| flushOnPeriod = true; |
| } |
| } |
| } |
| }; |
| |
| timer.schedule(timerTask, flushPeriodMs); |
| } |
| } |
| |
| private void scheduleTimerTaskForUnregistration() { |
| TimerTask timerTask = new TimerTask() { |
| @Override |
| public void run() { |
| ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this); |
| } |
| }; |
| unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs); |
| } |
| |
| public static class ContainerMetricsQuantiles implements QuantileEstimator { |
| |
| private final Histogram histogram = new Histogram(new UniformReservoir()); |
| |
| private Quantile[] quantiles; |
| |
| ContainerMetricsQuantiles(Quantile[] q) { |
| quantiles = q; |
| } |
| |
| @Override |
| public synchronized void insert(long value) { |
| histogram.update(value); |
| } |
| |
| @Override |
| synchronized public long getCount() { |
| return histogram.getCount(); |
| } |
| |
| @Override |
| synchronized public void clear() { |
| // don't do anything because we want metrics over the lifetime of the |
| // container |
| } |
| |
| @Override |
| public synchronized Map<Quantile, Long> snapshot() { |
| Snapshot snapshot = histogram.getSnapshot(); |
| Map<Quantile, Long> values = new TreeMap<>(); |
| for (Quantile quantile : quantiles) { |
| values.put(quantile, (long) snapshot.getValue(quantile.quantile)); |
| } |
| return values; |
| } |
| } |
| } |