| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.chukwa.analysis.salsa.fsm; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.ArrayList; |
| import java.util.Set; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import org.apache.hadoop.chukwa.extraction.demux.*; |
| import org.apache.hadoop.chukwa.extraction.engine.*; |
| import org.apache.hadoop.conf.*; |
| import org.apache.hadoop.mapred.*; |
| import org.apache.hadoop.util.*; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector; |
| |
| /** |
| * FSM Builder |
| * |
| * Input: start/end pairs i.e. JobHistory data |
| * |
| * Input handling is controlled by choosing a custom mapper that |
| * is able to parse the desired input format (e.g. JobHistory lines) |
| * One map class is provided for each type of input data provided |
| * Each map class "standardizes" the different input log types |
| * to the standardized internal FSMIntermedEntry representation |
| * |
| * Currently available mapper classes: |
| * DataNodeClientTraceMapper |
| * TaskTrackerClientTraceMapper |
| * JobHistoryTaskDataMapper |
| * |
| * Parameterizing choice of mapper class - read in as config parameter |
| * |
| * Output is standardized, regardless of input, and is generated by |
| * the common reducer |
| * |
| */ |
| |
| public class FSMBuilder extends Configured implements Tool { |
| |
| private static Log log = LogFactory.getLog(FSMBuilder.class); |
| |
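  /** Types of additional info that can accompany a state record (add_info keys). */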
| public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS, |
| OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED, |
| COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS} |
| |
| protected static final String SEP = "/"; |
| |
| public static class FSMReducer |
| extends MapReduceBase |
| implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> { |
| |
| /** |
| * These are used for the add_info TreeMap; keys not listed here are automatically |
| * prepended with "COUNTER_" |
| */ |
| final static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"}; |
| |
| protected final static String JCDF_ID1 = "JCDF_ID1"; |
| protected final static String JCDF_ID2 = "JCDF_ID2"; |
| protected final static String JCDF_EDGE_TIME = "JCDF_E_TIME"; |
| protected final static String JCDF_EDGE_VOL = "JCDF_E_VOL"; |
| protected final static String JCDF_SEP = "@"; |
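
    // Each output record carries one edge of the JCDF (Job-Centric Data Flow)
    // graph: a directed edge from vertex JCDF_ID1 to vertex JCDF_ID2,
    // annotated with its duration (JCDF_E_TIME) and data volume (JCDF_E_VOL).
    // Vertex IDs take the form "<type>@<identifier>", e.g. "map@<task id>".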
| |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_blockread |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("JOB_ID")); |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_BYTES")); |
| |
      String id1 = cr.getValue("TASK_ID") + JCDF_SEP + cr.getValue("TIME_START");
      String id2 = "map" + JCDF_SEP + cr.getValue("JOB_ID");
      String et = Long.toString(Long.parseLong(cr.getValue("TIME_END")) -
        Long.parseLong(cr.getValue("TIME_START")));
      String ev = cr.getValue("COUNTER_BYTES");
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_map |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_INPUT_BYTES")); |
| |
| String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_shuffle |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_BYTES")); |
| |
| String mapid, redid; |
| String id_parts[]; |
| |
| id_parts = (cr.getValue("TASK_ID")).split("@"); |
| if (id_parts.length != 2) { |
| log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length); |
| } |
| redid = id_parts[0]; |
| mapid = id_parts[1]; |
| |
| String id1 = new String("shuf"+JCDF_SEP+mapid); |
| String id2 = new String("shufred"+JCDF_SEP+redid); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_redshufwait |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_INPUT_BYTES")); |
| |
| String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_redsort |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_INPUT_BYTES")); |
| |
| String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
| /** |
| * Populates fields used by Pig script for stitching together causal flows |
| */ |
| protected void addStitchingFields_redreducer |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_INPUT_BYTES")); |
| |
| String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID")); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_INPUT_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
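    /**
     * Populates fields used by the Pig script for stitching together causal
     * flows (reduce output feeding an HDFS block write).
     */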
| protected void addStitchingFields_blockwrite |
| (ChukwaRecord cr, ArrayList<String> fnl) |
| { |
| assert(fnl.contains("JOB_ID")); |
| assert(fnl.contains("TASK_ID")); |
| assert(fnl.contains("TIME_END")); |
| assert(fnl.contains("TIME_START")); |
| assert(fnl.contains("COUNTER_BYTES")); |
| |
| String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID")); |
| String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START")); |
| String et = new String( |
| (new Long(Long.parseLong(cr.getValue("TIME_END")) - |
| Long.parseLong(cr.getValue("TIME_START")))).toString() |
| ); |
| String ev = new String(cr.getValue("COUNTER_BYTES")); |
| cr.add(JCDF_ID1, id1); |
| cr.add(JCDF_ID2, id2); |
| cr.add(JCDF_EDGE_TIME, et); |
| cr.add(JCDF_EDGE_VOL, ev); |
| } |
| |
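    /**
     * Dispatches on the record's STATE_NAME to the matching per-state helper
     * above; records with any other state name get no stitching fields added.
     */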
| public void addStitchingFields |
| (ChukwaRecord cr) |
| { |
| String state_name = null; |
| String [] fieldNames = cr.getFields(); |
| |
| // get field name list |
| ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length); |
| for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]); |
| |
| // safety checks |
| assert(fieldNamesList.contains("STATE_NAME")); |
| |
| state_name = cr.getValue("STATE_NAME"); |
| if (state_name.equals("MAP")) { |
| addStitchingFields_map(cr, fieldNamesList); |
| } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) { |
| addStitchingFields_redshufwait(cr, fieldNamesList); |
| } else if (state_name.equals("REDUCE_SORT")) { |
| addStitchingFields_redsort(cr, fieldNamesList); |
| } else if (state_name.equals("REDUCE_REDUCER")) { |
| addStitchingFields_redreducer(cr, fieldNamesList); |
| } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) { |
| addStitchingFields_shuffle(cr, fieldNamesList); |
| } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) { |
| addStitchingFields_blockread(cr, fieldNamesList); |
| } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) { |
| addStitchingFields_blockwrite(cr, fieldNamesList); |
| } |
| // else add nothing |
| } |
| |
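    /**
     * Assembles each start/end pair of FSMIntermedEntry events sharing a key
     * into a single ChukwaRecord: timing and host fields come from the pair,
     * counters are copied from the end event's add_info map, and the JCDF
     * stitching fields are added before the record is emitted.
     */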
| public void reduce |
| (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values, |
| OutputCollector<ChukwaRecordKey, ChukwaRecord> output, |
| Reporter reporter) |
| throws IOException |
| { |
| FSMIntermedEntry start_rec = null, end_rec = null; |
| FSMIntermedEntry tmpent; |
| String keystr = key.getKey(); |
| String newkey; |
| ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>(); |
| ArrayList<String> noncounters = new ArrayList<String>(); |
| keystr = keystr.trim(); |
| ChukwaRecord cr = new ChukwaRecord(); |
| |
| for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]); |
| |
| ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter); |
| |
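      // MapReduce may reuse the value object between calls to values.next(),
      // so clone each entry before buffering it in the list.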
| int itemcount = 0; |
| try { |
| while (values.hasNext()) { |
| itemcount++; |
| tmpent = values.next(); |
| ents.add(tmpent.clone()); |
| } |
      } catch (CloneNotSupportedException e) {
        // FSMIntermedEntry is expected to support clone(); log if it does not
        log.warn("Could not clone FSMIntermedEntry", e);
      }
| |
| log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)"); |
| |
| if (itemcount == 2) { // i.e. we have both start and end events |
| |
| if (ents.get(0).state_type.val == StateType.STATE_START && |
| ents.get(1).state_type.val == StateType.STATE_END) |
| { |
| start_rec = ents.get(0); end_rec = ents.get(1); |
| } else if (ents.get(1).state_type.val == StateType.STATE_START && |
| ents.get(0).state_type.val == StateType.STATE_END) |
| { |
| start_rec = ents.get(1); end_rec = ents.get(0); |
        } else {
          log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
          return; // no valid start/end pair to assemble; drop this key
        }
| |
| cr.add(new String("STATE_NAME"),start_rec.state_name); |
| cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID()); |
| cr.add(new String("TIMESTAMP"),start_rec.timestamp); |
| cr.add(new String("TIME_START"),start_rec.time_start); |
| cr.add(new String("TIME_END"),end_rec.time_end); |
| cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3)); |
| cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3)); |
| cr.add(new String("HOST"),start_rec.host_exec); |
| cr.add(new String("HOST_OTHER"),start_rec.host_other); |
| cr.add(new String("JOB_ID"),start_rec.job_id); |
| cr.add(new String("TASK_ID"),start_rec.getFriendlyID()); |
| |
| Set<String> treemapkeys = end_rec.add_info.keySet(); |
| Iterator<String> keyIter = treemapkeys.iterator(); |
| |
| for (int i = 0; i < treemapkeys.size(); i++) { |
| assert(keyIter.hasNext()); |
| String currkey = keyIter.next(); |
| if (currkey != null && |
| !noncounters.contains(currkey)) { |
| cr.add(new String("COUNTER_" + currkey), end_rec.add_info.get(currkey)); |
| } else if (currkey != null && noncounters.contains(currkey)) { |
| cr.add(new String(currkey), end_rec.add_info.get(currkey)); |
| } |
| } |
| assert(!keyIter.hasNext()); |
| cr.setTime(Long.parseLong(start_rec.timestamp)); |
| |
        newkey = start_rec.time_orig_epoch +
          SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig;
| |
| log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]"); |
| |
| addStitchingFields(cr); |
| log.debug(cr); |
| coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr); |
| |
      } else if (itemcount == 1) {
        // Only one of the pair arrived. The intended handling (keep a lone
        // start event, drop a lone end event, and change the reduce type so
        // incomplete entries are written to their own file) is not
        // implemented yet.

        log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
| |
| } else { // any other value is invalid |
| // malformed data; print debug info? |
| |
| log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+"."); |
| |
| } |
| } |
| } |
| |
| |
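  /**
   * Configures and submits the FSM-building job. The mapper class is read at
   * runtime from the chukwa.salsa.fsm.mapclass configuration parameter; the
   * reducer, I/O formats, and partitioner are fixed. A single reduce task is
   * used so that all records for a key end up grouped together.
   */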
| public int run (String args[]) throws Exception { |
| int num_inputs; |
| JobConf conf = new JobConf(getConf(), FSMBuilder.class); |
| String [] args2 = args; |
| |
| if (args2.length < 4 || !"-in".equals(args2[0])) |
| { |
| System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass="); |
| System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]"); |
| return(1); |
| } |
| |
| conf.setJobName("Salsa_FSMBuilder"); |
| |
| /* Get name of Mapper class to use */ |
| String mapclassname = conf.get("chukwa.salsa.fsm.mapclass"); |
| log.info("Mapper class: " + mapclassname); |
    Class mapperClass = null;
    try {
      mapperClass = Class.forName(mapclassname);
    } catch (ClassNotFoundException c) {
      System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
      return(1); // cannot set up the job without a valid mapper class
    }
| |
| /* Get on with usual job setup */ |
| conf.setMapperClass(mapperClass); |
| conf.setReducerClass(FSMReducer.class); |
| conf.setOutputKeyClass(ChukwaRecordKey.class); |
| conf.setOutputValueClass(ChukwaRecord.class); |
| conf.setInputFormat(SequenceFileInputFormat.class); |
| conf.setOutputFormat(ChukwaRecordOutputFormat.class); |
| conf.setPartitionerClass(FSMIntermedEntryPartitioner.class); |
| conf.setMapOutputValueClass(FSMIntermedEntry.class); |
| conf.setMapOutputKeyClass(ChukwaRecordKey.class); |
| conf.setNumReduceTasks(1); // fixed at 1 to ensure that all records are grouped together |
| |
| /* Setup inputs/outputs */ |
| try { |
| num_inputs = Integer.parseInt(args2[1]); |
| } catch (NumberFormatException e) { |
| System.err.println("Specifying mapper: -D chukwa.salsa.fsm.mapper="); |
| System.err.println("Application-specific arguments: -in <# inputs> -out <#outputs> [input dir] [output dir]"); |
| return(1); |
| } |
| |
| if (num_inputs <= 0) { |
| System.err.println("Must have at least 1 input."); |
| return(1); |
| } |
| |
| for (int i = 2; i < 2+num_inputs; i++) { |
| Path in = new Path(args2[i]); |
| FileInputFormat.addInputPath(conf, in); |
| } |
| |
| Path out = new Path(args2[2+num_inputs]); |
| FileOutputFormat.setOutputPath(conf, out); |
| |
| JobClient.runJob(conf); |
| |
| return(0); |
| } |
| |
| public static void main (String [] args) throws Exception { |
| |
| int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args); |
| |
| System.exit(res); |
| } |
| |
| |
| |
| } |
| |