/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.vaidya.statistics.job;
import java.util.ArrayList;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobHistory;
import org.apache.hadoop.mapred.JobHistory.JobInfo;
import org.apache.hadoop.mapred.JobHistory.Keys;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import java.text.ParseException;
import java.util.Hashtable;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Collections;
/**
 * Collects job, map task, and reduce task statistics parsed from a job's
 * history, and exposes them through JobStatisticsInterface.
 */
public class JobStatistics implements JobStatisticsInterface {
/*
* Pattern for parsing the COUNTERS
*/
private static final Pattern _pattern = Pattern.compile("[[^,]?]+");
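/*
 * Illustrative sketch (an assumption, not part of the original code): the
 * nested character class above reduces to "one or more non-comma
 * characters", so the matcher walks comma-separated tokens:
 *
 *   Matcher m = _pattern.matcher("A,B,C");
 *   while (m.find()) {
 *     System.out.println(m.group()); // prints A, then B, then C
 *   }
 */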
/*
* Job configuration
*/
private JobConf _jobConf;
/**
* @param jobConf the jobConf to set
*/
void setJobConf(JobConf jobConf) {
this._jobConf = jobConf;
// TODO: Add job conf to _job array
}
/*
 * Job history information for the job
 */
private JobHistory.JobInfo _jobInfo;
/*
* Job stats
*/
private java.util.Hashtable<Enum, String> _job;
/**
 * @return the jobConf
 */
public JobConf getJobConf() {
return this._jobConf;
}
/*
* Get Job Counters of type long
*/
public long getLongValue(Enum key) {
if (this._job.get(key) == null) {
return (long)0;
}
else {
return Long.parseLong(this._job.get(key));
}
}
/*
* Get job Counters of type Double
*/
public double getDoubleValue(Enum key) {
if (this._job.get(key) == null) {
return (double)0;
} else {
return Double.parseDouble(this._job.get(key));
}
}
/*
* Get Job Counters of type String
*/
public String getStringValue(Enum key) {
if (this._job.get(key) == null) {
return "";
} else {
return this._job.get(key);
}
}
/*
* Set key value of type long
*/
public void setValue(Enum key, long value) {
this._job.put(key, Long.toString(value));
}
/*
* Set key value of type double
*/
public void setValue(Enum key, double value) {
this._job.put(key, Double.toString(value));
}
/*
* Set key value of type String
*/
public void setValue(Enum key, String value) {
this._job.put(key, value);
}
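/*
 * Usage sketch (illustrative, not from the original source): keys are the
 * JobKeys/MapTaskKeys/ReduceTaskKeys enums declared in
 * JobStatisticsInterface, and values round-trip through their string form:
 *
 *   stats.setValue(JobKeys.TOTAL_MAPS, 100L);
 *   long totalMaps = stats.getLongValue(JobKeys.TOTAL_MAPS); // 100
 *   String name = stats.getStringValue(JobKeys.JOBNAME);     // "" if unset
 */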
/*
* Map Task List (Sorted by task id)
*/
private ArrayList<MapTaskStatistics> _mapTaskList = new ArrayList<MapTaskStatistics>();
/*
* Reduce Task List (Sorted by task id)
*/
private ArrayList<ReduceTaskStatistics> _reduceTaskList = new ArrayList<ReduceTaskStatistics>();
/*
 * Constructor
 */
public JobStatistics (JobConf jobConf, JobInfo jobInfo) throws ParseException {
this._jobConf = jobConf;
this._jobInfo = jobInfo;
this._job = new Hashtable<Enum, String>();
populate_Job(this._job, this._jobInfo.getValues());
populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
// Add the Job Type: MAP_REDUCE, MAP_ONLY
if (getLongValue(JobKeys.TOTAL_REDUCES) == 0) {
this._job.put(JobKeys.JOBTYPE,"MAP_ONLY");
} else {
this._job.put(JobKeys.JOBTYPE,"MAP_REDUCE");
}
}
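/*
 * Construction sketch. The history-parsing calls below mirror how the
 * Vaidya driver feeds this constructor and are assumptions here, not part
 * of this file:
 *
 *   JobConf jobConf = new JobConf();
 *   jobConf.addResource(new Path(jobConfFile));        // job conf XML
 *   JobHistory.JobInfo jobInfo = new JobHistory.JobInfo("");
 *   DefaultJobHistoryParser.parseJobTasks(jobHistoryFile, jobInfo,
 *       FileSystem.getLocal(new Configuration()));
 *   JobStatistics stats = new JobStatistics(jobConf, jobInfo);
 */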
/*
 * Populate the map and reduce task lists from the job history task map
 */
private void populate_MapReduceTaskLists (ArrayList<MapTaskStatistics> mapTaskList,
ArrayList<ReduceTaskStatistics> reduceTaskList,
java.util.Map<String, JobHistory.Task> taskMap) throws ParseException {
for (Map.Entry<String, JobHistory.Task> entry : taskMap.entrySet()) {
JobHistory.Task task = entry.getValue();
if (task.get(Keys.TASK_TYPE).equals("MAP")) {
MapTaskStatistics mapT = new MapTaskStatistics();
java.util.Map<JobHistory.Keys, String> mapTask = task.getValues();
java.util.Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task);
// NOTE: tasks without a successful attempt are skipped, so the task list may hold fewer tasks than actually ran
if (successTaskAttemptMap != null) {
mapTask.putAll(successTaskAttemptMap);
} else {
System.err.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
}
for (Map.Entry<JobHistory.Keys, String> mtc : mapTask.entrySet()) {
JobHistory.Keys key = mtc.getKey();
String value = mtc.getValue();
//System.out.println("JobHistory.MapKeys."+key+": "+value);
switch (key) {
case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break;
case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break;
case HOSTNAME: mapT.setValue(MapTaskKeys.HOSTNAME, value); break;
case TASK_TYPE: mapT.setValue(MapTaskKeys.TASK_TYPE, value); break;
case TASK_STATUS: mapT.setValue(MapTaskKeys.STATUS, value); break;
case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break;
case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break;
case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break;
case TRACKER_NAME: mapT.setValue(MapTaskKeys.TRACKER_NAME, value); break;
case STATE_STRING: mapT.setValue(MapTaskKeys.STATE_STRING, value); break;
case HTTP_PORT: mapT.setValue(MapTaskKeys.HTTP_PORT, value); break;
case ERROR: mapT.setValue(MapTaskKeys.ERROR, value); break;
case COUNTERS:
value.concat(",");
parseAndAddMapTaskCounters(mapT, value);
mapTaskList.add(mapT);
break;
default: System.err.println("JobHistory.MapKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
break;
}
}
// Add number of task attempts
mapT.setValue(MapTaskKeys.NUM_ATTEMPTS, Integer.toString(task.getTaskAttempts().size()));
// Add EXECUTION_TIME = FINISH_TIME - START_TIME
long etime = mapT.getLongValue(MapTaskKeys.FINISH_TIME) - mapT.getLongValue(MapTaskKeys.START_TIME);
mapT.setValue(MapTaskKeys.EXECUTION_TIME, Long.toString(etime));
} else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) {
ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
java.util.Map<JobHistory.Keys, String> reduceTask = task.getValues();
java.util.Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task);
// NOTE: tasks without a successful attempt are skipped, so the task list may hold fewer tasks than actually ran
if (successTaskAttemptMap != null) {
reduceTask.putAll(successTaskAttemptMap);
} else {
System.err.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
}
for (Map.Entry<JobHistory.Keys, String> rtc : reduceTask.entrySet()) {
JobHistory.Keys key = rtc.getKey();
String value = rtc.getValue();
//System.out.println("JobHistory.ReduceKeys."+key+": "+value);
switch (key) {
case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break;
case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break;
case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break;
case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break;
case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
case SPLITS: reduceT.setValue(ReduceTaskKeys.SPLITS, value); break;
case TRACKER_NAME: reduceT.setValue(ReduceTaskKeys.TRACKER_NAME, value); break;
case STATE_STRING: reduceT.setValue(ReduceTaskKeys.STATE_STRING, value); break;
case HTTP_PORT: reduceT.setValue(ReduceTaskKeys.HTTP_PORT, value); break;
case COUNTERS:
value.concat(",");
parseAndAddReduceTaskCounters(reduceT, value);
reduceTaskList.add(reduceT);
break;
default: System.err.println("JobHistory.ReduceKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
break;
}
}
// Add number of task attempts
reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, Integer.toString(task.getTaskAttempts().size()));
// Add EXECUTION_TIME = FINISH_TIME - START_TIME
long etime = reduceT.getLongValue(ReduceTaskKeys.FINISH_TIME) - reduceT.getLongValue(ReduceTaskKeys.START_TIME);
reduceT.setValue(ReduceTaskKeys.EXECUTION_TIME, Long.toString(etime));
} else if (task.get(Keys.TASK_TYPE).equals("CLEANUP") ||
task.get(Keys.TASK_TYPE).equals("SETUP")) {
//System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
} else {
System.err.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
}
}
}
/*
* Get last successful task attempt to be added in the stats
*/
private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
Map<String, JobHistory.TaskAttempt> taskAttempts = task.getTaskAttempts();
// Return the values of the first (assumed only) attempt marked SUCCESS.
for (JobHistory.TaskAttempt attempt : taskAttempts.values()) {
if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
return attempt.getValues();
}
}
return null;
}
/*
* Populate the job stats
*/
private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
for (Map.Entry<JobHistory.Keys, String> entry : jobC.entrySet()) {
JobHistory.Keys key = entry.getKey();
String value = entry.getValue();
//System.out.println("JobHistory.JobKeys."+key+": "+value);
switch (key) {
case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
case JOBID: job.put(JobKeys.JOBID, value); break;
case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
case USER: job.put(JobKeys.USER, value); break;
case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
case JOB_PRIORITY: job.put(JobKeys.JOB_PRIORITY, value); break;
case COUNTERS:
value.concat(",");
parseAndAddJobCounters(job, value);
break;
default: System.err.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
break;
}
}
}
/*
* Parse and add the job counters
*/
private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
//String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
//String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
//System.err.println("groupName:"+groupname+",countername: "+countername);
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = (new Long(counter.getValue())).toString();
String[] parts = {countername,value};
//System.err.println("part0:<"+parts[0]+">,:part1 <"+parts[1]+">");
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
job.put(JobKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
job.put(JobKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Job Counters .Launched map tasks")) {
job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
} else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
} else if (parts[0].equals("Job Counters .Data-local map tasks")) {
job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
} else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
job.put(JobKeys.SPILLED_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
job.put(JobKeys.SHUFFLE_BYTES, parts[1]);
} else {
System.err.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
}
}
}
}
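/*
 * Input sketch (illustrative; the exact escaping is produced by
 * Counters.makeEscapedCompactString() and the layout shown here is an
 * assumption): the COUNTERS history value is a compact string of groups,
 * each holding (name)(display name)(value) counter triples, e.g.
 *
 *   {(org.apache.hadoop.mapred.Task$Counter)(Map-Reduce Framework)
 *       [(MAP_INPUT_RECORDS)(Map input records)(100)]
 *       [(MAP_OUTPUT_RECORDS)(Map output records)(100)]}
 *
 * Counters.fromEscapedCompactString() rebuilds the groups; the loop above
 * then keys on "<group display name>.<counter display name>".
 */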
/*
* Parse and add the Map task counters
*/
private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
//String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
//String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
//System.out.println("groupName:"+groupname+",countername: "+countername);
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = (new Long(counter.getValue())).toString();
String[] parts = {countername,value};
//System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
mapTask.setValue(MapTaskKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
mapTask.setValue(MapTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, parts[1]);
} else {
System.err.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
}
}
}
}
/*
* Parse and add the reduce task counters
*/
private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) throws ParseException {
Counters cnt = Counters.fromEscapedCompactString(counters);
for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
Counters.Group grp = grps.next();
//String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
Counters.Counter counter = mycounters.next();
//String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
//System.out.println("groupName:"+groupname+",countername: "+countername);
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
String value = (new Long(counter.getValue())).toString();
String[] parts = {countername,value};
//System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
reduceTask.setValue(ReduceTaskKeys.SPILLED_RECORDS, parts[1]);
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, parts[1]);
} else {
System.err.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
}
}
}
}
/*
* Print the Job Execution Statistics
* TODO: split into printing the job, the map/reduce task lists, and individual map/reduce task stats
*/
public void printJobExecutionStatistics() {
/*
* Print Job Counters
*/
System.out.println("JOB COUNTERS *********************************************");
for (Map.Entry<Enum, String> entry : this._job.entrySet()) {
Enum key = entry.getKey();
String value = entry.getValue();
System.out.println("Key:<" + key.name() + ">, value:<"+ value +">");
}
/*
 * Print map task counters
 */
System.out.println("MAP COUNTERS *********************************************");
for (MapTaskStatistics mapT : this._mapTaskList) {
System.out.println("MAP TASK *********************************************");
mapT.printKeys();
}
/*
 * Print reduce task counters
 */
System.out.println("REDUCE COUNTERS *********************************************");
for (ReduceTaskStatistics reduceT : this._reduceTaskList) {
System.out.println("REDUCE TASK *********************************************");
reduceT.printKeys();
}
}
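/*
 * Example output sketch (illustrative values only):
 *
 *   JOB COUNTERS *********************************************
 *   Key:<JOBID>, value:<job_200903250600_0001>
 *   Key:<TOTAL_MAPS>, value:<10>
 *   MAP COUNTERS *********************************************
 *   MAP TASK *********************************************
 *   ...
 */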
/*
* Hash table keeping sorted lists of map tasks based on the specific map task key
*/
private Hashtable <Enum, ArrayList<MapTaskStatistics>> _sortedMapTaskListsByKey = new Hashtable<Enum, ArrayList<MapTaskStatistics>>();
/*
* @return mapTaskList : ArrayList of MapTaskStatistics
* @param mapTaskSortKey : Specific counter key used for sorting the task list
* @param dataType : indicates the data type of the counter key used for sorting
* If sort key is null then by default map tasks are sorted using map task ids.
*/
public synchronized ArrayList<MapTaskStatistics>
getMapTaskList(Enum mapTaskSortKey, KeyDataType dataType) {
/*
* If mapTaskSortKey is null then use the task id as a key.
*/
if (mapTaskSortKey == null) {
mapTaskSortKey = MapTaskKeys.TASK_ID;
}
if (this._sortedMapTaskListsByKey.get(mapTaskSortKey) == null) {
ArrayList<MapTaskStatistics> newList = (ArrayList<MapTaskStatistics>)this._mapTaskList.clone();
this._sortedMapTaskListsByKey.put(mapTaskSortKey, this.sortMapTasksByKey(newList, mapTaskSortKey, dataType));
}
return this._sortedMapTaskListsByKey.get(mapTaskSortKey);
}
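/*
 * Usage sketch (illustrative; KeyDataType and the key enums come from
 * JobStatisticsInterface): sort map tasks by execution time and read the
 * slowest one. The list is sorted ascending, so the slowest task is last:
 *
 *   ArrayList<MapTaskStatistics> byTime =
 *       stats.getMapTaskList(MapTaskKeys.EXECUTION_TIME, KeyDataType.LONG);
 *   MapTaskStatistics slowest = byTime.get(byTime.size() - 1);
 */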
private ArrayList<MapTaskStatistics> sortMapTasksByKey (ArrayList<MapTaskStatistics> mapTasks,
Enum key, Enum dataType) {
MapCounterComparator mcc = new MapCounterComparator(key, dataType);
Collections.sort (mapTasks, mcc);
return mapTasks;
}
private class MapCounterComparator implements Comparator<MapTaskStatistics> {
public Enum _sortKey;
public Enum _dataType;
public MapCounterComparator(Enum key, Enum dataType) {
this._sortKey = key;
this._dataType = dataType;
}
// Comparator interface requires defining compare method.
public int compare(MapTaskStatistics a, MapTaskStatistics b) {
if (this._dataType == KeyDataType.LONG) {
long aa = a.getLongValue(this._sortKey);
long bb = b.getLongValue(this._sortKey);
if (aa<bb) return -1; if (aa==bb) return 0; if (aa>bb) return 1;
} else {
return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
}
return 0;
}
}
/*
* Reduce Array List sorting
*/
private Hashtable <Enum, ArrayList<ReduceTaskStatistics>> _sortedReduceTaskListsByKey = new Hashtable<Enum,ArrayList<ReduceTaskStatistics>>();
/*
* @return reduceTaskList : ArrayList of ReduceTaskStatistics
* @param reduceTaskSortKey : Specific counter key used for sorting the task list
* @param dataType : indicates the data type of the counter key used for sorting
* If sort key is null then by default reduce tasks are sorted using task ids.
*/
public synchronized ArrayList<ReduceTaskStatistics>
getReduceTaskList (Enum reduceTaskSortKey, KeyDataType dataType) {
/*
* If reduceTaskSortKey is null then use the task id as a key.
*/
if (reduceTaskSortKey == null) {
reduceTaskSortKey = ReduceTaskKeys.TASK_ID;
}
if (this._sortedReduceTaskListsByKey.get(reduceTaskSortKey) == null) {
ArrayList<ReduceTaskStatistics> newList = (ArrayList<ReduceTaskStatistics>)this._reduceTaskList.clone();
this._sortedReduceTaskListsByKey.put(reduceTaskSortKey, this.sortReduceTasksByKey(newList, reduceTaskSortKey, dataType));
}
return this._sortedReduceTaskListsByKey.get(reduceTaskSortKey);
}
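/*
 * Usage sketch, parallel to the map case (illustrative):
 *
 *   ArrayList<ReduceTaskStatistics> byShuffle =
 *       stats.getReduceTaskList(ReduceTaskKeys.SHUFFLE_BYTES, KeyDataType.LONG);
 */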
private ArrayList<ReduceTaskStatistics> sortReduceTasksByKey (ArrayList<ReduceTaskStatistics> reduceTasks,
Enum key, Enum dataType) {
ReduceCounterComparator rcc = new ReduceCounterComparator(key, dataType);
Collections.sort (reduceTasks, rcc);
return reduceTasks;
}
private class ReduceCounterComparator implements Comparator<ReduceTaskStatistics> {
public Enum _sortKey;
public Enum _dataType; //either long or string
public ReduceCounterComparator(Enum key, Enum dataType) {
this._sortKey = key;
this._dataType = dataType;
}
// Comparator interface requires defining compare method.
public int compare(ReduceTaskStatistics a, ReduceTaskStatistics b) {
if (this._dataType == KeyDataType.LONG) {
long aa = a.getLongValue(this._sortKey);
long bb = b.getLongValue(this._sortKey);
if (aa<bb) return -1; if (aa==bb) return 0; if (aa>bb) return 1;
} else {
return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
}
return 0;
}
}
}