// Source blob: 07b3deadc4d6c6fbe40608a8aa278799c25f41d6 (repository-viewer header, not part of the Java source)
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.QuantileEstimator;
import org.apache.hadoop.yarn.api.records.ContainerId;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import static org.apache.hadoop.metrics2.lib.Interns.info;
@InterfaceAudience.Private
@Metrics(context="container")
public class ContainerMetrics implements MetricsSource {
public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimitMBs";
public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimitMBs";
public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit";
public static final String PMEM_USAGE_METRIC_NAME = "pMemUsageMBs";
public static final String PMEM_USAGE_QUANTILES_NAME = "pMemUsageMBHistogram";
public static final String LAUNCH_DURATION_METRIC_NAME = "launchDurationMs";
public static final String LOCALIZATION_DURATION_METRIC_NAME =
"localizationDurationMs";
private static final String PHY_CPU_USAGE_METRIC_NAME = "pCpuUsagePercent";
private static final String PHY_CPU_USAGE_QUANTILES_NAME =
"pCpuUsagePercentHistogram";
// Use a multiplier of 1000 to avoid losing too much precision when
// converting to integers
private static final String VCORE_USAGE_METRIC_NAME = "milliVcoreUsage";
@Metric
public MutableStat pMemMBsStat;
@Metric
public MutableQuantiles pMemMBQuantiles;
// This tracks overall CPU percentage of the machine in terms of percentage
// of 1 core similar to top
// Thus if you use 2 cores completely out of 4 available cores this value
// will be 200
@Metric
public MutableStat cpuCoreUsagePercent;
@Metric
public MutableQuantiles cpuCoreUsagePercentQuantiles;
@Metric
public MutableStat milliVcoresUsed;
@Metric
public MutableGaugeInt pMemLimitMbs;
@Metric
public MutableGaugeInt vMemLimitMbs;
@Metric
public MutableGaugeInt cpuVcoreLimit;
@Metric
public MutableGaugeLong launchDurationMs;
@Metric
public MutableGaugeLong localizationDurationMs;
@Metric
public MutableGaugeLong startTime;
@Metric
public MutableGaugeLong finishTime;
@Metric
public MutableGaugeInt exitCode;
static final MetricsInfo RECORD_INFO =
info("ContainerResource", "Resource limit and usage by container");
public static final MetricsInfo PROCESSID_INFO =
info("ContainerPid", "Container Process Id");
final MetricsInfo recordInfo;
final MetricsRegistry registry;
final ContainerId containerId;
final MetricsSystem metricsSystem;
// Metrics publishing status
private final long flushPeriodMs;
private final long unregisterDelayMs;
private boolean flushOnPeriod = false; // true if period elapsed
private boolean finished = false; // true if container finished
private Timer timer; // lazily initialized
/**
* Simple metrics cache to help prevent re-registrations.
*/
protected final static Map<ContainerId, ContainerMetrics>
usageMetrics = new HashMap<>();
// Create a timer to unregister container metrics,
// whose associated thread run as a daemon.
private final static Timer unregisterContainerMetricsTimer =
new Timer("Container metrics unregistration", true);
ContainerMetrics(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
long delayMs) {
this.recordInfo =
info(sourceName(containerId), RECORD_INFO.description());
this.registry = new MetricsRegistry(recordInfo);
this.metricsSystem = ms;
this.containerId = containerId;
this.flushPeriodMs = flushPeriodMs;
this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
scheduleTimerTaskIfRequired();
this.pMemMBsStat = registry.newStat(
PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true);
this.pMemMBQuantiles = registry
.newQuantiles(PMEM_USAGE_QUANTILES_NAME, "Physical memory quantiles",
"Usage", "MBs", 1);
ContainerMetricsQuantiles memEstimator =
new ContainerMetricsQuantiles(MutableQuantiles.quantiles);
pMemMBQuantiles.setEstimator(memEstimator);
this.cpuCoreUsagePercent = registry.newStat(
PHY_CPU_USAGE_METRIC_NAME, "Physical Cpu core percent usage stats",
"Usage", "Percents", true);
this.cpuCoreUsagePercentQuantiles = registry
.newQuantiles(PHY_CPU_USAGE_QUANTILES_NAME,
"Physical Cpu core percent usage quantiles", "Usage", "Percents",
1);
ContainerMetricsQuantiles cpuEstimator =
new ContainerMetricsQuantiles(MutableQuantiles.quantiles);
cpuCoreUsagePercentQuantiles.setEstimator(cpuEstimator);
this.milliVcoresUsed = registry.newStat(
VCORE_USAGE_METRIC_NAME, "1000 times Vcore usage", "Usage",
"MilliVcores", true);
this.pMemLimitMbs = registry.newGauge(
PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0);
this.vMemLimitMbs = registry.newGauge(
VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0);
this.cpuVcoreLimit = registry.newGauge(
VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0);
this.launchDurationMs = registry.newGauge(
LAUNCH_DURATION_METRIC_NAME, "Launch duration in MS", 0L);
this.localizationDurationMs = registry.newGauge(
LOCALIZATION_DURATION_METRIC_NAME, "Localization duration in MS", 0L);
}
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
registry.tag(info, containerId.toString());
return this;
}
static String sourceName(ContainerId containerId) {
return RECORD_INFO.name() + "_" + containerId.toString();
}
public static ContainerMetrics forContainer(
ContainerId containerId, long flushPeriodMs, long delayMs) {
return forContainer(
DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
}
public synchronized static ContainerMetrics getContainerMetrics(
ContainerId containerId) {
// could be null
return usageMetrics.get(containerId);
}
synchronized static ContainerMetrics forContainer(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
long delayMs) {
ContainerMetrics metrics = usageMetrics.get(containerId);
if (metrics == null) {
metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
delayMs).tag(RECORD_INFO, containerId);
// Register with the MetricsSystems
if (ms != null) {
metrics =
ms.register(sourceName(containerId),
"Metrics for container: " + containerId, metrics);
}
usageMetrics.put(containerId, metrics);
}
return metrics;
}
synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
cm.metricsSystem.unregisterSource(cm.recordInfo.name());
usageMetrics.remove(cm.containerId);
}
@Override
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
//Container goes through registered -> finished -> unregistered.
if (finished || flushOnPeriod) {
registry.snapshot(collector.addRecord(registry.info()), all);
}
if (!finished && flushOnPeriod) {
flushOnPeriod = false;
scheduleTimerTaskIfRequired();
}
}
public synchronized void finished() {
if (!finished) {
this.finished = true;
if (timer != null) {
timer.cancel();
timer = null;
}
scheduleTimerTaskForUnregistration();
this.pMemMBQuantiles.stop();
this.cpuCoreUsagePercentQuantiles.stop();
}
}
public void recordMemoryUsage(int memoryMBs) {
if (memoryMBs >= 0) {
this.pMemMBsStat.add(memoryMBs);
this.pMemMBQuantiles.add(memoryMBs);
}
}
public void recordCpuUsage(
int totalPhysicalCpuPercent, int milliVcoresUsed) {
if (totalPhysicalCpuPercent >=0) {
this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent);
this.cpuCoreUsagePercentQuantiles.add(totalPhysicalCpuPercent);
}
if (milliVcoresUsed >= 0) {
this.milliVcoresUsed.add(milliVcoresUsed);
}
}
public void recordProcessId(String processId) {
registry.tag(PROCESSID_INFO, processId);
}
public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
this.vMemLimitMbs.set(vmemLimit);
this.pMemLimitMbs.set(pmemLimit);
this.cpuVcoreLimit.set(cpuVcores);
}
public void recordStateChangeDurations(long launchDuration,
long localizationDuration) {
this.launchDurationMs.set(launchDuration);
this.localizationDurationMs.set(localizationDuration);
}
public void recordStartTime(long startTime) {
this.startTime.set(startTime);
}
public void recordFinishTimeAndExitCode(long finishTime, int exitCode) {
this.finishTime.set(finishTime);
this.exitCode.set(exitCode);
}
private synchronized void scheduleTimerTaskIfRequired() {
if (flushPeriodMs > 0) {
// Lazily initialize timer
if (timer == null) {
this.timer = new Timer("Metrics flush checker", true);
}
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
synchronized (ContainerMetrics.this) {
if (!finished) {
flushOnPeriod = true;
}
}
}
};
timer.schedule(timerTask, flushPeriodMs);
}
}
private void scheduleTimerTaskForUnregistration() {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
}
};
unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs);
}
public static class ContainerMetricsQuantiles implements QuantileEstimator {
private final Histogram histogram = new Histogram(new UniformReservoir());
private Quantile[] quantiles;
ContainerMetricsQuantiles(Quantile[] q) {
quantiles = q;
}
@Override
public synchronized void insert(long value) {
histogram.update(value);
}
@Override
synchronized public long getCount() {
return histogram.getCount();
}
@Override
synchronized public void clear() {
// don't do anything because we want metrics over the lifetime of the
// container
}
@Override
public synchronized Map<Quantile, Long> snapshot() {
Snapshot snapshot = histogram.getSnapshot();
Map<Quantile, Long> values = new TreeMap<>();
for (Quantile quantile : quantiles) {
values.put(quantile, (long) snapshot.getValue(quantile.quantile));
}
return values;
}
}
}