| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.Map; |
| import java.util.StringTokenizer; |
| import java.util.HashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.GzipCodec; |
| import org.apache.hadoop.mapred.*; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * Job History Log Analyzer. |
| * |
| * <h3>Description.</h3> |
| * This is a tool for parsing and analyzing history logs of map-reduce jobs. |
| * History logs contain information about execution of jobs, tasks, and |
| * attempts. This tool focuses on submission, launch, start, and finish times, |
| * as well as the success or failure of jobs, tasks, and attempts. |
| * <p> |
| * The analyzer calculates <em>per hour slot utilization</em> for the cluster |
| * as follows. |
| * For each task attempt it divides the time segment from the start of the |
| * attempt t<sub>S</sub> to the finish t<sub>F</sub> into whole hours |
| * [t<sub>0</sub>, ..., t<sub>n</sub>], where t<sub>0</sub> &lt;= t<sub>S</sub> |
| * is the maximal whole hour preceding t<sub>S</sub>, and |
| * t<sub>n</sub> &gt;= t<sub>F</sub> is the minimal whole hour after t<sub>F</sub>. |
| * Thus, [t<sub>0</sub>, ..., t<sub>n</sub>] covers the segment |
| * [t<sub>S</sub>, t<sub>F</sub>], during which the attempt was executed. |
| * Each interval [t<sub>i</sub>, t<sub>i+1</sub>] fully contained in |
| * [t<sub>S</sub>, t<sub>F</sub>] corresponds to exactly one slot on |
| * a map-reduce cluster (usually MAP-slot or REDUCE-slot). |
| * If interval [t<sub>i</sub>, t<sub>i+1</sub>] only partially intersects |
| * [t<sub>S</sub>, t<sub>F</sub>] then we say that the task |
| * attempt used just a fraction of the slot during this hour. |
| * The fraction equals the length of the intersection divided by one hour. |
| * Let slotTime(A, h) denote the number of slots calculated that way for a |
| * specific attempt A during hour h. |
| * The tool then sums all slots for all attempts for every hour. |
| * The result is the slot hour utilization of the cluster: |
| * <tt>slotTime(h) = SUM<sub>A</sub> slotTime(A,h)</tt>. |
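| * <p> |
| * For example, an attempt that starts at 10:20 and finishes at 12:40 |
| * contributes 2/3 of a slot to the 10:00 hour, one full slot to the |
| * 11:00 hour, and 2/3 of a slot to the 12:00 hour. |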
| * <p> |
| * The log analyzer calculates slot hours for <em>MAP</em> and <em>REDUCE</em> |
| * attempts separately. |
| * <p> |
| * The log analyzer distinguishes between <em>successful</em> and |
| * <em>failed</em> attempts. A task attempt is considered successful if its |
| * own status is SUCCESS and the statuses of the task and the job it is |
| * part of are also SUCCESS. Otherwise the task attempt is considered failed. |
| * <p> |
| * Map-reduce clusters are usually configured to have a fixed number of MAP |
| * and REDUCE slots per node. Thus the maximal possible number of slots on |
| * the cluster is <tt>total_slots = total_nodes * slots_per_node</tt>. |
| * The effective slot hours of successful attempts therefore cannot exceed |
| * <tt>total_slots</tt>. |
| * <p> |
| * <em>Pending time</em> characterizes the wait time of attempts. |
| * It is calculated similarly to the slot hour except that the wait interval |
| * starts when the job is submitted and ends when an attempt starts execution. |
| * In addition, pending time also includes the intervals between attempts |
| * of the same task if it was re-executed. |
| * <p> |
| * The history log analyzer calculates two variations of pending time. The |
| * first is based on the job submission time as described above; the second |
| * starts the wait interval when the job is launched rather than submitted. |
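| * <p> |
| * For example, the launch-based pending time of a task's first attempt |
| * covers the interval from the job launch to the attempt start, while for |
| * a re-executed task the pending interval of the second attempt starts at |
| * the finish time of the first one. |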
| * |
| * <h3>Input.</h3> |
| * The following input parameters can be specified in the argument string |
| * to the job log analyzer: |
| * <ul> |
| * <li><tt>-historyDir inputDir</tt> specifies the location of the directory |
| * where the analyzer looks for job history log files.</li> |
| * <li><tt>-resFile resultFile</tt> the name of the result file.</li> |
| * <li><tt>-usersIncluded | -usersExcluded userList</tt> slot utilization and |
| * pending time can be calculated either only for the specified users or for |
| * all but the specified users. |
| * <br> |
| * <tt>userList</tt> is a comma or semicolon separated list of users.</li> |
| * <li><tt>-gzip</tt> is used if history log files are compressed. |
| * Only {@link GzipCodec} is currently supported.</li> |
| * <li><tt>-jobDelimiter pattern</tt> original log files can be concatenated |
| * into larger file(s), with the specified delimiter marking where the log of |
| * one job ends and the next begins.<br> |
| * <tt>pattern</tt> is a java regular expression |
| * {@link java.util.regex.Pattern}, which should match only the log delimiters. |
| * <br> |
| * E.g. pattern <tt>".!!FILE=.*!!"</tt> matches delimiters, which contain |
| * the original history log file names in the following form:<br> |
| * <tt>"$!!FILE=my.job.tracker.com_myJobId_user_wordcount.log!!"</tt></li> |
| * <li><tt>-clean</tt> cleans up default directories used by the analyzer.</li> |
| * <li><tt>-test testFile</tt> tests one file locally and exits; |
| * does not require map-reduce.</li> |
| * <li><tt>-help</tt> prints usage and exits.</li> |
| * </ul> |
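| * <p> |
| * For example, the analyzer may be invoked as follows (the history |
| * directory and the user names are illustrative): |
| * <pre> |
| * hadoop org.apache.hadoop.fs.JHLogAnalyzer -historyDir history \ |
| *        -resFile jhla_result.txt -usersExcluded hadoop,root -gzip |
| * </pre> |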
| * |
| * <h3>Output.</h3> |
| * The output file is formatted as a tab separated table consisting of four |
| * columns: <tt>SERIES, PERIOD, TYPE, SLOT_HOUR</tt>. |
| * <ul> |
| * <li><tt>SERIES</tt> one of the four statistical series;</li> |
| * <li><tt>PERIOD</tt> the start of the time interval in the following format: |
| * <tt>"yyyy-mm-dd hh:mm:ss"</tt>;</li> |
| * <li><tt>TYPE</tt> the slot type, e.g. MAP or REDUCE;</li> |
| * <li><tt>SLOT_HOUR</tt> the value of the slot usage during this |
| * time interval.</li> |
| * </ul> |
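| * <p> |
| * E.g. a line of the result file may look as follows (the columns are tab |
| * separated, the values are illustrative): |
| * <pre> |
| * allSlotTime   2009-01-01 11:00:00   MAP   512.0 |
| * </pre> |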
| */ |
| @SuppressWarnings("deprecation") |
| public class JHLogAnalyzer { |
| private static final Log LOG = LogFactory.getLog(JHLogAnalyzer.class); |
| // Constants |
| private static final String JHLA_ROOT_DIR = |
| System.getProperty("test.build.data", "stats/JHLA"); |
| private static final Path INPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_input"); |
| private static final String BASE_INPUT_FILE_NAME = "jhla_in_"; |
| private static final Path OUTPUT_DIR = new Path(JHLA_ROOT_DIR, "jhla_output"); |
| private static final Path RESULT_FILE = |
| new Path(JHLA_ROOT_DIR, "jhla_result.txt"); |
| private static final Path DEFAULT_HISTORY_DIR = new Path("history"); |
| |
| private static final int DEFAULT_TIME_INTERVAL_MSEC = 1000*60*60; // 1 hour |
| |
| static { |
| Configuration.addDefaultResource("hdfs-default.xml"); |
| Configuration.addDefaultResource("hdfs-site.xml"); |
| } |
| |
| static enum StatSeries { |
| STAT_ALL_SLOT_TIME |
| (AccumulatingReducer.VALUE_TYPE_LONG + "allSlotTime"), |
| STAT_FAILED_SLOT_TIME |
| (AccumulatingReducer.VALUE_TYPE_LONG + "failedSlotTime"), |
| STAT_SUBMIT_PENDING_SLOT_TIME |
| (AccumulatingReducer.VALUE_TYPE_LONG + "submitPendingSlotTime"), |
| STAT_LAUNCHED_PENDING_SLOT_TIME |
| (AccumulatingReducer.VALUE_TYPE_LONG + "launchedPendingSlotTime"); |
| |
| private String statName = null; |
| private StatSeries(String name) {this.statName = name;} |
| public String toString() {return statName;} |
| } |
| |
| private static class FileCreateDaemon extends Thread { |
| private static final int NUM_CREATE_THREADS = 10; |
| // incremented concurrently by the daemon threads, so a plain volatile |
| // int increment would not be atomic |
| private static final AtomicInteger numFinishedThreads = new AtomicInteger(); |
| private static volatile int numRunningThreads; |
| private static FileStatus[] jhLogFiles; |
| |
| FileSystem fs; |
| int start; |
| int end; |
| |
| FileCreateDaemon(FileSystem fs, int start, int end) { |
| this.fs = fs; |
| this.start = start; |
| this.end = end; |
| } |
| |
| public void run() { |
| try { |
| for(int i=start; i < end; i++) { |
| String name = getFileName(i); |
| Path controlFile = new Path(INPUT_DIR, "in_file_" + name); |
| SequenceFile.Writer writer = null; |
| try { |
| writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile, |
| Text.class, LongWritable.class, |
| CompressionType.NONE); |
| String logFile = jhLogFiles[i].getPath().toString(); |
| writer.append(new Text(logFile), new LongWritable(0)); |
| } catch(Exception e) { |
| throw new IOException(e); |
| } finally { |
| if (writer != null) |
| writer.close(); |
| writer = null; |
| } |
| } |
| } catch(IOException ex) { |
| LOG.error("FileCreateDaemon failed.", ex); |
| } |
| numFinishedThreads.incrementAndGet(); |
| } |
| |
| private static void createControlFile(FileSystem fs, Path jhLogDir |
| ) throws IOException { |
| fs.delete(INPUT_DIR, true); |
| jhLogFiles = fs.listStatus(jhLogDir); |
| |
| numFinishedThreads.set(0); |
| try { |
| int start = 0; |
| int step = jhLogFiles.length / NUM_CREATE_THREADS |
| + ((jhLogFiles.length % NUM_CREATE_THREADS) > 0 ? 1 : 0); |
| FileCreateDaemon[] daemons = new FileCreateDaemon[NUM_CREATE_THREADS]; |
| numRunningThreads = 0; |
| for(int tIdx=0; tIdx < NUM_CREATE_THREADS && start < jhLogFiles.length; tIdx++) { |
| int end = Math.min(start + step, jhLogFiles.length); |
| daemons[tIdx] = new FileCreateDaemon(fs, start, end); |
| start += step; |
| numRunningThreads++; |
| } |
| for(int tIdx=0; tIdx < numRunningThreads; tIdx++) { |
| daemons[tIdx].start(); |
| } |
| } finally { |
| int prevValue = 0; |
| while(numFinishedThreads.get() < numRunningThreads) { |
| if(prevValue < numFinishedThreads.get()) { |
| LOG.info("Finished " + numFinishedThreads.get() + " threads out of " + numRunningThreads); |
| prevValue = numFinishedThreads.get(); |
| } |
| try {Thread.sleep(500);} catch (InterruptedException e) {} |
| } |
| } |
| } |
| } |
| |
| private static void createControlFile(FileSystem fs, Path jhLogDir |
| ) throws IOException { |
| LOG.info("creating control file: JH log dir = " + jhLogDir); |
| FileCreateDaemon.createControlFile(fs, jhLogDir); |
| LOG.info("created control file: JH log dir = " + jhLogDir); |
| } |
| |
| private static String getFileName(int fIdx) { |
| return BASE_INPUT_FILE_NAME + Integer.toString(fIdx); |
| } |
| |
| /** |
| * If t is of the form KEY="VALUE", then this will return [KEY, VALUE]. |
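| * E.g. <tt>JOBID="job_200902180023_0001"</tt> yields |
| * <tt>[JOBID, job_200902180023_0001]</tt> (the job id is illustrative). |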
| */ |
| private static String [] getKeyValue(String t) throws IOException { |
| String[] keyVal = t.split("=\"*|\""); |
| return keyVal; |
| } |
| |
| /** |
| * JobHistory log record. |
| */ |
| private static class JobHistoryLog { |
| String JOBID; |
| String JOB_STATUS; |
| long SUBMIT_TIME; |
| long LAUNCH_TIME; |
| long FINISH_TIME; |
| long TOTAL_MAPS; |
| long TOTAL_REDUCES; |
| long FINISHED_MAPS; |
| long FINISHED_REDUCES; |
| String USER; |
| Map<String, TaskHistoryLog> tasks; |
| |
| boolean isSuccessful() { |
| return (JOB_STATUS != null) && JOB_STATUS.equals("SUCCESS"); |
| } |
| |
| void parseLine(String line) throws IOException { |
| StringTokenizer tokens = new StringTokenizer(line); |
| if(!tokens.hasMoreTokens()) |
| return; |
| String what = tokens.nextToken(); |
| // Line should start with one of the following: |
| // Job, Task, MapAttempt, ReduceAttempt |
| if(what.equals("Job")) |
| updateJob(tokens); |
| else if(what.equals("Task")) |
| updateTask(tokens); |
| else if(what.indexOf("Attempt") >= 0) |
| updateTaskAttempt(tokens); |
| } |
| |
| private void updateJob(StringTokenizer tokens) throws IOException { |
| while(tokens.hasMoreTokens()) { |
| String t = tokens.nextToken(); |
| String[] keyVal = getKeyValue(t); |
| if(keyVal.length < 2) continue; |
| |
| if(keyVal[0].equals("JOBID")) { |
| if(JOBID == null) |
| JOBID = new String(keyVal[1]); |
| else if(!JOBID.equals(keyVal[1])) { |
| LOG.error("Incorrect JOBID: " |
| + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) |
| + " expect " + JOBID); |
| return; |
| } |
| } |
| else if(keyVal[0].equals("JOB_STATUS")) |
| JOB_STATUS = new String(keyVal[1]); |
| else if(keyVal[0].equals("SUBMIT_TIME")) |
| SUBMIT_TIME = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("LAUNCH_TIME")) |
| LAUNCH_TIME = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("FINISH_TIME")) |
| FINISH_TIME = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("TOTAL_MAPS")) |
| TOTAL_MAPS = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("TOTAL_REDUCES")) |
| TOTAL_REDUCES = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("FINISHED_MAPS")) |
| FINISHED_MAPS = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("FINISHED_REDUCES")) |
| FINISHED_REDUCES = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("USER")) |
| USER = new String(keyVal[1]); |
| } |
| } |
| |
| private void updateTask(StringTokenizer tokens) throws IOException { |
| // unpack |
| TaskHistoryLog task = new TaskHistoryLog().parse(tokens); |
| if(task.TASKID == null) { |
| LOG.error("TASKID = NULL for job " + JOBID); |
| return; |
| } |
| // update or insert |
| if(tasks == null) |
| tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES)); |
| TaskHistoryLog existing = tasks.get(task.TASKID); |
| if(existing == null) |
| tasks.put(task.TASKID, task); |
| else |
| existing.updateWith(task); |
| } |
| |
| private void updateTaskAttempt(StringTokenizer tokens) throws IOException { |
| // unpack |
| TaskAttemptHistoryLog attempt = new TaskAttemptHistoryLog(); |
| String taskID = attempt.parse(tokens); |
| if(taskID == null) return; |
| if(tasks == null) |
| tasks = new HashMap<String, TaskHistoryLog>((int)(TOTAL_MAPS + TOTAL_REDUCES)); |
| TaskHistoryLog existing = tasks.get(taskID); |
| if(existing == null) { |
| existing = new TaskHistoryLog(taskID); |
| tasks.put(taskID, existing); |
| } |
| existing.updateWith(attempt); |
| } |
| } |
| |
| /** |
| * TaskHistory log record. |
| */ |
| private static class TaskHistoryLog { |
| String TASKID; |
| String TASK_TYPE; // MAP, REDUCE, SETUP, CLEANUP |
| String TASK_STATUS; |
| long START_TIME; |
| long FINISH_TIME; |
| Map<String, TaskAttemptHistoryLog> attempts; |
| |
| TaskHistoryLog() {} |
| |
| TaskHistoryLog(String taskID) { |
| TASKID = taskID; |
| } |
| |
| boolean isSuccessful() { |
| return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); |
| } |
| |
| TaskHistoryLog parse(StringTokenizer tokens) throws IOException { |
| while(tokens.hasMoreTokens()) { |
| String t = tokens.nextToken(); |
| String[] keyVal = getKeyValue(t); |
| if(keyVal.length < 2) continue; |
| |
| if(keyVal[0].equals("TASKID")) { |
| if(TASKID == null) |
| TASKID = new String(keyVal[1]); |
| else if(!TASKID.equals(keyVal[1])) { |
| LOG.error("Incorrect TASKID: " |
| + keyVal[1].substring(0, Math.min(keyVal[1].length(), 100)) |
| + " expect " + TASKID); |
| continue; |
| } |
| } |
| else if(keyVal[0].equals("TASK_TYPE")) |
| TASK_TYPE = new String(keyVal[1]); |
| else if(keyVal[0].equals("TASK_STATUS")) |
| TASK_STATUS = new String(keyVal[1]); |
| else if(keyVal[0].equals("START_TIME")) |
| START_TIME = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("FINISH_TIME")) |
| FINISH_TIME = Long.parseLong(keyVal[1]); |
| } |
| return this; |
| } |
| |
| /** |
| * Update with non-null fields of the same task log record. |
| */ |
| void updateWith(TaskHistoryLog from) throws IOException { |
| if(TASKID == null) |
| TASKID = from.TASKID; |
| else if(!TASKID.equals(from.TASKID)) { |
| throw new IOException("Incorrect TASKID: " + from.TASKID |
| + " expect " + TASKID); |
| } |
| if(TASK_TYPE == null) |
| TASK_TYPE = from.TASK_TYPE; |
| else if(! TASK_TYPE.equals(from.TASK_TYPE)) { |
| LOG.error( |
| "Incorrect TASK_TYPE: " + from.TASK_TYPE + " expect " + TASK_TYPE |
| + " for task " + TASKID); |
| return; |
| } |
| if(from.TASK_STATUS != null) |
| TASK_STATUS = from.TASK_STATUS; |
| if(from.START_TIME > 0) |
| START_TIME = from.START_TIME; |
| if(from.FINISH_TIME > 0) |
| FINISH_TIME = from.FINISH_TIME; |
| } |
| |
| /** |
| * Update with non-null fields of the task attempt log record. |
| */ |
| void updateWith(TaskAttemptHistoryLog attempt) throws IOException { |
| if(attempt.TASK_ATTEMPT_ID == null) { |
| LOG.error("Unexpected TASK_ATTEMPT_ID = null for task " + TASKID); |
| return; |
| } |
| if(attempts == null) |
| attempts = new HashMap<String, TaskAttemptHistoryLog>(); |
| TaskAttemptHistoryLog existing = attempts.get(attempt.TASK_ATTEMPT_ID); |
| if(existing == null) |
| attempts.put(attempt.TASK_ATTEMPT_ID, attempt); |
| else |
| existing.updateWith(attempt); |
| // update task start time |
| if(attempt.START_TIME > 0 && |
| (this.START_TIME == 0 || this.START_TIME > attempt.START_TIME)) |
| START_TIME = attempt.START_TIME; |
| } |
| } |
| |
| /** |
| * TaskAttemptHistory log record. |
| */ |
| private static class TaskAttemptHistoryLog { |
| String TASK_ATTEMPT_ID; |
| String TASK_STATUS; // this task attempt status |
| long START_TIME; |
| long FINISH_TIME; |
| long HDFS_BYTES_READ; |
| long HDFS_BYTES_WRITTEN; |
| long FILE_BYTES_READ; |
| long FILE_BYTES_WRITTEN; |
| |
| /** |
| * Returns whether this attempt's own status is "SUCCESS". |
| * The attempt as a whole is considered successful iff all three statuses |
| * of the attempt, the task, and the job equal "SUCCESS"; the task and job |
| * statuses are combined with this one in collectJobStats(). |
| */ |
| boolean isSuccessful() { |
| return (TASK_STATUS != null) && TASK_STATUS.equals("SUCCESS"); |
| } |
| |
| String parse(StringTokenizer tokens) throws IOException { |
| String taskID = null; |
| while(tokens.hasMoreTokens()) { |
| String t = tokens.nextToken(); |
| String[] keyVal = getKeyValue(t); |
| if(keyVal.length < 2) continue; |
| |
| if(keyVal[0].equals("TASKID")) { |
| if(taskID == null) |
| taskID = new String(keyVal[1]); |
| else if(!taskID.equals(keyVal[1])) { |
| LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); |
| continue; |
| } |
| } |
| else if(keyVal[0].equals("TASK_ATTEMPT_ID")) { |
| if(TASK_ATTEMPT_ID == null) |
| TASK_ATTEMPT_ID = new String(keyVal[1]); |
| else if(!TASK_ATTEMPT_ID.equals(keyVal[1])) { |
| LOG.error("Incorrect TASKID: " + keyVal[1] + " expect " + taskID); |
| continue; |
| } |
| } |
| else if(keyVal[0].equals("TASK_STATUS")) |
| TASK_STATUS = new String(keyVal[1]); |
| else if(keyVal[0].equals("START_TIME")) |
| START_TIME = Long.parseLong(keyVal[1]); |
| else if(keyVal[0].equals("FINISH_TIME")) |
| FINISH_TIME = Long.parseLong(keyVal[1]); |
| } |
| return taskID; |
| } |
| |
| /** |
| * Update with non-null fields of the same task attempt log record. |
| */ |
| void updateWith(TaskAttemptHistoryLog from) throws IOException { |
| if(TASK_ATTEMPT_ID == null) |
| TASK_ATTEMPT_ID = from.TASK_ATTEMPT_ID; |
| else if(! TASK_ATTEMPT_ID.equals(from.TASK_ATTEMPT_ID)) { |
| throw new IOException( |
| "Incorrect TASK_ATTEMPT_ID: " + from.TASK_ATTEMPT_ID |
| + " expect " + TASK_ATTEMPT_ID); |
| } |
| if(from.TASK_STATUS != null) |
| TASK_STATUS = from.TASK_STATUS; |
| if(from.START_TIME > 0) |
| START_TIME = from.START_TIME; |
| if(from.FINISH_TIME > 0) |
| FINISH_TIME = from.FINISH_TIME; |
| if(from.HDFS_BYTES_READ > 0) |
| HDFS_BYTES_READ = from.HDFS_BYTES_READ; |
| if(from.HDFS_BYTES_WRITTEN > 0) |
| HDFS_BYTES_WRITTEN = from.HDFS_BYTES_WRITTEN; |
| if(from.FILE_BYTES_READ > 0) |
| FILE_BYTES_READ = from.FILE_BYTES_READ; |
| if(from.FILE_BYTES_WRITTEN > 0) |
| FILE_BYTES_WRITTEN = from.FILE_BYTES_WRITTEN; |
| } |
| } |
| |
| /** |
| * Key = statName*date-time*taskType |
| * Value = number of msec within the hour |
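| * E.g. key <tt>l:allSlotTime*2009-01-01 11:00:00*MAP</tt>, where |
| * <tt>l:</tt> is the long value type prefix of AccumulatingReducer. |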
| */ |
| private static class IntervalKey { |
| static final String KEY_FIELD_DELIMITER = "*"; |
| String statName; |
| String dateTime; |
| String taskType; |
| |
| IntervalKey(String stat, long timeMSec, String taskType) { |
| statName = stat; |
| SimpleDateFormat dateF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| dateTime = dateF.format(new Date(timeMSec)); |
| this.taskType = taskType; |
| } |
| |
| IntervalKey(String key) { |
| StringTokenizer keyTokens = new StringTokenizer(key, KEY_FIELD_DELIMITER); |
| if(!keyTokens.hasMoreTokens()) return; |
| statName = keyTokens.nextToken(); |
| if(!keyTokens.hasMoreTokens()) return; |
| dateTime = keyTokens.nextToken(); |
| if(!keyTokens.hasMoreTokens()) return; |
| taskType = keyTokens.nextToken(); |
| } |
| |
| void setStatName(String stat) { |
| statName = stat; |
| } |
| |
| String getStringKey() { |
| return statName + KEY_FIELD_DELIMITER + |
| dateTime + KEY_FIELD_DELIMITER + |
| taskType; |
| } |
| |
| Text getTextKey() { |
| return new Text(getStringKey()); |
| } |
| |
| public String toString() { |
| return getStringKey(); |
| } |
| } |
| |
| /** |
| * Mapper class. |
| */ |
| private static class JHLAMapper extends IOMapperBase<Object> { |
| /** |
| * A line pattern, which delimits history logs of different jobs, |
| * if multiple job logs are written in the same file. |
| * Null value means only one job log per file is expected. |
| * The pattern should be a regular expression as in |
| * {@link String#matches(String)}. |
| */ |
| String jobDelimiterPattern; |
| int maxJobDelimiterLineLength; |
| /** Count only these users' jobs */ |
| Collection<String> usersIncluded; |
| /** Exclude jobs of the following users */ |
| Collection<String> usersExcluded; |
| /** Type of compression for compressed files: gzip */ |
| Class<? extends CompressionCodec> compressionClass; |
| |
| JHLAMapper() throws IOException { |
| } |
| |
| JHLAMapper(Configuration conf) throws IOException { |
| configure(new JobConf(conf)); |
| } |
| |
| public void configure(JobConf conf) { |
| super.configure(conf); |
| usersIncluded = getUserList(conf.get("jhla.users.included", null)); |
| usersExcluded = getUserList(conf.get("jhla.users.excluded", null)); |
| String zipClassName = conf.get("jhla.compression.class", null); |
| try { |
| compressionClass = (zipClassName == null) ? null : |
| Class.forName(zipClassName).asSubclass(CompressionCodec.class); |
| } catch(Exception e) { |
| throw new RuntimeException("Compression codec not found: ", e); |
| } |
| jobDelimiterPattern = conf.get("jhla.job.delimiter.pattern", null); |
| maxJobDelimiterLineLength = conf.getInt("jhla.job.delimiter.length", 512); |
| } |
| |
| @Override |
| public void map(Text key, |
| LongWritable value, |
| OutputCollector<Text, Text> output, |
| Reporter reporter) throws IOException { |
| String name = key.toString(); |
| long longValue = value.get(); |
| |
| reporter.setStatus("starting " + name + " ::host = " + hostName); |
| |
| long tStart = System.currentTimeMillis(); |
| parseLogFile(fs, new Path(name), longValue, output, reporter); |
| long tEnd = System.currentTimeMillis(); |
| long execTime = tEnd - tStart; |
| |
| reporter.setStatus("finished " + name + " ::host = " + hostName + |
| " in " + execTime/1000 + " sec."); |
| } |
| |
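| /** No-op stub required by {@link IOMapperBase}; the actual parsing is done in map(). */ |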
| public Object doIO(Reporter reporter, |
| String path, // full path of history log file |
| long offset // starting offset within the file |
| ) throws IOException { |
| return null; |
| } |
| |
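| /** No-op; statistics are collected via collectJobStats() during parseLogFile(). */ |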
| void collectStats(OutputCollector<Text, Text> output, |
| String name, |
| long execTime, |
| Object jobObjects) throws IOException { |
| } |
| |
| private boolean isEndOfJobLog(String line) { |
| if(jobDelimiterPattern == null) |
| return false; |
| return line.matches(jobDelimiterPattern); |
| } |
| |
| /** |
| * Collect information about one job. |
| * |
| * @param fs - file system |
| * @param filePath - full path of a history log file |
| * @param offset - starting offset in the history log file |
| * @param output - collector for the per-interval statistics |
| * @param reporter - reporter for progress and status updates |
| * @throws IOException |
| */ |
| public void parseLogFile(FileSystem fs, |
| Path filePath, |
| long offset, |
| OutputCollector<Text, Text> output, |
| Reporter reporter |
| ) throws IOException { |
| InputStream in = null; |
| try { |
| // open file & seek |
| FSDataInputStream stm = fs.open(filePath); |
| stm.seek(offset); |
| in = stm; |
| LOG.info("Opened " + filePath); |
| reporter.setStatus("Opened " + filePath); |
| // get a compression filter if specified |
| if(compressionClass != null) { |
| CompressionCodec codec = (CompressionCodec) |
| ReflectionUtils.newInstance(compressionClass, new Configuration()); |
| in = codec.createInputStream(stm); |
| LOG.info("Codec created " + filePath); |
| reporter.setStatus("Codec created " + filePath); |
| } |
| BufferedReader reader = new BufferedReader(new InputStreamReader(in)); |
| LOG.info("Reader created " + filePath); |
| // skip to the next job log start |
| long processed = 0L; |
| if(jobDelimiterPattern != null) { |
| for(String line = reader.readLine(); |
| line != null; line = reader.readLine()) { |
| if((stm.getPos() - processed) > 100000) { |
| processed = stm.getPos(); |
| reporter.setStatus("Processing " + filePath + " at " + processed); |
| } |
| if(isEndOfJobLog(line)) |
| break; |
| } |
| } |
| // parse lines and update job history |
| JobHistoryLog jh = new JobHistoryLog(); |
| int jobLineCount = 0; |
| for(String line = readLine(reader); |
| line != null; line = readLine(reader)) { |
| jobLineCount++; |
| if((stm.getPos() - processed) > 20000) { |
| processed = stm.getPos(); |
| long numTasks = (jh.tasks == null ? 0 : jh.tasks.size()); |
| String txt = "Processing " + filePath + " at " + processed |
| + " # tasks = " + numTasks; |
| reporter.setStatus(txt); |
| LOG.info(txt); |
| } |
| if(isEndOfJobLog(line)) { |
| if(jh.JOBID != null) { |
| LOG.info("Finished parsing job: " + jh.JOBID |
| + " line count = " + jobLineCount); |
| collectJobStats(jh, output, reporter); |
| LOG.info("Collected stats for job: " + jh.JOBID); |
| } |
| jh = new JobHistoryLog(); |
| jobLineCount = 0; |
| } else |
| jh.parseLine(line); |
| } |
| if(jh.JOBID == null) { |
| LOG.error("JOBID = NULL in " + filePath + " at " + processed); |
| return; |
| } |
| collectJobStats(jh, output, reporter); |
| } catch(Exception ie) { |
| // parsing errors can happen if the file has been truncated |
| LOG.error("JHLAMapper.parseLogFile", ie); |
| reporter.setStatus("JHLAMapper.parseLogFile failed " |
| + StringUtils.stringifyException(ie)); |
| throw new IOException("Job failed.", ie); |
| } finally { |
| if(in != null) in.close(); |
| } |
| } |
| |
| /** |
| * Read and concatenate physical lines into one logical record until a line |
| * ends with " ." or "\" ", or a job log delimiter is reached. |
| */ |
| private StringBuffer resBuffer = new StringBuffer(); |
| private String readLine(BufferedReader reader) throws IOException { |
| resBuffer.setLength(0); |
| reader.mark(maxJobDelimiterLineLength); |
| for(String line = reader.readLine(); |
| line != null; line = reader.readLine()) { |
| if(isEndOfJobLog(line)) { |
| if(resBuffer.length() == 0) |
| resBuffer.append(line); |
| else |
| reader.reset(); |
| break; |
| } |
| // accumulate the record, capping it at 32000 characters |
| if(resBuffer.length() < 32000) |
| resBuffer.append(line); |
| if(line.endsWith(" .") || line.endsWith("\" ")) { |
| break; |
| } |
| reader.mark(maxJobDelimiterLineLength); |
| } |
| String result = resBuffer.length() == 0 ? null : resBuffer.toString(); |
| resBuffer.setLength(0); |
| return result; |
| } |
| |
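| // Splits [start, finish) into whole-hour intervals and emits one record per |
| // interval. E.g. start = 10:20 and finish = 12:40 would emit 40*60*1000 msec |
| // for the 10:00 interval, a full hour for 11:00, and 40*60*1000 msec for 12:00. |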
| private void collectPerIntervalStats(OutputCollector<Text, Text> output, |
| long start, long finish, String taskType, |
| StatSeries ... stats) throws IOException { |
| long curInterval = (start / DEFAULT_TIME_INTERVAL_MSEC) |
| * DEFAULT_TIME_INTERVAL_MSEC; |
| long curTime = start; |
| long accumTime = 0; |
| while(curTime < finish) { |
| // how much of the task time belonged to current interval |
| long nextInterval = curInterval + DEFAULT_TIME_INTERVAL_MSEC; |
| long intervalTime = ((finish < nextInterval) ? |
| finish : nextInterval) - curTime; |
| IntervalKey key = new IntervalKey("", curInterval, taskType); |
| Text val = new Text(String.valueOf(intervalTime)); |
| for(StatSeries statName : stats) { |
| key.setStatName(statName.toString()); |
| output.collect(key.getTextKey(), val); |
| } |
| |
| curTime = curInterval = nextInterval; |
| accumTime += intervalTime; |
| } |
| // For the pending stat speculative attempts may intersect. |
| // Only one of them is considered pending. |
| assert accumTime == finish - start || finish < start; |
| } |
| |
| private void collectJobStats(JobHistoryLog jh, |
| OutputCollector<Text, Text> output, |
| Reporter reporter |
| ) throws IOException { |
| if(jh == null) |
| return; |
| if(jh.tasks == null) |
| return; |
| if(jh.SUBMIT_TIME <= 0) |
| throw new IOException("Job " + jh.JOBID |
| + " SUBMIT_TIME = " + jh.SUBMIT_TIME); |
| if(usersIncluded != null && !usersIncluded.contains(jh.USER)) |
| return; |
| if(usersExcluded != null && usersExcluded.contains(jh.USER)) |
| return; |
| |
| int numAttempts = 0; |
| long totalTime = 0; |
| boolean jobSuccess = jh.isSuccessful(); |
| long jobWaitTime = jh.LAUNCH_TIME - jh.SUBMIT_TIME; |
| // attemptSubmitTime is the job's LAUNCH_TIME for a task's first attempt, |
| // or the previous attempt's FINISH_TIME for all subsequent attempts; |
| // subtracting jobWaitTime converts it to the submit-based variant |
| for(TaskHistoryLog th : jh.tasks.values()) { |
| if(th.attempts == null) |
| continue; |
| // Task is successful iff both the task and the job have status "SUCCESS" |
| long attemptSubmitTime = jh.LAUNCH_TIME; |
| boolean taskSuccess = jobSuccess && th.isSuccessful(); |
| for(TaskAttemptHistoryLog tah : th.attempts.values()) { |
| // Task attempt is considered successful iff all three statuses |
| // of the attempt, the task, and the job equal "SUCCESS" |
| boolean success = taskSuccess && tah.isSuccessful(); |
| if(tah.START_TIME == 0) { |
| LOG.error("Start time 0 for task attempt " + tah.TASK_ATTEMPT_ID); |
| continue; |
| } |
| if(tah.FINISH_TIME < tah.START_TIME) { |
| LOG.error("Finish time " + tah.FINISH_TIME + " is less than " + |
| "Start time " + tah.START_TIME + " for task attempt " + |
| tah.TASK_ATTEMPT_ID); |
| tah.FINISH_TIME = tah.START_TIME; |
| } |
| |
| if(!"MAP".equals(th.TASK_TYPE) && !"REDUCE".equals(th.TASK_TYPE) && |
| !"CLEANUP".equals(th.TASK_TYPE) && !"SETUP".equals(th.TASK_TYPE)) { |
| LOG.error("Unexpected TASK_TYPE = " + th.TASK_TYPE |
| + " for attempt " + tah.TASK_ATTEMPT_ID); |
| } |
| |
| collectPerIntervalStats(output, |
| attemptSubmitTime, tah.START_TIME, th.TASK_TYPE, |
| StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME); |
| collectPerIntervalStats(output, |
| attemptSubmitTime - jobWaitTime, tah.START_TIME, th.TASK_TYPE, |
| StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME); |
| if(success) |
| collectPerIntervalStats(output, |
| tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, |
| StatSeries.STAT_ALL_SLOT_TIME); |
| else |
| collectPerIntervalStats(output, |
| tah.START_TIME, tah.FINISH_TIME, th.TASK_TYPE, |
| StatSeries.STAT_ALL_SLOT_TIME, |
| StatSeries.STAT_FAILED_SLOT_TIME); |
| totalTime += (tah.FINISH_TIME - tah.START_TIME); |
| numAttempts++; |
| if(numAttempts % 500 == 0) { |
| reporter.setStatus("Processing " + jh.JOBID + " at " + numAttempts); |
| } |
| attemptSubmitTime = tah.FINISH_TIME; |
| } |
| } |
| LOG.info("Total Maps = " + jh.TOTAL_MAPS |
| + " Reduces = " + jh.TOTAL_REDUCES); |
| LOG.info("Finished Maps = " + jh.FINISHED_MAPS |
| + " Reduces = " + jh.FINISHED_REDUCES); |
| LOG.info("numAttempts = " + numAttempts); |
| LOG.info("totalTime = " + totalTime); |
| LOG.info("averageAttemptTime = " |
| + (numAttempts==0 ? 0 : totalTime/numAttempts)); |
| LOG.info("jobTotalTime = " + (jh.FINISH_TIME <= jh.SUBMIT_TIME? 0 : |
| jh.FINISH_TIME - jh.SUBMIT_TIME)); |
| } |
| } |
| |
| public static class JHLAPartitioner implements Partitioner<Text, Text> { |
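| // Partitions 0..7 hold fixed (series, task type) pairs: 0/1 all slot time |
| // MAP/REDUCE, 2/3 submit pending MAP/REDUCE, 4/5 launched pending MAP/REDUCE, |
| // 6/7 failed slot time MAP/REDUCE; partition 8 collects everything else |
| // (e.g. the SETUP and CLEANUP task types). |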
| static final int NUM_REDUCERS = 9; |
| |
| public void configure(JobConf conf) {} |
| |
| public int getPartition(Text key, Text value, int numPartitions) { |
| IntervalKey intKey = new IntervalKey(key.toString()); |
| if(intKey.statName.equals(StatSeries.STAT_ALL_SLOT_TIME.toString())) { |
| if(intKey.taskType.equals("MAP")) |
| return 0; |
| else if(intKey.taskType.equals("REDUCE")) |
| return 1; |
| } else if(intKey.statName.equals( |
| StatSeries.STAT_SUBMIT_PENDING_SLOT_TIME.toString())) { |
| if(intKey.taskType.equals("MAP")) |
| return 2; |
| else if(intKey.taskType.equals("REDUCE")) |
| return 3; |
| } else if(intKey.statName.equals( |
| StatSeries.STAT_LAUNCHED_PENDING_SLOT_TIME.toString())) { |
| if(intKey.taskType.equals("MAP")) |
| return 4; |
| else if(intKey.taskType.equals("REDUCE")) |
| return 5; |
| } else if(intKey.statName.equals( |
| StatSeries.STAT_FAILED_SLOT_TIME.toString())) { |
| if(intKey.taskType.equals("MAP")) |
| return 6; |
| else if(intKey.taskType.equals("REDUCE")) |
| return 7; |
| } |
| return 8; |
| } |
| } |
| |
| private static void runJHLA( |
| Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, |
| Path outputDir, |
| Configuration fsConfig) throws IOException { |
| JobConf job = new JobConf(fsConfig, JHLogAnalyzer.class); |
| |
| job.setPartitionerClass(JHLAPartitioner.class); |
| |
| FileInputFormat.setInputPaths(job, INPUT_DIR); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| |
| job.setMapperClass(mapperClass); |
| job.setReducerClass(AccumulatingReducer.class); |
| |
| FileOutputFormat.setOutputPath(job, outputDir); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(Text.class); |
| job.setNumReduceTasks(JHLAPartitioner.NUM_REDUCERS); |
| JobClient.runJob(job); |
| } |
| |
| private static class LoggingCollector implements OutputCollector<Text, Text> { |
| public void collect(Text key, Text value) throws IOException { |
| LOG.info(key + " == " + value); |
| } |
| } |
| |
| /** |
| * Run the job history log analyzer. |
| */ |
| public static void main(String[] args) { |
| Path resFileName = RESULT_FILE; |
| Configuration conf = new Configuration(); |
| |
| try { |
| conf.setInt("test.io.file.buffer.size", 0); |
| Path historyDir = DEFAULT_HISTORY_DIR; |
| String testFile = null; |
| boolean cleanup = false; |
| |
| boolean initControlFiles = true; |
| for (int i = 0; i < args.length; i++) { // parse command line |
| if (args[i].equalsIgnoreCase("-historyDir")) { |
| historyDir = new Path(args[++i]); |
| } else if (args[i].equalsIgnoreCase("-resFile")) { |
| resFileName = new Path(args[++i]); |
| } else if (args[i].equalsIgnoreCase("-usersIncluded")) { |
| conf.set("jhla.users.included", args[++i]); |
| } else if (args[i].equalsIgnoreCase("-usersExcluded")) { |
| conf.set("jhla.users.excluded", args[++i]); |
| } else if (args[i].equalsIgnoreCase("-gzip")) { |
| conf.set("jhla.compression.class", GzipCodec.class.getCanonicalName()); |
| } else if (args[i].equalsIgnoreCase("-jobDelimiter")) { |
| conf.set("jhla.job.delimiter.pattern", args[++i]); |
| } else if (args[i].equalsIgnoreCase("-jobDelimiterLength")) { |
| conf.setInt("jhla.job.delimiter.length", Integer.parseInt(args[++i])); |
| } else if(args[i].equalsIgnoreCase("-noInit")) { |
| initControlFiles = false; |
| } else if(args[i].equalsIgnoreCase("-test")) { |
| testFile = args[++i]; |
| } else if(args[i].equalsIgnoreCase("-clean")) { |
| cleanup = true; |
| } else if(args[i].equalsIgnoreCase("-jobQueue")) { |
| conf.set("mapred.job.queue.name", args[++i]); |
| } else if(args[i].startsWith("-Xmx")) { |
| conf.set("mapred.child.java.opts", args[i]); |
| } else { |
| printUsage(); |
| } |
| } |
| |
| if(cleanup) { |
| cleanup(conf); |
| return; |
| } |
| if(testFile != null) { |
| LOG.info("Start JHLA test ============ "); |
| LocalFileSystem lfs = FileSystem.getLocal(conf); |
| conf.set("fs.defaultFS", "file:///"); |
| JHLAMapper map = new JHLAMapper(conf); |
| map.parseLogFile(lfs, new Path(testFile), 0L, |
| new LoggingCollector(), Reporter.NULL); |
| return; |
| } |
| |
| FileSystem fs = FileSystem.get(conf); |
| if(initControlFiles) |
| createControlFile(fs, historyDir); |
| long tStart = System.currentTimeMillis(); |
| runJHLA(JHLAMapper.class, OUTPUT_DIR, conf); |
| long execTime = System.currentTimeMillis() - tStart; |
| |
| analyzeResult(fs, 0, execTime, resFileName); |
| } catch(IOException e) { |
| System.err.print(StringUtils.stringifyException(e)); |
| System.exit(-1); |
| } |
| } |
| |
| |
| private static void printUsage() { |
| String className = JHLogAnalyzer.class.getSimpleName(); |
| System.err.println("Usage: " + className |
| + "\n\t[-historyDir inputDir] | [-resFile resultFile] |" |
| + "\n\t[-usersIncluded | -usersExcluded userList] |" |
| + "\n\t[-gzip] | [-jobDelimiter pattern] |" |
| + "\n\t[-help | -clean | -test testFile]"); |
| System.exit(-1); |
| } |
| |
| private static Collection<String> getUserList(String users) { |
| if(users == null) |
| return null; |
| StringTokenizer tokens = new StringTokenizer(users, ",;"); |
| Collection<String> userList = new ArrayList<String>(tokens.countTokens()); |
| while(tokens.hasMoreTokens()) |
| userList.add(tokens.nextToken()); |
| return userList; |
| } |
| |
| /** |
| * Results are combined from all reduce output files and written to |
| * resFileName as a tab separated table with the columns |
| * SERIES, PERIOD, TYPE, and SLOT_HOUR. |
| */ |
| private static void analyzeResult( FileSystem fs, |
| int testType, |
| long execTime, |
| Path resFileName |
| ) throws IOException { |
| LOG.info("Analyzing results ..."); |
| DataOutputStream out = null; |
| BufferedWriter writer = null; |
| try { |
| out = new DataOutputStream(fs.create(resFileName)); |
| writer = new BufferedWriter(new OutputStreamWriter(out)); |
| writer.write("SERIES\tPERIOD\tTYPE\tSLOT_HOUR\n"); |
| FileStatus[] reduceFiles = fs.listStatus(OUTPUT_DIR); |
| assert reduceFiles.length == JHLAPartitioner.NUM_REDUCERS; |
| for(int i = 0; i < JHLAPartitioner.NUM_REDUCERS; i++) { |
| DataInputStream in = null; |
| BufferedReader lines = null; |
| try { |
| in = fs.open(reduceFiles[i].getPath()); |
| lines = new BufferedReader(new InputStreamReader(in)); |
| |
| String line; |
| while((line = lines.readLine()) != null) { |
| StringTokenizer tokens = new StringTokenizer(line, "\t*"); |
| String attr = tokens.nextToken(); |
| String dateTime = tokens.nextToken(); |
| String taskType = tokens.nextToken(); |
| double val = Long.parseLong(tokens.nextToken()) / |
| (double)DEFAULT_TIME_INTERVAL_MSEC; |
| writer.write(attr.substring(2)); // skip the stat type "l:" |
| writer.write("\t"); |
| writer.write(dateTime); |
| writer.write("\t"); |
| writer.write(taskType); |
| writer.write("\t"); |
| writer.write(String.valueOf((float)val)); |
| writer.newLine(); |
| } |
| } finally { |
| if(lines != null) lines.close(); |
| if(in != null) in.close(); |
| } |
| } |
| } finally { |
| if(writer != null) writer.close(); |
| if(out != null) out.close(); |
| } |
| LOG.info("Analyzing results ... done."); |
| } |
| |
| private static void cleanup(Configuration conf) throws IOException { |
| LOG.info("Cleaning up test files"); |
| FileSystem fs = FileSystem.get(conf); |
| fs.delete(new Path(JHLA_ROOT_DIR), true); |
| } |
| } |