/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.vaidya.statistics.job;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
/**
 * Collects and aggregates job-level, map-task and reduce-task statistics
 * parsed from a job history file, and exposes them through typed counter
 * lookups.
 */
public class JobStatistics implements JobStatisticsInterface {
/*
 * Pattern for splitting the comma-separated COUNTERS string
 * (matches maximal runs of non-comma characters)
 */
private static final Pattern _pattern = Pattern.compile("[[^,]?]+");
/*
* Job configuration
*/
private JobConf _jobConf;
/**
* @param jobConf the jobConf to set
*/
void setJobConf(JobConf jobConf) {
this._jobConf = jobConf;
// TODO: Add job conf to _job array
}
/*
 * Parsed job history information, including the aggregated job-level counters
 */
private JobHistoryParser.JobInfo _jobInfo;
/*
* Job stats
*/
private java.util.Hashtable<Enum, String> _job;
/**
 * @return the job configuration
 */
public JobConf getJobConf() {
return this._jobConf;
}
/*
 * Get job counters of type long
 */
public long getLongValue(Enum key) {
if (this._job.get(key) == null) {
return 0L;
} else {
return Long.parseLong(this._job.get(key));
}
}
/*
 * Get job counters of type double
 */
public double getDoubleValue(Enum key) {
if (this._job.get(key) == null) {
return 0.0;
} else {
return Double.parseDouble(this._job.get(key));
}
}
/*
 * Get job counters of type String
 */
public String getStringValue(Enum key) {
if (this._job.get(key) == null) {
return "";
} else {
return this._job.get(key);
}
}
/*
* Set key value of type long
*/
public void setValue(Enum key, long value) {
this._job.put(key, Long.toString(value));
}
/*
* Set key value of type double
*/
public void setValue(Enum key, double value) {
this._job.put(key, Double.toString(value));
}
/*
* Set key value of type String
*/
public void setValue(Enum key, String value) {
this._job.put(key, value);
}
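/*
 * Example (a sketch; "stats" is a hypothetical JobStatistics instance):
 * values are stored as strings in the _job table, so a long written with
 * setValue() reads back with getLongValue():
 *
 *   stats.setValue(JobKeys.TOTAL_MAPS, 42L);
 *   long totalMaps = stats.getLongValue(JobKeys.TOTAL_MAPS); // 42
 */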
/*
* Map Task List (Sorted by task id)
*/
private ArrayList<MapTaskStatistics> _mapTaskList = new ArrayList<MapTaskStatistics>();
/*
* Reduce Task List (Sorted by task id)
*/
private ArrayList<ReduceTaskStatistics> _reduceTaskList = new ArrayList<ReduceTaskStatistics>();
/*
 * Constructor: populates the job-level stats table and the map/reduce task lists
 */
public JobStatistics (JobConf jobConf, JobInfo jobInfo) throws ParseException {
this._jobConf = jobConf;
this._jobInfo = jobInfo;
this._job = new Hashtable<Enum, String>();
populate_Job(this._job, jobInfo);
populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList,
jobInfo.getAllTasks());
// Add the Job Type: MAP_REDUCE, MAP_ONLY
if (getLongValue(JobKeys.TOTAL_REDUCES) == 0) {
this._job.put(JobKeys.JOBTYPE,"MAP_ONLY");
} else {
this._job.put(JobKeys.JOBTYPE,"MAP_REDUCE");
}
}
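/*
 * Usage sketch (hedged; assumes the history file has already been parsed,
 * e.g. with org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser, and
 * that the job's JobConf is available; "fs" and "historyFilePath" are
 * hypothetical):
 *
 *   JobHistoryParser parser = new JobHistoryParser(fs, historyFilePath);
 *   JobInfo jobInfo = parser.parse();
 *   JobStatistics stats = new JobStatistics(jobConf, jobInfo);
 *   System.out.println(stats.getStringValue(JobKeys.JOBTYPE)); // MAP_ONLY or MAP_REDUCE
 */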
/*
 * Populate the map and reduce task lists from the job's tasks, using the
 * last successful attempt of each task
 */
private void populate_MapReduceTaskLists (ArrayList<MapTaskStatistics> mapTaskList,
ArrayList<ReduceTaskStatistics> reduceTaskList,
Map<TaskID, TaskInfo> taskMap)
throws ParseException {
for (JobHistoryParser.TaskInfo taskInfo: taskMap.values()) {
if (taskInfo.getTaskType().equals(TaskType.MAP)) {
MapTaskStatistics mapT = new MapTaskStatistics();
TaskAttemptInfo successfulAttempt =
getLastSuccessfulTaskAttempt(taskInfo);
if (successfulAttempt == null) {
continue; // no successful attempt recorded for this task
}
mapT.setValue(MapTaskKeys.TASK_ID,
successfulAttempt.getAttemptId().getTaskID().toString());
mapT.setValue(MapTaskKeys.ATTEMPT_ID,
successfulAttempt.getAttemptId().toString());
mapT.setValue(MapTaskKeys.HOSTNAME,
successfulAttempt.getTrackerName());
mapT.setValue(MapTaskKeys.TASK_TYPE,
successfulAttempt.getTaskType().toString());
mapT.setValue(MapTaskKeys.STATUS,
successfulAttempt.getTaskStatus().toString());
mapT.setValue(MapTaskKeys.START_TIME, successfulAttempt.getStartTime());
mapT.setValue(MapTaskKeys.FINISH_TIME, successfulAttempt.getFinishTime());
mapT.setValue(MapTaskKeys.SPLITS, taskInfo.getSplitLocations());
mapT.setValue(MapTaskKeys.TRACKER_NAME, successfulAttempt.getTrackerName());
mapT.setValue(MapTaskKeys.STATE_STRING, successfulAttempt.getState());
mapT.setValue(MapTaskKeys.HTTP_PORT, successfulAttempt.getHttpPort());
mapT.setValue(MapTaskKeys.ERROR, successfulAttempt.getError());
parseAndAddMapTaskCounters(mapT,
successfulAttempt.getCounters().toString());
mapTaskList.add(mapT);
// Add number of task attempts
mapT.setValue(MapTaskKeys.NUM_ATTEMPTS,
Integer.toString(taskInfo.getAllTaskAttempts().size()));
// Add EXECUTION_TIME = FINISH_TIME - START_TIME
long etime = mapT.getLongValue(MapTaskKeys.FINISH_TIME) -
mapT.getLongValue(MapTaskKeys.START_TIME);
mapT.setValue(MapTaskKeys.EXECUTION_TIME, Long.toString(etime));
} else if (taskInfo.getTaskType().equals(TaskType.REDUCE)) {
ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
TaskAttemptInfo successfulAttempt =
getLastSuccessfulTaskAttempt(taskInfo);
if (successfulAttempt == null) {
continue; // no successful attempt recorded for this task
}
reduceT.setValue(ReduceTaskKeys.TASK_ID,
successfulAttempt.getAttemptId().getTaskID().toString());
reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID,
successfulAttempt.getAttemptId().toString());
reduceT.setValue(ReduceTaskKeys.HOSTNAME,
successfulAttempt.getTrackerName());
reduceT.setValue(ReduceTaskKeys.TASK_TYPE,
successfulAttempt.getTaskType().toString());
reduceT.setValue(ReduceTaskKeys.STATUS,
successfulAttempt.getTaskStatus().toString());
reduceT.setValue(ReduceTaskKeys.START_TIME,
successfulAttempt.getStartTime());
reduceT.setValue(ReduceTaskKeys.FINISH_TIME,
successfulAttempt.getFinishTime());
reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME,
successfulAttempt.getShuffleFinishTime());
reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME,
successfulAttempt.getSortFinishTime());
reduceT.setValue(ReduceTaskKeys.SPLITS, "");
reduceT.setValue(ReduceTaskKeys.TRACKER_NAME,
successfulAttempt.getTrackerName());
reduceT.setValue(ReduceTaskKeys.STATE_STRING,
successfulAttempt.getState());
reduceT.setValue(ReduceTaskKeys.HTTP_PORT,
successfulAttempt.getHttpPort());
parseAndAddReduceTaskCounters(reduceT,
successfulAttempt.getCounters().toString());
reduceTaskList.add(reduceT);
// Add number of task attempts
reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS,
Integer.toString(taskInfo.getAllTaskAttempts().size()));
// Add EXECUTION_TIME = FINISH_TIME - START_TIME
long etime1 = reduceT.getLongValue(ReduceTaskKeys.FINISH_TIME) -
reduceT.getLongValue(ReduceTaskKeys.START_TIME);
reduceT.setValue(ReduceTaskKeys.EXECUTION_TIME,
Long.toString(etime1));
} else if (taskInfo.getTaskType().equals(TaskType.JOB_CLEANUP) ||
taskInfo.getTaskType().equals(TaskType.JOB_SETUP)) {
//System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
} else {
System.err.println("UNKNOWN TASK TYPE : "+taskInfo.getTaskType());
}
}
}
/*
 * Get the last successful attempt of a task to be added in the stats;
 * returns null if the task has no successful attempt
 */
private TaskAttemptInfo getLastSuccessfulTaskAttempt(TaskInfo task) {
for (TaskAttemptInfo ai: task.getAllTaskAttempts().values()) {
if (ai.getTaskStatus().equals(TaskStatus.State.SUCCEEDED.toString())) {
return ai;
}
}
return null;
}
/*
* Populate the job stats
*/
private void populate_Job (Hashtable<Enum, String> job, JobInfo jobInfo) throws ParseException {
job.put(JobKeys.FINISH_TIME, String.valueOf(jobInfo.getFinishTime()));
job.put(JobKeys.JOBID, jobInfo.getJobId().toString());
job.put(JobKeys.JOBNAME, jobInfo.getJobname());
job.put(JobKeys.USER, jobInfo.getUsername());
job.put(JobKeys.JOBCONF, jobInfo.getJobConfPath());
job.put(JobKeys.SUBMIT_TIME, String.valueOf(jobInfo.getSubmitTime()));
job.put(JobKeys.LAUNCH_TIME, String.valueOf(jobInfo.getLaunchTime()));
job.put(JobKeys.TOTAL_MAPS, String.valueOf(jobInfo.getTotalMaps()));
job.put(JobKeys.TOTAL_REDUCES, String.valueOf(jobInfo.getTotalReduces()));
job.put(JobKeys.FAILED_MAPS, String.valueOf(jobInfo.getFailedMaps()));
job.put(JobKeys.FAILED_REDUCES, String.valueOf(jobInfo.getFailedReduces()));
job.put(JobKeys.FINISHED_MAPS, String.valueOf(jobInfo.getFinishedMaps()));
job.put(JobKeys.FINISHED_REDUCES,
String.valueOf(jobInfo.getFinishedReduces()));
job.put(JobKeys.STATUS, jobInfo.getJobStatus().toString());
job.put(JobKeys.JOB_PRIORITY, jobInfo.getPriority());
parseAndAddJobCounters(job, jobInfo.getTotalCounters().toString());
}
/*
* Parse and add the job counters
*/
private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
// Counter key takes the form "<group display name>.<counter display name>"
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = Long.toString(counter.getValue());
String[] parts = {countername, value};
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
job.put(JobKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
job.put(JobKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Job Counters .Launched map tasks")) {
job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
} else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
} else if (parts[0].equals("Job Counters .Data-local map tasks")) {
job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
} else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
job.put(JobKeys.SPILLED_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
job.put(JobKeys.SHUFFLE_BYTES, parts[1]);
} else {
System.err.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
}
}
}
}
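/*
 * Example (a hedged sketch) of the compact escaped counter string round trip
 * that the parse methods in this class rely on:
 *
 *   Counters counters = new Counters();
 *   counters.incrCounter("Map-Reduce Framework", "Reduce shuffle bytes", 1024L);
 *   String escaped = counters.makeEscapedCompactString();
 *   Counters parsed = Counters.fromEscapedCompactString(escaped);
 */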
/*
* Parse and add the Map task counters
*/
private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
// Counter key takes the form "<group display name>.<counter display name>"
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = Long.toString(counter.getValue());
String[] parts = {countername, value};
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
mapTask.setValue(MapTaskKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
mapTask.setValue(MapTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, parts[1]);
} else if (parts[0].equals("FileInputFormatCounters.BYTES_READ")) {
mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
} else {
System.err.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
}
}
}
}
/*
* Parse and add the reduce task counters
*/
private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
// Counter key takes the form "<group display name>.<counter display name>"
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = Long.toString(counter.getValue());
String[] parts = {countername, value};
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
reduceTask.setValue(ReduceTaskKeys.SPILLED_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, parts[1]);
} else {
System.err.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
}
}
}
}
/*
 * Print the job execution statistics
 * TODO: split into separate methods that print the job, the map/reduce task
 * lists, and individual map/reduce task stats
 */
public void printJobExecutionStatistics() {
/*
* Print Job Counters
*/
System.out.println("JOB COUNTERS *********************************************");
for (Map.Entry<Enum, String> entry : this._job.entrySet()) {
System.out.println("Key:<" + entry.getKey().name() + ">, value:<" + entry.getValue() + ">");
}
/*
 * Print map task counters
 */
System.out.println("MAP COUNTERS *********************************************");
for (MapTaskStatistics mapT : this._mapTaskList) {
System.out.println("MAP TASK *********************************************");
mapT.printKeys();
}
/*
 * Print reduce task counters
 */
System.out.println("REDUCE COUNTERS *********************************************");
for (ReduceTaskStatistics reduceT : this._reduceTaskList) {
System.out.println("REDUCE TASK *********************************************");
reduceT.printKeys();
}
}
/*
* Hash table keeping sorted lists of map tasks based on the specific map task key
*/
private Hashtable <Enum, ArrayList<MapTaskStatistics>> _sortedMapTaskListsByKey = new Hashtable<Enum, ArrayList<MapTaskStatistics>>();
/**
 * @param mapTaskSortKey the counter key used for sorting the task list;
 * if null, map tasks are sorted by task id
 * @param dataType the data type (long or string) of the counter key used for sorting
 * @return ArrayList of MapTaskStatistics sorted by the given key
 */
public synchronized ArrayList<MapTaskStatistics>
getMapTaskList(Enum mapTaskSortKey, KeyDataType dataType) {
/*
* If mapTaskSortKey is null then use the task id as a key.
*/
if (mapTaskSortKey == null) {
mapTaskSortKey = MapTaskKeys.TASK_ID;
}
if (this._sortedMapTaskListsByKey.get(mapTaskSortKey) == null) {
ArrayList<MapTaskStatistics> newList = (ArrayList<MapTaskStatistics>)this._mapTaskList.clone();
this._sortedMapTaskListsByKey.put(mapTaskSortKey, this.sortMapTasksByKey(newList, mapTaskSortKey, dataType));
}
return this._sortedMapTaskListsByKey.get(mapTaskSortKey);
}
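/*
 * Example (a sketch; "stats" is a hypothetical JobStatistics instance):
 * map tasks sorted by execution time; the sorted list is cached per key in
 * _sortedMapTaskListsByKey, so repeated calls with the same key are cheap:
 *
 *   ArrayList<MapTaskStatistics> byExecTime =
 *       stats.getMapTaskList(MapTaskKeys.EXECUTION_TIME, KeyDataType.LONG);
 *   MapTaskStatistics slowest = byExecTime.get(byExecTime.size() - 1);
 */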
private ArrayList<MapTaskStatistics> sortMapTasksByKey (ArrayList<MapTaskStatistics> mapTasks,
Enum key, Enum dataType) {
MapCounterComparator mcc = new MapCounterComparator(key, dataType);
Collections.sort (mapTasks, mcc);
return mapTasks;
}
private class MapCounterComparator implements Comparator<MapTaskStatistics> {
public Enum _sortKey;
public Enum _dataType;
public MapCounterComparator(Enum key, Enum dataType) {
this._sortKey = key;
this._dataType = dataType;
}
// Comparator interface requires defining compare method.
public int compare(MapTaskStatistics a, MapTaskStatistics b) {
if (this._dataType == KeyDataType.LONG) {
long aa = a.getLongValue(this._sortKey);
long bb = b.getLongValue(this._sortKey);
if (aa < bb) return -1;
if (aa > bb) return 1;
return 0;
} else {
return a.getStringValue(this._sortKey).compareToIgnoreCase(
b.getStringValue(this._sortKey));
}
}
}
/*
 * Hash table keeping sorted lists of reduce tasks based on the specific reduce task key
 */
private Hashtable <Enum, ArrayList<ReduceTaskStatistics>> _sortedReduceTaskListsByKey = new Hashtable<Enum,ArrayList<ReduceTaskStatistics>>();
/**
 * @param reduceTaskSortKey the counter key used for sorting the task list;
 * if null, reduce tasks are sorted by task id
 * @param dataType the data type (long or string) of the counter key used for sorting
 * @return ArrayList of ReduceTaskStatistics sorted by the given key
 */
public synchronized ArrayList<ReduceTaskStatistics>
getReduceTaskList (Enum reduceTaskSortKey, KeyDataType dataType) {
/*
* If reduceTaskSortKey is null then use the task id as a key.
*/
if (reduceTaskSortKey == null) {
reduceTaskSortKey = ReduceTaskKeys.TASK_ID;
}
if (this._sortedReduceTaskListsByKey.get(reduceTaskSortKey) == null) {
ArrayList<ReduceTaskStatistics> newList = (ArrayList<ReduceTaskStatistics>)this._reduceTaskList.clone();
this._sortedReduceTaskListsByKey.put(reduceTaskSortKey, this.sortReduceTasksByKey(newList, reduceTaskSortKey, dataType));
}
return this._sortedReduceTaskListsByKey.get(reduceTaskSortKey);
}
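/*
 * Example (a sketch), mirroring getMapTaskList():
 *
 *   ArrayList<ReduceTaskStatistics> byShuffleBytes =
 *       stats.getReduceTaskList(ReduceTaskKeys.SHUFFLE_BYTES, KeyDataType.LONG);
 */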
private ArrayList<ReduceTaskStatistics> sortReduceTasksByKey (ArrayList<ReduceTaskStatistics> reduceTasks,
Enum key, Enum dataType) {
ReduceCounterComparator rcc = new ReduceCounterComparator(key, dataType);
Collections.sort (reduceTasks, rcc);
return reduceTasks;
}
private class ReduceCounterComparator implements Comparator<ReduceTaskStatistics> {
public Enum _sortKey;
public Enum _dataType; //either long or string
public ReduceCounterComparator(Enum key, Enum dataType) {
this._sortKey = key;
this._dataType = dataType;
}
// Comparator interface requires defining compare method.
public int compare(ReduceTaskStatistics a, ReduceTaskStatistics b) {
if (this._dataType == KeyDataType.LONG) {
long aa = a.getLongValue(this._sortKey);
long bb = b.getLongValue(this._sortKey);
if (aa < bb) return -1;
if (aa > bb) return 1;
return 0;
} else {
return a.getStringValue(this._sortKey).compareToIgnoreCase(
b.getStringValue(this._sortKey));
}
}
}
}