blob: 7102467a1066f0a18567fdb8b205a7dc950a2f4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.counters;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.*;
/**
* An abstract class to provide common implementation for the Counters
* container in both mapred and mapreduce packages.
*
* @param <C> type of counter inside the counters
* @param <G> type of group inside the counters
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class AbstractCounters<C extends Counter,
G extends CounterGroupBase<C>>
implements Writable, Iterable<G> {
protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
/**
* A cache from enum values to the associated counter.
*/
private Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
private Map<String, G> fgroups = Maps.newTreeMap(); // framework & fs groups
private Map<String, G> groups = Maps.newTreeMap(); // other groups
private final CounterGroupFactory<C, G> groupFactory;
// For framework counter serialization without strings
enum GroupType { FRAMEWORK, FILESYSTEM };
// Writes only framework and fs counters if false.
private boolean writeAllCounters = true;
private static final Map<String, String> legacyMap = Maps.newHashMap();
static {
legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
TaskCounter.class.getName());
legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
JobCounter.class.getName());
}
private final Limits limits = new Limits();
@InterfaceAudience.Private
public AbstractCounters(CounterGroupFactory<C, G> gf) {
groupFactory = gf;
}
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
/** Add a group.
* @param group object to add
* @return the group
*/
@InterfaceAudience.Private
public synchronized G addGroup(G group) {
String name = group.getName();
if (isFrameworkGroup(name)) {
fgroups.put(name, group);
} else {
limits.checkGroups(groups.size() + 1);
groups.put(name, group);
}
return group;
}
/**
* Add a new group
* @param name of the group
* @param displayName of the group
* @return the group
*/
@InterfaceAudience.Private
public G addGroup(String name, String displayName) {
return addGroup(groupFactory.newGroup(name, displayName, limits));
}
/**
* Find a counter, create one if necessary
* @param groupName of the counter
* @param counterName name of the counter
* @return the matching counter
*/
public C findCounter(String groupName, String counterName) {
G grp = getGroup(groupName);
return grp.findCounter(counterName);
}
/**
* Find the counter for the given enum. The same enum will always return the
* same counter.
* @param key the counter key
* @return the matching counter object
*/
public synchronized C findCounter(Enum<?> key) {
C counter = cache.get(key);
if (counter == null) {
counter = findCounter(key.getDeclaringClass().getName(), key.name());
cache.put(key, counter);
}
return counter;
}
/**
* Find the file system counter for the given scheme and enum.
* @param scheme of the file system
* @param key the enum of the counter
* @return the file system counter
*/
@InterfaceAudience.Private
public synchronized C findCounter(String scheme, FileSystemCounter key) {
return ((FileSystemCounterGroup<C>) getGroup(
FileSystemCounter.class.getName())).findCounter(scheme, key);
}
/**
* Returns the names of all counter classes.
* @return Set of counter names.
*/
public synchronized Iterable<String> getGroupNames() {
return Iterables.concat(fgroups.keySet(), groups.keySet());
}
@Override
public Iterator<G> iterator() {
return Iterators.concat(fgroups.values().iterator(),
groups.values().iterator());
}
/**
* Returns the named counter group, or an empty group if there is none
* with the specified name.
* @param groupName name of the group
* @return the group
*/
public synchronized G getGroup(String groupName) {
boolean isFGroup = isFrameworkGroup(groupName);
G group = isFGroup ? fgroups.get(groupName) : groups.get(groupName);
if (group == null) {
group = groupFactory.newGroup(filterGroupName(groupName), limits);
if (isFGroup) {
fgroups.put(groupName, group);
} else {
limits.checkGroups(groups.size() + 1);
groups.put(groupName, group);
}
}
return group;
}
private String filterGroupName(String oldName) {
String newName = legacyMap.get(oldName);
if (newName == null) {
return limits.filterGroupName(oldName);
}
LOG.warn("Group "+ oldName +" is deprecated. Use "+ newName +" instead");
return newName;
}
/**
* Returns the total number of counters, by summing the number of counters
* in each group.
* @return the total number of counters
*/
public synchronized int countCounters() {
int result = 0;
for (G group : this) {
result += group.size();
}
return result;
}
/**
* Write the set of groups.
* Counters ::= version #fgroups (groupId, group)* #groups (group)*
*/
@Override
public synchronized void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, groupFactory.version());
WritableUtils.writeVInt(out, fgroups.size()); // framework groups first
for (G group : fgroups.values()) {
if (group instanceof FrameworkCounterGroup<?, ?>) {
WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
group.write(out);
} else if (group instanceof FileSystemCounterGroup<?>) {
WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
group.write(out);
}
}
if (writeAllCounters) {
WritableUtils.writeVInt(out, groups.size());
for (G group : groups.values()) {
Text.writeString(out, group.getName());
group.write(out);
}
} else {
WritableUtils.writeVInt(out, 0);
}
}
@Override
public synchronized void readFields(DataInput in) throws IOException {
int version = WritableUtils.readVInt(in);
if (version != groupFactory.version()) {
throw new IOException("Counters version mismatch, expected "+
groupFactory.version() +" got "+ version);
}
int numFGroups = WritableUtils.readVInt(in);
fgroups.clear();
GroupType[] groupTypes = GroupType.values();
while (numFGroups-- > 0) {
GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
G group;
switch (groupType) {
case FILESYSTEM: // with nothing
group = groupFactory.newFileSystemGroup();
break;
case FRAMEWORK: // with group id
group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
break;
default: // Silence dumb compiler, as it would've thrown earlier
throw new IOException("Unexpected counter group type: "+ groupType);
}
group.readFields(in);
fgroups.put(group.getName(), group);
}
int numGroups = WritableUtils.readVInt(in);
while (numGroups-- > 0) {
limits.checkGroups(groups.size() + 1);
G group = groupFactory.newGenericGroup(Text.readString(in), null, limits);
group.readFields(in);
groups.put(group.getName(), group);
}
}
/**
* Return textual representation of the counter values.
* @return the string
*/
@Override
public synchronized String toString() {
StringBuilder sb = new StringBuilder("Counters: " + countCounters());
for (G group: this) {
sb.append("\n\t").append(group.getDisplayName());
for (Counter counter: group) {
sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
.append(counter.getValue());
}
}
return sb.toString();
}
/**
* Increments multiple counters by their amounts in another Counters
* instance.
* @param other the other Counters instance
*/
public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
for(G right : other) {
G left = groups.get(right.getName());
if (left == null) {
limits.checkGroups(groups.size() + 1);
left = groupFactory.newGroup(right.getName(), right.getDisplayName(),
limits);
groups.put(right.getName(), left);
}
left.incrAllCounters(right);
}
}
@Override
@SuppressWarnings("unchecked")
public boolean equals(Object genericRight) {
if (genericRight instanceof AbstractCounters<?, ?>) {
return Iterators.elementsEqual(iterator(),
((AbstractCounters<C, G>)genericRight).iterator());
}
return false;
}
@Override
public int hashCode() {
return groups.hashCode();
}
/**
* Set the "writeAllCounters" option to true or false
* @param send if true all counters would be serialized, otherwise only
* framework counters would be serialized in
* {@link #write(DataOutput)}
*/
@InterfaceAudience.Private
public void setWriteAllCounters(boolean send) {
writeAllCounters = send;
}
/**
* Get the "writeAllCounters" option
* @return true of all counters would serialized
*/
@InterfaceAudience.Private
public boolean getWriteAllCounters() {
return writeAllCounters;
}
@InterfaceAudience.Private
public Limits limits() {
return limits;
}
}