blob: 9863d7b7bae2a257e6c74c1e421afe6b61be251a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import org.apache.hadoop.metrics2.MetricsBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.source.JvmMetricsSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
/**
 * Metrics source that publishes counters and gauges for a single queue.
 *
 * <p>All metrics are registered in a {@link MetricsRegistry} named
 * {@code "Queue"}, placed in the {@code "mapred"} context and tagged with the
 * session id and the queue name. Besides the plain job/task counters, the
 * class keeps one gauge per configured running-time bucket
 * ({@code running_0}, {@code running_<minutes>}, ...); those gauges are
 * refreshed from {@link TimeBucketMetrics} every time a snapshot is taken.
 */
@SuppressWarnings("deprecation")
class QueueMetrics implements MetricsSource {

  private static final Log LOG =
    LogFactory.getLog(QueueMetrics.class);

  /**
   * Configuration key holding a comma-separated list of bucket boundaries,
   * expressed in minutes.
   */
  public static final String BUCKET_PROPERTY =
    "mapred.queue.metrics.runtime.buckets";

  /** Bucket boundaries (in minutes) used when the property is not set. */
  private static final String DEFAULT_BUCKETS = "60,300,1440";

  // NOTE: the declaration order below is the registration order in the
  // registry; keep it stable.
  final MetricsRegistry registry = new MetricsRegistry("Queue");
  final MetricMutableCounterInt mapsLaunched =
    registry.newCounter("maps_launched", "", 0);
  final MetricMutableCounterInt mapsCompleted =
    registry.newCounter("maps_completed", "", 0);
  final MetricMutableCounterInt mapsFailed =
    registry.newCounter("maps_failed", "", 0);
  final MetricMutableCounterInt redsLaunched =
    registry.newCounter("reduces_launched", "", 0);
  final MetricMutableCounterInt redsCompleted =
    registry.newCounter("reduces_completed", "", 0);
  final MetricMutableCounterInt redsFailed =
    registry.newCounter("reduces_failed", "", 0);
  final MetricMutableCounterInt jobsSubmitted =
    registry.newCounter("jobs_submitted", "", 0);
  final MetricMutableCounterInt jobsCompleted =
    registry.newCounter("jobs_completed", "", 0);
  final MetricMutableGaugeInt waitingMaps =
    registry.newGauge("waiting_maps", "", 0);
  final MetricMutableGaugeInt waitingReds =
    registry.newGauge("waiting_reduces", "", 0);
  final MetricMutableGaugeInt reservedMapSlots =
    registry.newGauge("reserved_map_slots", "", 0);
  final MetricMutableGaugeInt reservedRedSlots =
    registry.newGauge("reserved_reduce_slots", "", 0);
  final MetricMutableCounterInt jobsFailed =
    registry.newCounter("jobs_failed", "", 0);
  final MetricMutableCounterInt jobsKilled =
    registry.newCounter("jobs_killed", "", 0);
  final MetricMutableGaugeInt jobsPreparing =
    registry.newGauge("jobs_preparing", "", 0);
  final MetricMutableGaugeInt jobsRunning =
    registry.newGauge("jobs_running", "", 0);
  final MetricMutableCounterInt mapsKilled =
    registry.newCounter("maps_killed", "", 0);
  final MetricMutableCounterInt redsKilled =
    registry.newCounter("reduces_killed", "", 0);

  /** One gauge per running-time bucket; index 0 is {@code running_0}. */
  final MetricMutableGaugeInt[] runningTime;

  /** Tracks the jobs added via {@link #addRunningJob} by their start time. */
  TimeBucketMetrics<JobID> runBuckets;

  /** Value of the {@code session.id} configuration key ("" when unset). */
  final String sessionId;

  private final String queueName;

  /**
   * Creates the metrics for one queue and registers the running-time gauges.
   *
   * @param queueName name of the queue, used as the value of the
   *                  {@code Queue} tag
   * @param conf      configuration supplying {@code session.id} and
   *                  {@link #BUCKET_PROPERTY}
   * @throws NumberFormatException if the bucket property contains an entry
   *                               that is not an integer
   */
  public QueueMetrics(String queueName, Configuration conf) {
    this.queueName = queueName;
    sessionId = conf.get("session.id", "");
    registry.setContext("mapred").tag("sessionId", "", sessionId);
    registry.tag("Queue", "Metrics by queue", queueName);
    runningTime = buildBuckets(conf);
  }

