/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.analysis.salsa.fsm;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
/**
* Pluggable mapper for FSMBuilder
* Supports only 0.20+ JobHistory files,
* because counter names are explicitly coded below
*
* K2 = State Name + State ID
* (We use ChukwaRecordKey since it already implements a bunch of
* useful things such as Comparators etc.)
* V2 = FSMIntermedEntry (which carries its extra fields in a TreeMap)
*/
public class JobHistoryTaskDataMapper
extends MapReduceBase
implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry>
{
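/*
* A minimal sketch of how this mapper might be wired into an FSMBuilder-style
* job; the JobConf setup below is illustrative, not the actual FSMBuilder
* driver code:
*
*   JobConf conf = new JobConf(FSMBuilder.class);
*   conf.setMapperClass(JobHistoryTaskDataMapper.class);
*   conf.setMapOutputKeyClass(ChukwaRecordKey.class);
*   conf.setMapOutputValueClass(FSMIntermedEntry.class);
*/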
private static Log log = LogFactory.getLog(FSMBuilder.class); // logs under the FSMBuilder job's category
protected static final String SEP = "/"; // separator between fields of a ChukwaRecordKey key
protected static final String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
/*
* Helper function for the mapper to populate the TreeMap of an FSMIntermedEntry
* with input/output counters for Map records
*/
protected FSMIntermedEntry populateRecord_MapCounters
(FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
{
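// Parallel arrays: the counter value found under mapCounterNames[i] in the
// record is stored in add_info under the shorter name mapCounterDestNames[i]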
String mapCounterNames [] = {
"Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
};
String mapCounterDestNames[] = {
"FILE_BYTES_WRITTEN",
"COMBINE_INPUT_RECORDS",
"COMBINE_OUTPUT_RECORDS",
"INPUT_BYTES",
"INPUT_RECORDS",
"OUTPUT_BYTES",
"OUTPUT_RECORDS",
"SPILLED_RECORDS"
};
assert(mapCounterDestNames.length == mapCounterNames.length);
for (int i = 0; i < mapCounterDestNames.length; i++) {
if (fieldNamesList.contains(mapCounterNames[i])) {
this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));
}
}
this_rec.add_info.put("FILE_BYTES_READ",new String("0")); // to have same fields as reduce
this_rec.add_info.put("INPUT_GROUPS",new String("0")); // to have same fields as reduce
return this_rec;
}
/*
* Helper function for the mapper to populate the TreeMap of an FSMIntermedEntry
* with input/output counters for Reduce records
*/
protected FSMIntermedEntry populateRecord_ReduceCounters
(FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList)
{
String redCounterNames [] = {
"Counter:FileSystemCounters:FILE_BYTES_READ",
"Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
"Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
};
String redCounterDestNames[] = {
"FILE_BYTES_READ",
"FILE_BYTES_WRITTEN",
"COMBINE_INPUT_RECORDS",
"COMBINE_OUTPUT_RECORDS",
"INPUT_GROUPS",
"INPUT_RECORDS",
"OUTPUT_RECORDS",
"INPUT_BYTES", // NOTE: shuffle bytes are mapped to "input_bytes"
"SPILLED_RECORDS"
};
assert(redCounterDestNames.length == redCounterNames.length);
for (int i = 0; i < redCounterDestNames.length; i++) {
if (fieldNamesList.contains(redCounterNames[i])) {
this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));
}
}
this_rec.add_info.put("OUTPUT_BYTES",new String("0")); // to have same fields as map
return this_rec;
}
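/*
* Emits one FSMIntermedEntry per well-formed map/reduce task-attempt record:
* START_TIME entries become STATE_START states, FINISH_TIME entries become
* STATE_END states (with counters attached), and Reduce entries are further
* expanded into ShuffleWait/Sort/Reducer sub-states
*/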
public void map
(ChukwaRecordKey key, ChukwaRecord val,
OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
Reporter reporter)
throws IOException
{
String task_type;
FSMIntermedEntry this_rec = new FSMIntermedEntry();
boolean add_record = true;
/* Extract field names for checking */
String [] fieldNames = val.getFields();
ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
/* Check state (Map or Reduce), generate unique ID */
if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return; // Ignore "TASK" entries
if (!fieldNamesList.contains("TASK_TYPE")) { // Malformed, ignore
return;
} else {
task_type = val.getValue("TASK_TYPE");
if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
return; // do nothing
}
}
/* Check if this is a start or end entry, set state type, extract start/end times */
if (fieldNamesList.contains("START_TIME")) {
this_rec.state_type.val = StateType.STATE_START;
this_rec.timestamp = new String(val.getValue("START_TIME"));
this_rec.time_start = new String(val.getValue("START_TIME"));
this_rec.time_end = new String("");
if (val.getValue("START_TIME").length() < 4+2) { // needs to at least have milliseconds
add_record = add_record & false;
}
} else if (fieldNamesList.contains("FINISH_TIME")) {
this_rec.state_type.val = StateType.STATE_END;
this_rec.timestamp = new String(val.getValue("FINISH_TIME"));
this_rec.time_start = new String("");
this_rec.time_end = new String(val.getValue("FINISH_TIME"));
if (val.getValue("FINISH_TIME").length() < 4+2) { // needs to at least have milliseconds
add_record = add_record & false;
}
} else {
this_rec.state_type.val = StateType.STATE_NOOP;
}
/* Fill in common intermediate state entry information */
// Extract original ChukwaRecordKey values for later key reconstruction by reducer
try {
this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(),this_rec,SEP);
} catch (Exception e) {
log.warn("Error occurred splitting ChukwaRecordKey ["+key.getKey().trim()+"]: " + e.toString());
return;
}
// Populate state enum information
this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
if (task_type.equals("MAP")) {
this_rec.state_mapred = new MapRedState(MapRedState.MAP);
} else if (task_type.equals("REDUCE")) {
this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
} else {
this_rec.state_mapred = new MapRedState(MapRedState.NONE); // unreachable: task_type was validated above
}
// Fill state name, unique ID
this_rec.state_name = this_rec.state_mapred.toString();
this_rec.identifier = val.getValue("TASK_ATTEMPT_ID");
this_rec.generateUniqueID();
// Extract hostname directly (<= 0.18 logs) or from the tracker name (0.20+)
if (fieldNamesList.contains("HOSTNAME")) {
this_rec.host_exec = ParseUtilities.removeRackFromHostname(val.getValue("HOSTNAME"));
} else if (fieldNamesList.contains("TRACKER_NAME")) {
this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
} else {
this_rec.host_exec = "";
}
if (this_rec.state_type.val == StateType.STATE_END) {
assert(fieldNamesList.contains("TASK_STATUS"));
String tmpstring = val.getValue("TASK_STATUS");
// Drop killed/failed attempts; guard against a missing value before calling equals()
if (tmpstring != null && (tmpstring.equals("KILLED") || tmpstring.equals("FAILED"))) {
add_record = false;
}
if (tmpstring != null && tmpstring.length() > 0) {
this_rec.add_info.put("STATE_STRING", tmpstring);
} else {
this_rec.add_info.put("STATE_STRING", "");
}
switch(this_rec.state_mapred.val) {
case MapRedState.MAP:
this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
break;
case MapRedState.REDUCE:
this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
break;
default:
// do nothing
break;
}
}
// manually add clustername etc
assert(fieldNamesList.contains(Record.tagsField));
assert(fieldNamesList.contains("csource"));
this_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
this_rec.add_info.put("csource",val.getValue("csource"));
/* Special handling for Reduce Starts and Ends; Map records need no expansion */
if (task_type.equals("REDUCE")) {
if (this_rec.state_type.val == StateType.STATE_END) {
add_record &= expandReduceEnd(key,val,output,reporter,this_rec);
} else if (this_rec.state_type.val == StateType.STATE_START) {
add_record &= expandReduceStart(key,val,output,reporter,this_rec);
}
}
if (add_record) {
log.debug("Collecting record " + this_rec + "("+this_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType,this_rec.getUniqueID()),this_rec);
}
} // end of map()
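/*
* Emits a synthetic ReduceShuffleWait START entry whose start time matches
* the original Reduce START entry
*/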
protected boolean expandReduceStart
(ChukwaRecordKey key, ChukwaRecord val,
OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
Reporter reporter, FSMIntermedEntry this_rec)
throws IOException
{
FSMIntermedEntry redshuf_start_rec = null;
try {
redshuf_start_rec = this_rec.clone();
} catch (CloneNotSupportedException e) {
log.warn("Unable to clone FSMIntermedEntry: " + e.toString());
return false; // don't emit a partially-built record
}
redshuf_start_rec.state_type = new StateType(StateType.STATE_START);
redshuf_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
redshuf_start_rec.timestamp = this_rec.timestamp;
redshuf_start_rec.time_start = this_rec.timestamp;
redshuf_start_rec.time_end = "";
redshuf_start_rec.generateUniqueID();
log.debug("Collecting record " + redshuf_start_rec +
"("+redshuf_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_start_rec.getUniqueID()),
redshuf_start_rec
);
return true;
}
/*
* Generates 5 extra FSMIntermedEntry records for a given Reduce END entry
*/
protected boolean expandReduceEnd
(ChukwaRecordKey key, ChukwaRecord val,
OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output,
Reporter reporter, FSMIntermedEntry this_rec)
throws IOException
{
/* Split into ReduceShuffleWait, ReduceSort, ReduceReducer
* But also retain the original combined Reduce at the same time
*/
FSMIntermedEntry redshuf_end_rec = null;
FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;
/* Extract field names for checking */
String [] fieldNames = val.getFields();
ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
try {
redsort_start_rec = this_rec.clone();
redred_start_rec = this_rec.clone();
redshuf_end_rec = this_rec.clone();
redsort_end_rec = this_rec.clone();
redred_end_rec = this_rec.clone();
} catch (CloneNotSupportedException e) {
log.warn("Unable to clone FSMIntermedEntry: " + e.toString());
return false; // don't emit partially-built records
}
redshuf_end_rec.state_type = new StateType(StateType.STATE_END);
redshuf_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
redsort_start_rec.state_type = new StateType(StateType.STATE_START);
redsort_end_rec.state_type = new StateType(StateType.STATE_END);
redsort_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
redsort_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_SORT);
redred_start_rec.state_type = new StateType(StateType.STATE_START);
redred_end_rec.state_type = new StateType(StateType.STATE_END);
redred_start_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
redred_end_rec.state_mapred = new MapRedState(MapRedState.REDUCE_REDUCER);
redshuf_end_rec.generateUniqueID();
redsort_start_rec.generateUniqueID();
redsort_end_rec.generateUniqueID();
redred_start_rec.generateUniqueID();
redred_end_rec.generateUniqueID();
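/* Timeline: shuffle ends at SHUFFLE_FINISHED, sort spans SHUFFLE_FINISHED to
* SORT_FINISHED, and the reducer spans SORT_FINISHED to the original reduce
* FINISH_TIME (so redred_end keeps the times already cloned from this_rec) */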
if(fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
if (val.getValue("SHUFFLE_FINISHED") == null) return false;
if (val.getValue("SORT_FINISHED") == null) return false;
} else {
return false;
}
redshuf_end_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
redshuf_end_rec.time_start = "";
redshuf_end_rec.time_end = val.getValue("SHUFFLE_FINISHED");
redsort_start_rec.timestamp = val.getValue("SHUFFLE_FINISHED");
redsort_start_rec.time_start = val.getValue("SHUFFLE_FINISHED");
redsort_start_rec.time_end = "";
redsort_end_rec.timestamp = val.getValue("SORT_FINISHED");
redsort_end_rec.time_start = "";
redsort_end_rec.time_end = val.getValue("SORT_FINISHED");
redred_start_rec.timestamp = val.getValue("SORT_FINISHED");
redred_start_rec.time_start = val.getValue("SORT_FINISHED");
redred_start_rec.time_end = "";
/* redred_end times are exactly the same as the original red_end times */
log.debug("Collecting record " + redshuf_end_rec +
"("+redshuf_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_end_rec.getUniqueID()),
redshuf_end_rec
);
log.debug("Collecting record " + redsort_start_rec +
"("+redsort_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_start_rec.getUniqueID()),
redsort_start_rec
);
log.debug("Collecting record " + redsort_end_rec +
"("+redsort_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_end_rec.getUniqueID()),
redsort_end_rec
);
log.debug("Collecting record " + redred_start_rec +
"("+redred_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redred_start_rec.getUniqueID()),
redred_start_rec
);
log.debug("Collecting record " + redred_end_rec +
"("+redred_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
output.collect(
new ChukwaRecordKey(FSM_CRK_ReduceType,redred_end_rec.getUniqueID()),
redred_end_rec
);
return true;
}
} // end of mapper class