/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.analysis.salsa.fsm;
import java.io.IOException;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.extraction.demux.*;
import org.apache.hadoop.chukwa.extraction.engine.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
/**
* FSM Builder
*
* Input: start/end event pairs, e.g. JobHistory data
*
* Input handling is controlled by choosing a custom mapper that
* is able to parse the desired input format (e.g. JobHistory lines).
* One map class is provided for each supported type of input data;
* each map class "standardizes" its input log type to the common
* internal FSMIntermedEntry representation.
*
* Currently available mapper classes:
* DataNodeClientTraceMapper
* TaskTrackerClientTraceMapper
* JobHistoryTaskDataMapper
*
* The mapper class to use is read in as a configuration parameter
* (chukwa.salsa.fsm.mapclass); see the example invocation below.
*
* Output is standardized, regardless of input, and is generated by
* the common reducer.
*
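* Example invocation (a sketch: the jar name and directory paths are
* illustrative, but the -D property and -in flag match the argument
* parsing in run() below):
*
* hadoop jar chukwa-core.jar org.apache.hadoop.chukwa.analysis.salsa.fsm.FSMBuilder \
*   -D chukwa.salsa.fsm.mapclass=org.apache.hadoop.chukwa.analysis.salsa.fsm.JobHistoryTaskDataMapper \
*   -in 1 [input dir] [output dir]
*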
*/
public class FSMBuilder extends Configured implements Tool {
private static Log log = LogFactory.getLog(FSMBuilder.class);
public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS,
OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED,
COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS}
protected static final String SEP = "/";
public static class FSMReducer
extends MapReduceBase
implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
/**
* These are used for the add_info TreeMap; keys not listed here are automatically
* prepended with "COUNTER_"
*/
static final String[] NON_COUNTER_KEYS = {"csource", "ctags", "STATE_STRING"};
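/*
* The JCDF_* keys name the causal-graph vertex IDs (ID1, ID2) and edge
* attributes (elapsed time, data volume) consumed by the downstream Pig
* stitching script; "JCDF" presumably stands for Job-Centric Data Flow.
*/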
protected final static String JCDF_ID1 = "JCDF_ID1";
protected final static String JCDF_ID2 = "JCDF_ID2";
protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME";
protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL";
protected final static String JCDF_SEP = "@";
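/*
* Helper shared by the addStitchingFields_* methods below, factored out of
* their duplicated elapsed-time computation: the state's elapsed time
* (TIME_END - TIME_START) as a string. Assumes both fields hold epoch
* timestamps in milliseconds, as the existing parsing implies.
*/
protected static String elapsedTime(ChukwaRecord cr) {
return Long.toString(Long.parseLong(cr.getValue("TIME_END"))
- Long.parseLong(cr.getValue("TIME_START")));
}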
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_blockread
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("JOB_ID"));
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_BYTES"));
String id1 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
String id2 = new String("map"+JCDF_SEP+cr.getValue("JOB_ID"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_map
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_INPUT_BYTES"));
String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID"));
String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_shuffle
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_BYTES"));
String mapid, redid;
String id_parts[];
id_parts = (cr.getValue("TASK_ID")).split("@");
if (id_parts.length != 2) {
log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
return; // malformed TASK_ID; bail out to avoid ArrayIndexOutOfBoundsException below
}
redid = id_parts[0];
mapid = id_parts[1];
String id1 = "shuf" + JCDF_SEP + mapid;
String id2 = "shufred" + JCDF_SEP + redid;
String et = elapsedTime(cr);
String ev = cr.getValue("COUNTER_BYTES");
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_redshufwait
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_INPUT_BYTES"));
String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID"));
String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_redsort
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_INPUT_BYTES"));
String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
/**
* Populates fields used by Pig script for stitching together causal flows
*/
protected void addStitchingFields_redreducer
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_INPUT_BYTES"));
String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
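/**
* Populates fields used by Pig script for stitching together causal flows
*/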
protected void addStitchingFields_blockwrite
(ChukwaRecord cr, ArrayList<String> fnl)
{
assert(fnl.contains("JOB_ID"));
assert(fnl.contains("TASK_ID"));
assert(fnl.contains("TIME_END"));
assert(fnl.contains("TIME_START"));
assert(fnl.contains("COUNTER_BYTES"));
String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID"));
String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
String et = new String(
(new Long(Long.parseLong(cr.getValue("TIME_END")) -
Long.parseLong(cr.getValue("TIME_START")))).toString()
);
String ev = new String(cr.getValue("COUNTER_BYTES"));
cr.add(JCDF_ID1, id1);
cr.add(JCDF_ID2, id2);
cr.add(JCDF_EDGE_TIME, et);
cr.add(JCDF_EDGE_VOL, ev);
}
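/**
* Adds the JCDF stitching fields to a completed state record by dispatching
* on its STATE_NAME to the matching addStitchingFields_* helper; records
* with an unrecognized state name are left unchanged.
*/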
public void addStitchingFields
(ChukwaRecord cr)
{
String state_name = null;
String [] fieldNames = cr.getFields();
// get field name list
ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
for (String fieldName : fieldNames) fieldNamesList.add(fieldName);
// safety checks
assert(fieldNamesList.contains("STATE_NAME"));
state_name = cr.getValue("STATE_NAME");
if (state_name.equals("MAP")) {
addStitchingFields_map(cr, fieldNamesList);
} else if (state_name.equals("REDUCE_SHUFFLEWAIT")) {
addStitchingFields_redshufwait(cr, fieldNamesList);
} else if (state_name.equals("REDUCE_SORT")) {
addStitchingFields_redsort(cr, fieldNamesList);
} else if (state_name.equals("REDUCE_REDUCER")) {
addStitchingFields_redreducer(cr, fieldNamesList);
} else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) {
addStitchingFields_shuffle(cr, fieldNamesList);
} else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) {
addStitchingFields_blockread(cr, fieldNamesList);
} else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
addStitchingFields_blockwrite(cr, fieldNamesList);
}
// else add nothing
}
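/**
* Pairs the start and end FSMIntermedEntry events sharing a key into one
* completed-state ChukwaRecord: state name, identifiers, and start time come
* from the start event; end time and add_info counters come from the end
* event (counter keys are prefixed with "COUNTER_" unless listed in
* NON_COUNTER_KEYS). The JCDF stitching fields are then added before the
* record is emitted to the SALSA_COMPLETE sink.
*/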
public void reduce
(ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
Reporter reporter)
throws IOException
{
FSMIntermedEntry start_rec = null, end_rec = null;
FSMIntermedEntry tmpent;
String keystr = key.getKey();
String newkey;
ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
ArrayList<String> noncounters = new ArrayList<String>();
keystr = keystr.trim();
ChukwaRecord cr = new ChukwaRecord();
for (String k : NON_COUNTER_KEYS) noncounters.add(k);
ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
int itemcount = 0;
try {
while (values.hasNext()) {
itemcount++;
tmpent = values.next();
ents.add(tmpent.clone());
}
} catch (CloneNotSupportedException e) {
log.warn("Unable to clone FSMIntermedEntry: " + e.toString());
}
log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
if (itemcount == 2) { // i.e. we have both start and end events
if (ents.get(0).state_type.val == StateType.STATE_START &&
ents.get(1).state_type.val == StateType.STATE_END)
{
start_rec = ents.get(0); end_rec = ents.get(1);
} else if (ents.get(1).state_type.val == StateType.STATE_START &&
ents.get(0).state_type.val == StateType.STATE_END)
{
start_rec = ents.get(1); end_rec = ents.get(0);
} else {
log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: expected one START and one END entry.");
return; // cannot pair start/end events; skip to avoid NullPointerException below
}
cr.add(new String("STATE_NAME"),start_rec.state_name);
cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID());
cr.add(new String("TIMESTAMP"),start_rec.timestamp);
cr.add(new String("TIME_START"),start_rec.time_start);
cr.add(new String("TIME_END"),end_rec.time_end);
cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3));
cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3));
cr.add(new String("HOST"),start_rec.host_exec);
cr.add(new String("HOST_OTHER"),start_rec.host_other);
cr.add(new String("JOB_ID"),start_rec.job_id);
cr.add(new String("TASK_ID"),start_rec.getFriendlyID());
Set<String> treemapkeys = end_rec.add_info.keySet();
for (String currkey : treemapkeys) {
if (currkey == null) continue;
if (noncounters.contains(currkey)) {
cr.add(currkey, end_rec.add_info.get(currkey));
} else {
cr.add("COUNTER_" + currkey, end_rec.add_info.get(currkey));
}
}
cr.setTime(Long.parseLong(start_rec.timestamp));
newkey = start_rec.time_orig_epoch +
SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig;
log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
addStitchingFields(cr);
log.debug(cr);
coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
} else if (itemcount == 1) {
// Only the start or only the end event was seen. Ideally we would keep
// a lone start event, drop a lone end event, or change the reduce type
// so incomplete entries are written to their own file; for now we only warn.
log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
} else { // any other value is invalid
// malformed data; print debug info?
log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
}
}
}
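/**
* Configures and runs the FSMBuilder MapReduce job: the mapper is the class
* named by the chukwa.salsa.fsm.mapclass property, the reducer is the common
* FSMReducer, and input/output paths are taken from the -in command-line
* arguments.
*/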
public int run (String args[]) throws Exception {
int num_inputs;
JobConf conf = new JobConf(getConf(), FSMBuilder.class);
if (args.length < 4 || !"-in".equals(args[0]))
{
System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
return(1);
}
conf.setJobName("Salsa_FSMBuilder");
/* Get name of Mapper class to use */
String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
if (mapclassname == null) {
System.err.println("No mapper specified (use -D chukwa.salsa.fsm.mapclass=<full Java class>).");
return(1);
}
log.info("Mapper class: " + mapclassname);
Class mapperClass = null;
try {
mapperClass = Class.forName(mapclassname);
} catch (ClassNotFoundException c) {
System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
return(1);
}
/* Get on with usual job setup */
conf.setMapperClass(mapperClass);
conf.setReducerClass(FSMReducer.class);
conf.setOutputKeyClass(ChukwaRecordKey.class);
conf.setOutputValueClass(ChukwaRecord.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(ChukwaRecordOutputFormat.class);
conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
conf.setMapOutputValueClass(FSMIntermedEntry.class);
conf.setMapOutputKeyClass(ChukwaRecordKey.class);
conf.setNumReduceTasks(1); // fixed at 1 to ensure that all records are grouped together
/* Setup inputs/outputs */
try {
num_inputs = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
System.err.println("Invalid input count [" + args[1] + "]");
System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
return(1);
}
if (num_inputs <= 0) {
System.err.println("Must have at least 1 input.");
return(1);
}
if (args.length < 2 + num_inputs + 1) {
System.err.println("Too few arguments for " + num_inputs + " input dir(s) plus one output dir.");
return(1);
}
for (int i = 2; i < 2+num_inputs; i++) {
Path in = new Path(args[i]);
FileInputFormat.addInputPath(conf, in);
}
Path out = new Path(args[2+num_inputs]);
FileOutputFormat.setOutputPath(conf, out);
JobClient.runJob(conf);
return(0);
}
public static void main (String [] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
System.exit(res);
}
}