  /**
   * @return the queue name given at construction time
   */
  public String getQueueName() {
    return this.queueName;
  }

  /**
   * Parses a comma-separated list of integers; surrounding whitespace of
   * each entry is ignored.
   *
   * @param value the comma-separated list
   * @return the parsed values in their original order
   * @throws NumberFormatException if an entry is not a valid integer
   */
  private static ArrayList<Integer> parseInts(String value) {
    ArrayList<Integer> result = new ArrayList<Integer>();
    for (String word : value.split(",")) {
      result.add(Integer.parseInt(word.trim()));
    }
    return result;
  }

  /**
   * Registers one gauge per running-time bucket and initialises
   * {@link #runBuckets} with the matching cut points.
   *
   * @param conf configuration supplying {@link #BUCKET_PROPERTY}
   * @return the gauges; element 0 is {@code running_0}, element {@code i + 1}
   *         is {@code running_<bucket i>}
   */
  private MetricMutableGaugeInt[] buildBuckets(Configuration conf) {
    ArrayList<Integer> buckets =
      parseInts(conf.get(BUCKET_PROPERTY, DEFAULT_BUCKETS));
    MetricMutableGaugeInt[] result =
      new MetricMutableGaugeInt[buckets.size() + 1];
    result[0] = registry.newGauge("running_0", "", 0);
    long[] cuts = new long[buckets.size()];
    for (int i = 0; i < buckets.size(); ++i) {
      int minutes = buckets.get(i);
      result[i + 1] = registry.newGauge("running_" + minutes, "", 0);
      // Convert from minutes to milliseconds. The long literals force 64-bit
      // arithmetic; with int operands any value above 35791 minutes would
      // overflow before being widened to long.
      cuts[i] = minutes * 60L * 1000L;
    }
    this.runBuckets = new TimeBucketMetrics<JobID>(cuts);
    return result;
  }

  /**
   * Copies the current bucket counts into the running-time gauges.
   * Assumes getBucketCounts returns at most runningTime.length entries
   * (one per cut plus one) -- TODO confirm against TimeBucketMetrics.
   */
  private void updateRunningTime() {
    int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
    for (int i = 0; i < counts.length; ++i) {
      runningTime[i].set(counts[i]);
    }
  }

  /**
   * Refreshes the running-time gauges and snapshots the whole registry into
   * a new record named after the registry.
   *
   * @param builder receives the record
   * @param all     passed through to the registry snapshot
   */
  public void getMetrics(MetricsBuilder builder, boolean all) {
    updateRunningTime();
    registry.snapshot(builder.addRecord(registry.name()), all);
  }

  /**
   * Counts a launched map attempt; a non-speculative launch also removes
   * one map from the waiting gauge.
   */
  public void launchMap(TaskAttemptID taskAttemptID, boolean speculative) {
    mapsLaunched.incr();
    if (!speculative) {
      decWaitingMaps(taskAttemptID.getJobID(), 1);
    }
  }

  /** Counts a completed map attempt. */
  public void completeMap(TaskAttemptID taskAttemptID) {
    mapsCompleted.incr();
  }

  /**
   * Counts a failed map attempt and, when {@code incWaiting} is set, puts
   * one map back on the waiting gauge.
   */
  public void failedMap(TaskAttemptID taskAttemptID, boolean incWaiting) {
    mapsFailed.incr();
    if (incWaiting) {
      addWaitingMaps(taskAttemptID.getJobID(), 1);
    }
  }

  /**
   * Counts a launched reduce attempt; a non-speculative launch also removes
   * one reduce from the waiting gauge.
   */
  public void launchReduce(TaskAttemptID taskAttemptID, boolean speculative) {
    redsLaunched.incr();
    if (!speculative) {
      decWaitingReduces(taskAttemptID.getJobID(), 1);
    }
  }

  /** Counts a completed reduce attempt. */
  public void completeReduce(TaskAttemptID taskAttemptID) {
    redsCompleted.incr();
  }

  /**
   * Counts a failed reduce attempt and, when {@code incWaiting} is set, puts
   * one reduce back on the waiting gauge.
   */
  public void failedReduce(TaskAttemptID taskAttemptID, boolean incWaiting) {
    redsFailed.incr();
    if (incWaiting) {
      addWaitingReduces(taskAttemptID.getJobID(), 1);
    }
  }

