// blob: d12ee364f0e1a39e82a4d3c4468f21c81f8bc643 [file] [log] [blame]
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.base.Splitter;
import java.util.Map;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Self;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
/**
 * Scheduler metrics for one queue, optionally broken down per user.
 *
 * <p>Every public update method applies the change to this instance, then to
 * the per-user instance (when user metrics are enabled), then to the parent
 * queue's metrics (when a parent exists), so counts roll up the hierarchy.
 *
 * <p>Memory gauges are kept in whole GiB using integer division, so amounts
 * smaller than one GiB are truncated.
 *
 * <p>NOTE(review): the {@code @Metric} fields are never assigned in this
 * class; presumably the metrics system instantiates them when the source is
 * registered - confirm before using an unregistered instance.
 */
@InterfaceAudience.Private
@Metrics(context="yarn")
public class QueueMetrics {
  @Metric("# of apps submitted") MutableCounterInt appsSubmitted;
  @Metric("# of running apps") MutableGaugeInt appsRunning;
  @Metric("# of pending apps") MutableGaugeInt appsPending;
  @Metric("# of apps completed") MutableCounterInt appsCompleted;
  @Metric("# of apps killed") MutableCounterInt appsKilled;
  @Metric("# of apps failed") MutableCounterInt appsFailed;
  @Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB;
  @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
  @Metric("Available memory in GiB") MutableGaugeInt availableGB;
  @Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB;
  @Metric("# of pending containers") MutableGaugeInt pendingContainers;
  @Metric("# of reserved memory in GiB") MutableGaugeInt reservedGB;
  @Metric("# of reserved containers") MutableGaugeInt reservedContainers;

  static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
  static final int GB = 1024; // resource.memory is in MB
  static final MetricsInfo RECORD_INFO = info("QueueMetrics",
      "Metrics for the resource scheduler");
  static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
  static final MetricsInfo USER_INFO = info("User", "Metrics by user");
  // Splits a dotted queue path into its non-empty, trimmed components.
  static final Splitter Q_SPLITTER =
      Splitter.on('.').omitEmptyStrings().trimResults();

  final MetricsRegistry registry;
  final String queueName;
  final QueueMetrics parent;
  final MetricsSystem metricsSystem;
  // Per-user metrics keyed by user name; null when user metrics are disabled.
  private final Map<String, QueueMetrics> users;

