blob: 1e42e7a01d9583ed4912ef3054770c8d4a1ac583 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.metrics2.lib.Interns.info;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Splitter;
@InterfaceAudience.Private
@Metrics(context="yarn")
public class QueueMetrics implements MetricsSource {
@Metric("# of apps submitted") MutableCounterInt appsSubmitted;
@Metric("# of running apps") MutableGaugeInt appsRunning;
@Metric("# of pending apps") MutableGaugeInt appsPending;
@Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Aggregate # of allocated node-local containers")
MutableCounterLong aggregateNodeLocalContainersAllocated;
@Metric("Aggregate # of allocated rack-local containers")
MutableCounterLong aggregateRackLocalContainersAllocated;
@Metric("Aggregate # of allocated off-switch containers")
MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of preempted containers") MutableCounterLong
aggregateContainersPreempted;
@Metric("Aggregate # of preempted memory seconds") MutableCounterLong
aggregateMemoryMBSecondsPreempted;
@Metric("Aggregate # of preempted vcore seconds") MutableCounterLong
aggregateVcoreSecondsPreempted;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@Metric("App Attempt First Container Allocation Delay")
MutableRate appAttemptFirstContainerAllocationDelay;
@Metric("Aggregate total of preempted memory MB")
MutableCounterLong aggregateMemoryMBPreempted;
@Metric("Aggregate total of preempted vcores")
MutableCounterLong aggregateVcoresPreempted;
//Metrics updated only for "default" partition
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers")
MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of released containers")
MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores")
MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
private final MutableGaugeInt[] runningTime;
private TimeBucketMetrics<ApplicationId> runBuckets;
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
"Metrics for the resource scheduler");
protected static final MetricsInfo QUEUE_INFO =
info("Queue", "Metrics by queue");
protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user");
protected static final MetricsInfo PARTITION_INFO =
info("Partition", "Metrics by partition");
static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults();
protected final MetricsRegistry registry;
protected final String queueName;
private QueueMetrics parent;
private final Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
private final boolean enableUserMetrics;
protected static final MetricsInfo P_RECORD_INFO =
info("PartitionQueueMetrics", "Metrics for the resource scheduler");
// Use "default" to operate NO_LABEL (default) partition internally
public static final String DEFAULT_PARTITION = "default";
// Use "" to register NO_LABEL (default) partition into metrics system
public static final String DEFAULT_PARTITION_JMX_STR = "";
// Metric Name Delimiter
public static final String METRIC_NAME_DELIMITER = ".";
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
"AllocatedResource.";
private static final String ALLOCATED_RESOURCE_METRIC_DESC =
"Allocated NAME";
private static final String AVAILABLE_RESOURCE_METRIC_PREFIX =
"AvailableResource.";
private static final String AVAILABLE_RESOURCE_METRIC_DESC =
"Available NAME";
private static final String PENDING_RESOURCE_METRIC_PREFIX =
"PendingResource.";
private static final String PENDING_RESOURCE_METRIC_DESC =
"Pending NAME";
private static final String RESERVED_RESOURCE_METRIC_PREFIX =
"ReservedResource.";
private static final String RESERVED_RESOURCE_METRIC_DESC =
"Reserved NAME";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX =
"AggregatePreemptedSeconds.";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
this.queueName = queueName;
this.parent = parent != null ? parent.getMetrics() : null;
this.parentQueue = parent;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
this.enableUserMetrics = enableUserMetrics;
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
createQueueMetricsForCustomResources();
}
protected QueueMetrics tag(MetricsInfo info, String value) {
registry.tag(info, value);
return this;
}
protected static StringBuilder sourceName(String queueName) {
StringBuilder sb = new StringBuilder(RECORD_INFO.name());
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
sb.append(",q").append(i++).append('=').append(node);
}
return sb;
}
static StringBuilder pSourceName(String partition) {
StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
sb.append(",partition").append('=').append(partition);
return sb;
}
static StringBuilder qSourceName(String queueName) {
StringBuilder sb = new StringBuilder();
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
sb.append(",q").append(i++).append('=').append(node);
}
return sb;
}
public synchronized static QueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
enableUserMetrics, conf);
}
/**
* Helper method to clear cache.
*/
@Private
public synchronized static void clearQueueMetrics() {
QUEUE_METRICS.clear();
}
/**
* Simple metrics cache to help prevent re-registrations.
*/
private static final Map<String, QueueMetrics> QUEUE_METRICS =
new HashMap<String, QueueMetrics>();
/**
* Returns the metrics cache to help prevent re-registrations.
*
* @return A string to {@link QueueMetrics} map.
*/
public static Map<String, QueueMetrics> getQueueMetrics() {
return QUEUE_METRICS;
}
public synchronized static QueueMetrics forQueue(MetricsSystem ms,
String queueName, Queue parent, boolean enableUserMetrics,
Configuration conf) {
QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
.tag(QUEUE_INFO, queueName);
// Register with the MetricsSystems
if (ms != null) {
metrics = ms.register(sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
}
getQueueMetrics().put(queueName, metrics);
}
return metrics;
}
public synchronized QueueMetrics getUserMetrics(String userName) {
if (users == null) {
return null;
}
QueueMetrics metrics = users.get(userName);
if (metrics == null) {
metrics =
new QueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
"Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName));
}
return metrics;
}
/**
* Partition * Queue Metrics
*
* Computes Metrics at Partition (Node Label) * Queue Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* QueueMetrics (A)
* metrics
* QueueMetrics (A1)
* metrics
* QueueMetrics (A2)
* metrics
* QueueMetrics (B)
* metrics
*
* @param partition
* @return QueueMetrics
*/
public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
QueueMetrics queueMetrics =
new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
this.enableUserMetrics, this.conf, partition);
metricsSystem.register(
pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
.toString(),
"Metrics for queue: " + this.queueName,
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
this.queueName));
getQueueMetrics().put(metricName, queueMetrics);
return queueMetrics;
} else {
return metrics;
}
}
/**
* Partition Metrics
*
* Computes Metrics at Partition (Node Label) Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* metrics
*
* @param partition
* @return QueueMetrics
*/
private QueueMetrics getPartitionMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
false, this.conf, partition);
// Register with the MetricsSystems
if (metricsSystem != null) {
metricsSystem.register(pSourceName(partitionJMXStr).toString(),
"Metrics for partition: " + partitionJMXStr,
(PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
partitionJMXStr));
}
getQueueMetrics().put(metricName, metrics);
}
return metrics;
}
private ArrayList<Integer> parseInts(String value) {
ArrayList<Integer> result = new ArrayList<Integer>();
for(String s: value.split(",")) {
result.add(Integer.parseInt(s.trim()));
}
return result;
}
private MutableGaugeInt[] buildBuckets(Configuration conf) {
ArrayList<Integer> buckets =
parseInts(conf.get(YarnConfiguration.RM_METRICS_RUNTIME_BUCKETS,
YarnConfiguration.DEFAULT_RM_METRICS_RUNTIME_BUCKETS));
MutableGaugeInt[] result = new MutableGaugeInt[buckets.size() + 1];
result[0] = registry.newGauge("running_0", "", 0);
long[] cuts = new long[buckets.size()];
for(int i=0; i < buckets.size(); ++i) {
result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
cuts[i] = buckets.get(i) * 1000L * 60; // covert from min to ms
}
this.runBuckets = new TimeBucketMetrics<ApplicationId>(cuts);
return result;
}
private void updateRunningTime() {
int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
for(int i=0; i < counts.length; ++i) {
runningTime[i].set(counts[i]);
}
}
public void getMetrics(MetricsCollector collector, boolean all) {
updateRunningTime();
registry.snapshot(collector.addRecord(registry.info()), all);
}
public void submitApp(String user) {
appsSubmitted.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitApp(user);
}
if (parent != null) {
parent.submitApp(user);
}
}
public void submitAppAttempt(String user) {
appsPending.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitAppAttempt(user);
}
if (parent != null) {
parent.submitAppAttempt(user);
}
}
public void runAppAttempt(ApplicationId appId, String user) {
runBuckets.add(appId, System.currentTimeMillis());
appsRunning.incr();
appsPending.decr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.runAppAttempt(appId, user);
}
if (parent != null) {
parent.runAppAttempt(appId, user);
}
}
public void finishAppAttempt(
ApplicationId appId, boolean isPending, String user) {
runBuckets.remove(appId);
if (isPending) {
appsPending.decr();
} else {
appsRunning.decr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishAppAttempt(appId, isPending, user);
}
if (parent != null) {
parent.finishAppAttempt(appId, isPending, user);
}
}
public void finishApp(String user, RMAppState rmAppFinalState) {
switch (rmAppFinalState) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishApp(user, rmAppFinalState);
}
if (parent != null) {
parent.finishApp(user, rmAppFinalState);
}
}
public void moveAppFrom(AppSchedulingInfo app) {
if (app.isPending()) {
appsPending.decr();
} else {
appsRunning.decr();
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppFrom(app);
}
if (parent != null) {
parent.moveAppFrom(app);
}
}
public void moveAppTo(AppSchedulingInfo app) {
if (app.isPending()) {
appsPending.incr();
} else {
appsRunning.incr();
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppTo(app);
}
if (parent != null) {
parent.moveAppTo(app);
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
* @param partition Node Partition
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
setAvailableResources(limit);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.setAvailableResources(limit);
if(this.queueName.equals("root")) {
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.setAvailableResources(limit);
}
}
}
}
/**
* Set Available resources with support for resource vectors.
*
* @param limit
*/
public void setAvailableResources(Resource limit) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAvailableValues(), registry,
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
*
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
this.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, limit);
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
*
* @param partition Node Partition
* @param user
* @param limit resource limit
*/
public void setAvailableResourcesToUser(String partition, String user,
Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAvailableResources(limit);
}
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
QueueMetrics partitionUserMetrics =
partitionQueueMetrics.getUserMetrics(user);
if (partitionUserMetrics != null) {
partitionUserMetrics.setAvailableResources(limit);
}
}
}
/**
* Increment pending resource metrics
*
* @param partition Node Partition
* @param user
* @param containers
* @param res the TOTAL delta of resources note this is different from the
* other APIs which use per container resource
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalIncrPendingResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalIncrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrementPendingResources(containers, res);
}
}
}
public void internalIncrPendingResources(String partition, String user,
int containers, Resource res) {
incrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalIncrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.internalIncrPendingResources(partition, user, containers, res);
}
}
protected void createQueueMetricsForCustomResources() {
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
this.queueMetricsForCustomResources =
new QueueMetricsForCustomResources();
registerCustomResources();
}
}
protected void registerCustomResources() {
Map<String, Long> customResources =
queueMetricsForCustomResources.initAndGetCustomResources();
queueMetricsForCustomResources
.registerCustomResources(customResources, this.registry);
queueMetricsForCustomResources
.registerCustomResources(customResources, this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
queueMetricsForCustomResources
.registerCustomResources(customResources, this.registry,
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
queueMetricsForCustomResources
.registerCustomResources(customResources, this.registry,
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
}
private void incrementPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalDecrPendingResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalDecrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrementPendingResources(containers, res);
}
}
}
protected void internalDecrPendingResources(String partition, String user,
int containers, Resource res) {
decrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalDecrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.internalDecrPendingResources(partition, user, containers, res);
}
}
private void decrementPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res, containers);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
public void incrNodeTypeAggregations(String user, NodeType type) {
if (type == NodeType.NODE_LOCAL) {
aggregateNodeLocalContainersAllocated.incr();
} else if (type == NodeType.RACK_LOCAL) {
aggregateRackLocalContainersAllocated.incr();
} else if (type == NodeType.OFF_SWITCH) {
aggregateOffSwitchContainersAllocated.incr();
} else {
return;
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incrNodeTypeAggregations(user, type);
}
if (parent != null) {
parent.incrNodeTypeAggregations(user, type);
}
}
public void allocateResources(String partition, String user, int containers,
Resource res, boolean decrPending) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalAllocateResources(partition, user, containers, res, decrPending);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalAllocateResources(partition, user,
containers, res, decrPending);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeAllocateResources(containers, res, decrPending);
}
}
}
public void internalAllocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
computeAllocateResources(containers, res, decrPending);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalAllocateResources(partition, user, containers, res,
decrPending);
}
if (parent != null) {
parent.internalAllocateResources(partition, user, containers, res,
decrPending);
}
}
/**
* Allocate Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
* @param decrPending decides whether to decrease pending resource or not
*/
private void computeAllocateResources(int containers, Resource res,
boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
if (decrPending) {
decrementPendingResources(containers, res);
}
}
/**
* Allocate Resource for container size change.
* @param partition Node Partition
* @param user
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getPendingValues(), this.registry,
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(partition, user, res);
}
if (parent != null) {
parent.allocateResources(partition, user, res);
}
}
public void releaseResources(String partition, String user, int containers,
Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalReleaseResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReleaseResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeReleaseResources(containers, res);
}
}
}
public void internalReleaseResources(String partition, String user,
int containers, Resource res) {
computeReleaseResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReleaseResources(partition, user, containers, res);
}
if (parent != null) {
parent.internalReleaseResources(partition, user, containers, res);
}
}
/**
* Release Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
*/
private void computeReleaseResources(int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(), this.registry,
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
}
public void preemptContainer() {
aggregateContainersPreempted.incr();
if (parent != null) {
parent.preemptContainer();
}
}
public void updatePreemptedMemoryMBSeconds(long mbSeconds) {
aggregateMemoryMBSecondsPreempted.incr(mbSeconds);
if (parent != null) {
parent.updatePreemptedMemoryMBSeconds(mbSeconds);
}
}
public void updatePreemptedVcoreSeconds(long vcoreSeconds) {
aggregateVcoreSecondsPreempted.incr(vcoreSeconds);
if (parent != null) {
parent.updatePreemptedVcoreSeconds(vcoreSeconds);
}
}
public void updatePreemptedResources(Resource res) {
aggregateMemoryMBPreempted.incr(res.getMemorySize());
aggregateVcoresPreempted.incr(res.getVirtualCores());
if (parent != null) {
parent.updatePreemptedResources(res);
}
}
public void updatePreemptedForCustomResources(Resource res) {
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAggregatedPreempted(res);
}
if (parent != null) {
parent.updatePreemptedForCustomResources(res);
}
}
public void updatePreemptedSecondsForCustomResources(Resource res,
long seconds) {
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources
.increaseAggregatedPreemptedSeconds(res, seconds);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getAggregatePreemptedSeconds()
.getValues(), this.registry,
AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX,
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
}
if (parent != null) {
parent.updatePreemptedSecondsForCustomResources(res, seconds);
}
}
public void reserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalReserveResources(partition, user, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrReserveResources(res);
}
}
}
protected void internalReserveResources(String partition, String user,
Resource res) {
incrReserveResources(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalReserveResources(partition, user, res);
}
}
public void incrReserveResources(Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseReserved(res);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getReservedValues(), this.registry,
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
}
public void unreserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalUnReserveResources(partition, user, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalUnReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrReserveResource(res);
}
}
}
protected void internalUnReserveResources(String partition, String user,
Resource res) {
decrReserveResource(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalUnReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalUnReserveResources(partition, user, res);
}
}
public void decrReserveResource(Resource res) {
int containers = 1;
reservedContainers.decr(containers);
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseReserved(res);
queueMetricsForCustomResources.registerCustomResources(
queueMetricsForCustomResources.getReservedValues(), this.registry,
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
}
public void incrActiveUsers() {
activeUsers.incr();
}
public void decrActiveUsers() {
activeUsers.decr();
}
public void activateApp(String user) {
activeApplications.incr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.activateApp(user);
}
if (parent != null) {
parent.activateApp(user);
}
}
public void deactivateApp(String user) {
activeApplications.decr();
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.deactivateApp(user);
}
if (parent != null) {
parent.deactivateApp(user);
}
}
public void addAppAttemptFirstContainerAllocationDelay(long latency) {
appAttemptFirstContainerAllocationDelay.add(latency);
}
public int getAppsSubmitted() {
return appsSubmitted.value();
}
public int getAppsRunning() {
return appsRunning.value();
}
public int getAppsPending() {
return appsPending.value();
}
public int getAppsCompleted() {
return appsCompleted.value();
}
public int getAppsKilled() {
return appsKilled.value();
}
public int getAppsFailed() {
return appsFailed.value();
}
public Resource getAllocatedResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
queueMetricsForCustomResources.getAllocatedValues());
}
return Resource.newInstance(allocatedMB.value(),
allocatedVCores.value());
}
public Resource getAvailableResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(availableMB.value(), availableVCores.value(),
queueMetricsForCustomResources.getAvailableValues());
}
return Resource.newInstance(availableMB.value(), availableVCores.value());
}
public Resource getPendingResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(pendingMB.value(), pendingVCores.value(),
queueMetricsForCustomResources.getPendingValues());
}
return Resource.newInstance(pendingMB.value(), pendingVCores.value());
}
public Resource getReservedResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(reservedMB.value(), reservedVCores.value(),
queueMetricsForCustomResources.getReservedValues());
}
return Resource.newInstance(reservedMB.value(), reservedVCores.value());
}
/**
* Handle this specially as this has a long value and it could be
* truncated when casted into an int parameter of
* Resource.newInstance (vCores).
* @return QueueMetricsCustomResource
*/
@VisibleForTesting
public CustomResourceMetricValue getAggregatedPreemptedSecondsResources() {
return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
}
@VisibleForTesting
public MutableCounterLong getAggregateMemoryMBSecondsPreempted() {
return aggregateMemoryMBSecondsPreempted;
}
@VisibleForTesting
public MutableCounterLong getAggregateVcoreSecondsPreempted() {
return aggregateVcoreSecondsPreempted;
}
@VisibleForTesting
public long getAggregateMemoryMBPreempted() {
return aggregateMemoryMBPreempted.value();
}
@VisibleForTesting
public long getAggregateVcoresPreempted() {
return aggregateVcoresPreempted.value();
}
public long getAllocatedMB() {
return allocatedMB.value();
}
public int getAllocatedVirtualCores() {
return allocatedVCores.value();
}
public int getAllocatedContainers() {
return allocatedContainers.value();
}
public long getAvailableMB() {
return availableMB.value();
}
public int getAvailableVirtualCores() {
return availableVCores.value();
}
public long getPendingMB() {
return pendingMB.value();
}
public int getPendingVirtualCores() {
return pendingVCores.value();
}
public int getPendingContainers() {
return pendingContainers.value();
}
public long getReservedMB() {
return reservedMB.value();
}
public int getReservedVirtualCores() {
return reservedVCores.value();
}
public int getReservedContainers() {
return reservedContainers.value();
}
public int getActiveUsers() {
return activeUsers.value();
}
public int getActiveApps() {
return activeApplications.value();
}
public MetricsSystem getMetricsSystem() {
return metricsSystem;
}
public long getAggregateAllocatedContainers() {
return aggregateContainersAllocated.value();
}
public long getAggregateNodeLocalContainersAllocated() {
return aggregateNodeLocalContainersAllocated.value();
}
public long getAggregateRackLocalContainersAllocated() {
return aggregateRackLocalContainersAllocated.value();
}
public long getAggregateOffSwitchContainersAllocated() {
return aggregateOffSwitchContainersAllocated.value();
}
public long getAggegatedReleasedContainers() {
return aggregateContainersReleased.value();
}
public long getAggregatePreemptedContainers() {
return aggregateContainersPreempted.value();
}
/**
* Fills in Resource values from available metrics values of custom resources
* to @code{targetResource}, only if the corresponding
* value of @code{targetResource} is zero.
* If @code{fromResource} has a value less than the available metrics value
* for a particular resource, it will be set to the @code{targetResource}
* instead.
*
* @param fromResource The resource to compare available resource values with.
* @param targetResource The resource to save the values into.
*/
public void fillInValuesFromAvailableResources(Resource fromResource,
Resource targetResource) {
if (queueMetricsForCustomResources != null) {
CustomResourceMetricValue availableResources =
queueMetricsForCustomResources.getAvailable();
// We expect all custom resources contained in availableResources,
// so we will loop through all of them.
for (Map.Entry<String, Long> availableEntry : availableResources
.getValues().entrySet()) {
String resourceName = availableEntry.getKey();
// We only update the value if fairshare is 0 for that resource.
if (targetResource.getResourceValue(resourceName) == 0) {
Long availableValue = availableEntry.getValue();
long value = Math.min(availableValue,
fromResource.getResourceValue(resourceName));
targetResource.setResourceValue(resourceName, value);
}
}
}
}
@VisibleForTesting
public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
return this.queueMetricsForCustomResources;
}
protected void setQueueMetricsForCustomResources(
QueueMetricsForCustomResources metrics) {
this.queueMetricsForCustomResources = metrics;
}
public void setParent(QueueMetrics parent) {
this.parent = parent;
}
public Queue getParentQueue() {
return parentQueue;
}
}