  /** Counts a submitted job; the arguments are not used here. */
  public void submitJob(JobConf conf, JobID id) {
    jobsSubmitted.incr();
  }

  /** Counts a completed job; the arguments are not used here. */
  public void completeJob(JobConf conf, JobID id) {
    jobsCompleted.incr();
  }

  /** Raises the waiting-maps gauge by {@code task}; {@code id} is unused. */
  public void addWaitingMaps(JobID id, int task) {
    waitingMaps.incr(task);
  }

  /** Lowers the waiting-maps gauge by {@code task}; {@code id} is unused. */
  public void decWaitingMaps(JobID id, int task) {
    waitingMaps.decr(task);
  }

  /** Raises the waiting-reduces gauge by {@code task}; {@code id} is unused. */
  public void addWaitingReduces(JobID id, int task) {
    waitingReds.incr(task);
  }

  /** Lowers the waiting-reduces gauge by {@code task}; {@code id} is unused. */
  public void decWaitingReduces(JobID id, int task) {
    waitingReds.decr(task);
  }

  /** Raises the reserved-map-slots gauge by {@code slots}. */
  public void addReservedMapSlots(int slots) {
    reservedMapSlots.incr(slots);
  }

  /** Lowers the reserved-map-slots gauge by {@code slots}. */
  public void decReservedMapSlots(int slots) {
    reservedMapSlots.decr(slots);
  }

  /** Raises the reserved-reduce-slots gauge by {@code slots}. */
  public void addReservedReduceSlots(int slots) {
    reservedRedSlots.incr(slots);
  }

  /** Lowers the reserved-reduce-slots gauge by {@code slots}. */
  public void decReservedReduceSlots(int slots) {
    reservedRedSlots.decr(slots);
  }

  /** Counts a failed job; the arguments are not used here. */
  public void failedJob(JobConf conf, JobID id) {
    jobsFailed.incr();
  }

  /** Counts a killed job; the arguments are not used here. */
  public void killedJob(JobConf conf, JobID id) {
    jobsKilled.incr();
  }

  /** Raises the preparing-jobs gauge by one; the arguments are not used. */
  public void addPrepJob(JobConf conf, JobID id) {
    jobsPreparing.incr();
  }

  /** Lowers the preparing-jobs gauge by one; the arguments are not used. */
  public void decPrepJob(JobConf conf, JobID id) {
    jobsPreparing.decr();
  }

  /**
   * Raises the running-jobs gauge and records the job in the running-time
   * buckets with the current wall-clock time.
   */
  public void addRunningJob(JobConf conf, JobID id) {
    jobsRunning.incr();
    runBuckets.add(id, System.currentTimeMillis());
  }

  /**
   * Lowers the running-jobs gauge and removes the job from the running-time
   * buckets.
   */
  public void decRunningJob(JobConf conf, JobID id) {
    jobsRunning.decr();
    runBuckets.remove(id);
  }

  /** Counts a killed map attempt. */
  public void killedMap(TaskAttemptID taskAttemptID) {
    mapsKilled.incr();
  }

  /** Counts a killed reduce attempt. */
  public void killedReduce(TaskAttemptID taskAttemptID) {
    redsKilled.incr();
  }

  /**
   * Creates the metrics for a queue and registers them with
   * {@link DefaultMetricsSystem#INSTANCE}.
   *
   * @param queueName name of the queue
   * @param conf      configuration passed to the constructor
   * @return the value returned by the metrics system's {@code register}
   */
  static QueueMetrics create(String queueName, Configuration conf) {
    return create(queueName, conf, DefaultMetricsSystem.INSTANCE);
  }

  /**
   * Creates the metrics for a queue and registers them with the given
   * metrics system under the name {@code "QueueMetrics,q=" + queueName}.
   *
   * @param queueName name of the queue
   * @param conf      configuration passed to the constructor
   * @param ms        metrics system to register with
   * @return the value returned by {@code ms.register}
   */
  static QueueMetrics create(String queueName, Configuration conf,
                             MetricsSystem ms) {
    return ms.register("QueueMetrics,q=" + queueName, "Queue metrics",
                       new QueueMetrics(queueName, conf));
  }
}