  /**
   * Creates an untagged, unregistered metrics object.
   *
   * @param ms metrics system used later to register per-user sources;
   *           may be null
   * @param queueName name of the queue these metrics describe
   * @param parent parent queue whose metrics receive rolled-up updates,
   *               or null for none
   * @param enableUserMetrics whether to track per-user metrics
   */
  QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics) {
    registry = new MetricsRegistry(RECORD_INFO);
    this.queueName = queueName;
    this.parent = parent != null ? parent.getMetrics() : null;
    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
                                   : null;
    metricsSystem = ms;
  }

  /**
   * Adds a tag to this object's registry.
   *
   * @param info tag metadata
   * @param value tag value
   * @return this object, to allow chaining
   */
  QueueMetrics tag(MetricsInfo info, String value) {
    registry.tag(info, value);
    return this;
  }

  /**
   * Builds the metrics source name for a queue: the record name followed by
   * one {@code ,q<i>=<component>} entry per component of the dotted queue
   * path.
   *
   * @param queueName dotted queue path
   * @return a builder holding the source name, so callers can append to it
   */
  static StringBuilder sourceName(String queueName) {
    StringBuilder sb = new StringBuilder(RECORD_INFO.name());
    int i = 0;
    for (String node : Q_SPLITTER.split(queueName)) {
      sb.append(",q").append(i++).append('=').append(node);
    }
    return sb;
  }

  /**
   * Creates the metrics for a queue and registers them with the default
   * metrics system.
   *
   * @param queueName dotted queue path
   * @param parent parent queue, or null
   * @param enableUserMetrics whether to track per-user metrics
   * @return the queue's metrics
   */
  public static synchronized QueueMetrics forQueue(String queueName,
      Queue parent, boolean enableUserMetrics) {
    return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
                    enableUserMetrics);
  }

  /**
   * Creates the metrics for a queue, tags them with the queue name and, when
   * a metrics system is supplied, registers them with it.
   *
   * @param ms metrics system to register with; when null the metrics are
   *           returned unregistered
   * @param queueName dotted queue path
   * @param parent parent queue, or null
   * @param enableUserMetrics whether to track per-user metrics
   * @return the queue's metrics
   */
  public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
      Queue parent, boolean enableUserMetrics) {
    QueueMetrics metrics = new QueueMetrics(ms, queueName, parent,
        enableUserMetrics).tag(QUEUE_INFO, queueName);
    return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
        "Metrics for queue: " + queueName, metrics);
  }

  /**
   * Returns the metrics for a user in this queue, creating them on first use.
   *
   * @param userName name of the user
   * @return the user's metrics, or null when user metrics are disabled
   */
  synchronized QueueMetrics getUserMetrics(String userName) {
    if (users == null) {
      return null;
    }
    QueueMetrics metrics = users.get(userName);
    if (metrics == null) {
      // Per-user metrics have no parent and no nested user map, so updates
      // forwarded to them stop there.
      metrics = new QueueMetrics(metricsSystem, queueName, null, false)
          .tag(QUEUE_INFO, queueName).tag(USER_INFO, userName);
      // forQueue accepts a null metrics system, so tolerate one here too.
      if (metricsSystem != null) {
        metricsSystem.register(
            sourceName(queueName).append(",user=").append(userName).toString(),
            "Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
            metrics);
      }
      // Cache only after registration succeeded, so a failed registration
      // does not leave an unregistered object behind for later calls.
      users.put(userName, metrics);
    }
    return metrics;
  }

  /**
   * Records a newly submitted application: one more submitted, one more
   * pending.
   *
   * @param user user who submitted the application
   */
  public void submitApp(String user) {
    appsSubmitted.incr();
    appsPending.incr();
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.submitApp(user);
    }
    if (parent != null) {
      parent.submitApp(user);
    }
  }

  /**
   * Records an application moving from pending to running.
   *
   * @param user user who owns the application
   */
  public void incrAppsRunning(String user) {
    appsRunning.incr();
    appsPending.decr();
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.incrAppsRunning(user);
    }
    if (parent != null) {
      parent.incrAppsRunning(user);
    }
  }

  /**
   * Records a finished application. KILLED and FAILED are counted as such;
   * every other final state is counted as completed. The application is then
   * removed from the pending or running gauge, whichever it was in.
   *
   * @param app scheduling info of the finished application
   * @param rmAppAttemptFinalState final state of the application attempt
   */
  public void finishApp(AppSchedulingInfo app,
      RMAppAttemptState rmAppAttemptFinalState) {
    switch (rmAppAttemptFinalState) {
      case KILLED: appsKilled.incr(); break;
      case FAILED: appsFailed.incr(); break;
      default: appsCompleted.incr(); break;
    }
    if (app.isPending()) {
      appsPending.decr();
    } else {
      appsRunning.decr();
    }
    QueueMetrics userMetrics = getUserMetrics(app.getUser());
    if (userMetrics != null) {
      userMetrics.finishApp(app, rmAppAttemptFinalState);
    }
    if (parent != null) {
      parent.finishApp(app, rmAppAttemptFinalState);
    }
  }

  /**
   * Set available resources. To be called by scheduler periodically as
   * resources become available. Only this object is updated; the value is
   * not propagated to users or the parent.
   *
   * @param limit resource limit
   */
  public void setAvailableResourcesToQueue(Resource limit) {
    availableGB.set(limit.getMemory()/GB);
  }

  /**
   * Set available resources for a user. To be called by scheduler
   * periodically as resources become available. Does nothing when user
   * metrics are disabled.
   *
   * @param user user whose available resources are being set
   * @param limit resource limit
   */
  public void setAvailableResourcesToUser(String user, Resource limit) {
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.setAvailableResourcesToQueue(limit);
    }
  }

  /**
   * Increment pending resource metrics.
   *
   * @param user user the pending request belongs to
   * @param containers number of containers added to the pending count
   * @param res the TOTAL delta of resources; note this is different from
   *            the other APIs, which use per-container resource
   */
  public void incrPendingResources(String user, int containers, Resource res) {
    _incrPendingResources(containers, res);
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.incrPendingResources(user, containers, res);
    }
    if (parent != null) {
      parent.incrPendingResources(user, containers, res);
    }
  }

  /** Applies a pending increase to this object only; res is the total. */
  private void _incrPendingResources(int containers, Resource res) {
    pendingContainers.incr(containers);
    pendingGB.incr(res.getMemory()/GB);
  }

  /**
   * Decrement pending resource metrics.
   *
   * @param user user the pending request belongs to
   * @param containers number of containers removed from the pending count
   * @param res the TOTAL delta of resources, not the per-container resource
   */
  public void decrPendingResources(String user, int containers, Resource res) {
    _decrPendingResources(containers, res);
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.decrPendingResources(user, containers, res);
    }
    if (parent != null) {
      parent.decrPendingResources(user, containers, res);
    }
  }

  /** Applies a pending decrease to this object only; res is the total. */
  private void _decrPendingResources(int containers, Resource res) {
    pendingContainers.decr(containers);
    pendingGB.decr(res.getMemory()/GB);
  }

  /**
   * Whole GiB occupied by a number of equally sized containers. The total is
   * computed before dividing so sub-GiB containers are not each truncated to
   * zero, and in long arithmetic so the product cannot overflow an int.
   */
  private static int totalGB(Resource res, int containers) {
    return (int) ((long) res.getMemory() * containers / GB);
  }

  /**
   * Records an allocation: raises the allocated gauges and lowers the
   * pending gauges by the same containers.
   *
   * @param user user the containers were allocated to
   * @param containers number of containers allocated
   * @param res resource of ONE container
   */
  public void allocateResources(String user, int containers, Resource res) {
    allocatedContainers.incr(containers);
    allocatedGB.incr(totalGB(res, containers));
    _decrPendingResources(containers, multiply(res, containers));
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.allocateResources(user, containers, res);
    }
    if (parent != null) {
      parent.allocateResources(user, containers, res);
    }
  }

  /**
   * Records released containers by lowering the allocated gauges.
   *
   * @param user user the containers belonged to
   * @param containers number of containers released
   * @param res resource of ONE container
   */
  public void releaseResources(String user, int containers, Resource res) {
    allocatedContainers.decr(containers);
    allocatedGB.decr(totalGB(res, containers));
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.releaseResources(user, containers, res);
    }
    if (parent != null) {
      parent.releaseResources(user, containers, res);
    }
  }

  /**
   * Records the reservation of one container.
   *
   * @param user user the reservation is for
   * @param res resource of the reserved container
   */
  public void reserveResource(String user, Resource res) {
    reservedContainers.incr();
    reservedGB.incr(res.getMemory()/GB);
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.reserveResource(user, res);
    }
    if (parent != null) {
      parent.reserveResource(user, res);
    }
  }

  /**
   * Records the cancellation of one container reservation.
   *
   * @param user user the reservation was for
   * @param res resource of the previously reserved container
   */
  public void unreserveResource(String user, Resource res) {
    reservedContainers.decr();
    reservedGB.decr(res.getMemory()/GB);
    QueueMetrics userMetrics = getUserMetrics(user);
    if (userMetrics != null) {
      userMetrics.unreserveResource(user, res);
    }
    if (parent != null) {
      parent.unreserveResource(user, res);
    }
  }
}