/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
/**
* Provides methods for writing to and reading from job history.
* Job history works in append mode; JobHistory and its inner classes provide
* methods to log job events.
*
* Job history is split across multiple files. Each file is plain text, where each line
* is of the format [type (key="value")*], and type identifies the kind of record.
* The type corresponds to one of the RecordTypes values, and thus to one of the
* inner classes of this class.
*
* Job history is maintained in a master index, which contains the start/stop times of
* all jobs along with a few other job-level properties. Apart from this, each job's
* history is kept in a separate history file whose name follows the format
* jobtrackerId_jobid.
*
* For parsing job history, a listener-based interface is provided: each line is parsed
* and passed to the listener. The listener can build an object model of the history, or
* look for specific events and discard the rest.
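*
* For example, a job-level record line looks like this (all values illustrative):
* <pre>
* Job JOBID="job_0001" JOBNAME="sort" USER="alice" SUBMIT_TIME="1200000000000" JOBCONF="/jobs/job_0001.xml"
* </pre>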
*/
public class JobHistory {
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobHistory");
private static final String DELIMITER = " ";
private static final String KEY = "(\\w+)";
private static final String VALUE = "[[^\"]?]+"; // anything other than a double quote
private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
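// e.g. given the line: Job JOBID="job_0001" USER="alice" (values illustrative),
// the pattern matches the tuples JOBID="job_0001" and USER="alice"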
public static final String JOBTRACKER_START_TIME =
String.valueOf(System.currentTimeMillis());
private static String JOBTRACKER_UNIQUE_STRING = null;
private static String LOG_DIR = null;
private static Map<String, ArrayList<PrintWriter>> openJobs =
new HashMap<String, ArrayList<PrintWriter>>();
private static boolean disableHistory = false;
/**
* Record types are identifiers for each line of log in history files.
* A record type appears as the first token in a single line of log.
*/
public static enum RecordTypes {
Jobtracker, Job, Task, MapAttempt, ReduceAttempt
}
/**
* Job history files contain key="value" pairs, where keys belong to this enum.
* It acts as a global namespace for all keys.
*/
public static enum Keys {
JOBTRACKERID,
START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS
}
/**
* This enum contains some of the values commonly used by history log events.
* since values in history can only be strings - Values.name() is used in
* most places in history file.
*/
public static enum Values {
SUCCESS, FAILED, KILLED, MAP, REDUCE
}
// temp buffer for parsed data
private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
/**
* Initialize JobHistory files.
* @param conf Jobconf of the job tracker.
* @param hostname jobtracker's hostname
* @return true if initialized properly, false otherwise
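*
* A minimal initialization sketch; the property name is real, but the directory
* value and hostname below are illustrative:
* <pre>{@code
* JobConf conf = new JobConf();
* conf.set("hadoop.job.history.location", "file:///var/log/hadoop/history");
* boolean ok = JobHistory.init(conf, "jtnode"); // jobtracker's hostname
* }</pre>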
*/
public static boolean init(JobConf conf, String hostname){
try {
LOG_DIR = conf.get("hadoop.job.history.location" ,
"file:///" + new File(
System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ File.separator + "history");
JOBTRACKER_UNIQUE_STRING = hostname + "_" +
JOBTRACKER_START_TIME + "_";
Path logDir = new Path(LOG_DIR);
FileSystem fs = logDir.getFileSystem(conf);
if (!fs.exists(logDir)){
if (!fs.mkdirs(logDir)){
throw new IOException("Mkdirs failed to create " + logDir.toString());
}
}
conf.set("hadoop.job.history.location", LOG_DIR);
disableHistory = false;
} catch(IOException e) {
LOG.error("Failed to initialize JobHistory log file", e);
disableHistory = true;
}
return !(disableHistory);
}
/**
* Parses history file and invokes Listener.handle() for
* each line of history. It can be used for looking through history
* files for specific items without having to keep whole history in memory.
* @param path path to history file
* @param l Listener for history events
* @param fs FileSystem where history file is present
* @throws IOException
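*
* A sketch of a counting listener; the history file path below is illustrative:
* <pre>{@code
* JobConf conf = new JobConf();
* FileSystem fs = FileSystem.get(conf);
* final int[] failures = {0};
* JobHistory.parseHistoryFromFS("/mapred/history/jtnode_1200000000000_job_0001_alice_sort",
*     new JobHistory.Listener() {
*       public void handle(JobHistory.RecordTypes recType,
*                          Map<JobHistory.Keys, String> values) throws IOException {
*         if (JobHistory.Values.FAILED.name().equals(
*               values.get(JobHistory.Keys.TASK_STATUS))) {
*           failures[0]++;
*         }
*       }
*     }, fs);
* }</pre>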
*/
public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
throws IOException{
FSDataInputStream in = fs.open(new Path(path));
BufferedReader reader = new BufferedReader(new InputStreamReader (in));
try {
String line = null;
StringBuffer buf = new StringBuffer();
while ((line = reader.readLine())!= null){
buf.append(line);
if (!line.trim().endsWith("\"")){
continue;
}
parseLine(buf.toString(), l);
buf = new StringBuffer();
}
} finally {
try { reader.close(); } catch (IOException ex) {}
}
}
/**
* Parse a single line of history.
* @param line a single line of history
* @param l listener to invoke with the parsed record
* @throws IOException
*/
private static void parseLine(String line, Listener l)throws IOException{
// extract the record type
int idx = line.indexOf(' ');
String recType = line.substring(0, idx);
String data = line.substring(idx + 1);
Matcher matcher = pattern.matcher(data);
while(matcher.find()){
String tuple = matcher.group(0);
// split only on the first '=' so values that themselves contain '=' survive intact
String[] parts = tuple.split("=", 2);
parseBuffer.put(Keys.valueOf(parts[0]), parts[1].substring(1, parts[1].length() - 1));
}
l.handle(RecordTypes.valueOf(recType), parseBuffer);
parseBuffer.clear();
}
/**
* Log a raw record type with a single key and value. This method is generally not used directly.
* @param out writer to append the record to
* @param recordType type of log event
* @param key key
* @param value value
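*
* For example, one call appends a single line of the form (values illustrative):
* <pre>
* Job FINISH_TIME="1200000000000"
* </pre>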
*/
static void log(PrintWriter out, RecordTypes recordType, Keys key,
String value){
out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\"");
}
/**
* Log a number of keys and values with the record. The keys and values arrays
* must have the same length.
* @param writers writers to append the record to
* @param recordType type of log event
* @param keys keys to log
* @param values values corresponding to keys
*/
static void log(ArrayList<PrintWriter> writers, RecordTypes recordType,
Keys[] keys, String[] values) {
StringBuffer buf = new StringBuffer(recordType.name());
buf.append(DELIMITER);
for(int i =0; i< keys.length; i++){
buf.append(keys[i]);
buf.append("=\"");
buf.append(values[i]);
buf.append("\"");
buf.append(DELIMITER);
}
for (PrintWriter out : writers) {
out.println(buf.toString());
}
}
/**
* Returns history disabled status. By default history is enabled, so this
* method returns false.
* @return true if history logging is disabled, false otherwise.
*/
public static boolean isDisableHistory() {
return disableHistory;
}
/**
* Enable/disable history logging. Default value is false, so history
* is enabled by default.
* @param disableHistory true if history should be disabled, false otherwise.
*/
public static void setDisableHistory(boolean disableHistory) {
JobHistory.disableHistory = disableHistory;
}
/**
* Base class containing utility methods for managing typed key-value pairs keyed by the Keys enum.
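*
* A short usage sketch (values illustrative):
* <pre>{@code
* JobHistory.JobInfo info = new JobHistory.JobInfo("job_0001");
* info.set(JobHistory.Keys.USER, "alice");
* String user = info.get(JobHistory.Keys.USER);          // "alice"
* long start = info.getLong(JobHistory.Keys.START_TIME); // 0 when the key is absent
* }</pre>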
*/
static class KeyValuePair{
private Map<Keys, String> values = new HashMap<Keys, String>();
/**
* Get the String value for the given key. Most callers use Strings as values,
* so the default get() method returns String. To make life easier for GUIs,
* this method never returns null: if no value is found it returns the empty
* string "".
* @param k key to look up
* @return the value, or the empty string "" if the key is absent
*/
public String get(Keys k){
String s = values.get(k);
return s == null ? "" : s;
}
/**
* Convert the value from history to int and return it.
* If no value is found, returns 0.
* @param k key
*/
public int getInt(Keys k){
String s = values.get(k);
if (null != s){
return Integer.parseInt(s);
}
return 0;
}
/**
* Convert the value from history to long and return it.
* If no value is found, returns 0.
* @param k key
*/
public long getLong(Keys k){
String s = values.get(k);
if (null != s){
return Long.parseLong(s);
}
return 0;
}
/**
* Set the value for a key.
* @param k key
* @param s value
*/
public void set(Keys k, String s){
values.put(k, s);
}
/**
* Adds all values in the Map argument to its own values.
* @param m
*/
public void set(Map<Keys, String> m){
values.putAll(m);
}
/**
* Reads values back from history; the input is the same Map that parseHistoryFromFS() passes to the Listener.
* @param values
*/
public synchronized void handle(Map<Keys, String> values){
set(values);
}
/**
* Returns Map containing all key-values.
*/
public Map<Keys, String> getValues(){
return values;
}
}
/**
* Helper class for logging or reading back events related to job start, finish or failure.
*/
public static class JobInfo extends KeyValuePair{
private Map<String, Task> allTasks = new TreeMap<String, Task>();
/** Create new JobInfo */
public JobInfo(String jobId){
set(Keys.JOBID, jobId);
}
/**
* Returns all map and reduce tasks, as a map of taskid to Task.
*/
public Map<String, Task> getAllTasks() { return allTasks; }
@Deprecated
public static String getLocalJobFilePath(String jobid) {
return getLocalJobFilePath(JobID.forName(jobid));
}
/**
* Get the path of the locally stored job file
* @param jobId id of the job
* @return the path of the job file on the local file system
*/
public static String getLocalJobFilePath(JobID jobId){
return System.getProperty("hadoop.log.dir") + File.separator +
jobId + "_conf.xml";
}
/**
* Helper function to encode the URL of the path of the job-history
* log file.
*
* @param logFile path of the job-history file
* @return URL encoded path
* @throws IOException
*/
public static String encodeJobHistoryFilePath(String logFile)
throws IOException {
Path rawPath = new Path(logFile);
String encodedFileName = null;
try {
encodedFileName = URLEncoder.encode(rawPath.getName(), "UTF-8");
} catch (UnsupportedEncodingException uee) {
IOException ioe = new IOException();
ioe.initCause(uee);
ioe.setStackTrace(uee.getStackTrace());
throw ioe;
}
Path encodedPath = new Path(rawPath.getParent(), encodedFileName);
return encodedPath.toString();
}
/**
* Helper function to encode the URL of the filename of the job-history
* log file.
*
* @param logFileName file name of the job-history file
* @return URL encoded filename
* @throws IOException
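*
* An illustrative round trip with decodeJobHistoryFileName (the name is made up):
* <pre>{@code
* String enc = JobHistory.JobInfo.encodeJobHistoryFileName("jt:9000_1_job_0001");
* // enc is "jt%3A9000_1_job_0001" -- URLEncoder escapes the ':'
* String dec = JobHistory.JobInfo.decodeJobHistoryFileName(enc);
* // dec.equals("jt:9000_1_job_0001") holds again
* }</pre>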
*/
public static String encodeJobHistoryFileName(String logFileName)
throws IOException {
String encodedFileName = null;
try {
encodedFileName = URLEncoder.encode(logFileName, "UTF-8");
} catch (UnsupportedEncodingException uee) {
IOException ioe = new IOException();
ioe.initCause(uee);
ioe.setStackTrace(uee.getStackTrace());
throw ioe;
}
return encodedFileName;
}
/**
* Helper function to decode the URL of the filename of the job-history
* log file.
*
* @param logFileName file name of the job-history file
* @return URL decoded filename
* @throws IOException
*/
public static String decodeJobHistoryFileName(String logFileName)
throws IOException {
String decodedFileName = null;
try {
decodedFileName = URLDecoder.decode(logFileName, "UTF-8");
} catch (UnsupportedEncodingException uee) {
IOException ioe = new IOException();
ioe.initCause(uee);
ioe.setStackTrace(uee.getStackTrace());
throw ioe;
}
return decodedFileName;
}
@Deprecated
public static void logSubmitted(String jobid, JobConf jobConf,
String jobConfPath, long submitTime
) throws IOException {
logSubmitted(JobID.forName(jobid), jobConf, jobConfPath, submitTime);
}
/**
* Log a job submitted event to history. Creates a new history file for the job.
* If history file creation fails, history is disabled for all subsequent
* events.
* @param jobId job id assigned by job tracker.
* @param jobConf job conf of the job
* @param jobConfPath path to job conf xml file in HDFS.
* @param submitTime time when job tracker received the job
* @throws IOException
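*
* The resulting history file name has the form (values illustrative):
* <pre>
* &lt;hostname&gt;_&lt;jobtracker start time&gt;_&lt;jobid&gt;_&lt;user&gt;_&lt;jobname&gt;
* e.g. jtnode_1200000000000_job_200801171455_0001_alice_sort
* </pre>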
*/
public static void logSubmitted(JobID jobId, JobConf jobConf,
String jobConfPath, long submitTime)
throws IOException {
FileSystem fs = null;
String userLogDir = null;
String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
if (!disableHistory){
// Get the username and job name to be used in the actual log filename;
// sanity check them too
String jobName = jobConf.getJobName();
if (jobName == null || jobName.length() == 0) {
jobName = "NA";
}
String user = jobConf.getUser();
if (user == null || user.length() == 0) {
user = "NA";
}
// setup the history log file for this job
String logFileName =
encodeJobHistoryFileName(jobUniqueString + "_" + user + "_" +
jobName);
// find user log directory
Path outputPath = FileOutputFormat.getOutputPath(jobConf);
userLogDir = jobConf.get("hadoop.job.history.user.location",
outputPath == null ? null : outputPath.toString());
if ("none".equals(userLogDir)) {
userLogDir = null;
}
if (userLogDir != null) {
userLogDir = userLogDir + Path.SEPARATOR + "_logs" +
Path.SEPARATOR + "history";
}
Path logFile = null;
Path userLogFile = null;
if (LOG_DIR != null ) {
logFile = new Path(LOG_DIR, logFileName);
}
if (userLogDir != null ) {
userLogFile = new Path(userLogDir, logFileName);
}
try{
ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
FSDataOutputStream out = null;
PrintWriter writer = null;
if (LOG_DIR != null) {
// create output stream for logging in hadoop.job.history.location
fs = new Path(LOG_DIR).getFileSystem(jobConf);
out = fs.create(logFile, true, 4096);
writer = new PrintWriter(out);
writers.add(writer);
}
if (userLogDir != null) {
// create output stream for logging
// in hadoop.job.history.user.location
fs = new Path(userLogDir).getFileSystem(jobConf);
out = fs.create(userLogFile, true, 4096);
writer = new PrintWriter(out);
writers.add(writer);
}
openJobs.put(jobUniqueString, writers);
// log the job-submitted record to each open writer
JobHistory.log(writers, RecordTypes.Job,
new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF },
new String[]{jobId.toString(), jobName, user,
String.valueOf(submitTime) , jobConfPath}
);
}catch(IOException e){
LOG.error("Failed creating job history log file, disabling history", e);
disableHistory = true;
}
}
// Always store job conf on local file system
String localJobFilePath = JobInfo.getLocalJobFilePath(jobId);
File localJobFile = new File(localJobFilePath);
FileOutputStream jobOut = null;
try {
jobOut = new FileOutputStream(localJobFile);
jobConf.write(jobOut);
if (LOG.isDebugEnabled()) {
LOG.debug("Job conf for " + jobId + " stored at "
+ localJobFile.getAbsolutePath());
}
} catch (IOException ioe) {
LOG.error("Failed to store job conf on the local filesystem ", ioe);
} finally {
if (jobOut != null) {
try {
jobOut.close();
} catch (IOException ie) {
LOG.info("Failed to close the job configuration file "
+ StringUtils.stringifyException(ie));
}
}
}
/* Store the job conf in the history log dir */
Path jobFilePath = null;
if (LOG_DIR != null) {
jobFilePath = new Path(LOG_DIR + File.separator +
jobUniqueString + "_conf.xml");
}
Path userJobFilePath = null;
if (userLogDir != null) {
userJobFilePath = new Path(userLogDir + File.separator +
jobUniqueString + "_conf.xml");
}
FSDataOutputStream jobFileOut = null;
try {
if (LOG_DIR != null) {
fs = new Path(LOG_DIR).getFileSystem(jobConf);
if (!fs.exists(jobFilePath)) {
jobFileOut = fs.create(jobFilePath);
jobConf.write(jobFileOut);
jobFileOut.close();
}
}
if (userLogDir != null) {
fs = new Path(userLogDir).getFileSystem(jobConf);
jobFileOut = fs.create(userJobFilePath);
jobConf.write(jobFileOut);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Job conf for " + jobId + " stored at "
+ jobFilePath + "and" + userJobFilePath );
}
} catch (IOException ioe) {
LOG.error("Failed to store job conf on the local filesystem ", ioe);
} finally {
if (jobFileOut != null) {
try {
jobFileOut.close();
} catch (IOException ie) {
LOG.info("Failed to close the job configuration file "
+ StringUtils.stringifyException(ie));
}
}
}
}
@Deprecated
public static void logStarted(String jobid, long startTime, int totalMaps,
int totalReduces) {
logStarted(JobID.forName(jobid), startTime, totalMaps, totalReduces);
}
/**
* Logs launch time of job.
* @param jobId job id, assigned by jobtracker.
* @param startTime start time of job.
* @param totalMaps total maps assigned by jobtracker.
* @param totalReduces total reduces.
*/
public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
if (!disableHistory){
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
new String[] {jobId.toString(), String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)});
}
}
}
@Deprecated
public static void logFinished(String jobId, long finishTime,
int finishedMaps, int finishedReduces,
int failedMaps, int failedReduces,
Counters counters) {
logFinished(JobID.forName(jobId), finishTime, finishedMaps,
finishedReduces, failedMaps, failedReduces, counters);
}
/**
* Log a job finished event. Closes the job's history file.
* @param jobId job id, assigned by jobtracker.
* @param finishTime finish time of job in ms.
* @param finishedMaps number of maps successfully finished.
* @param finishedReduces number of reduces successfully finished.
* @param failedMaps number of failed map tasks.
* @param failedReduces number of failed reduce tasks.
* @param counters the counters from the job
*/
public static void logFinished(JobID jobId, long finishTime,
int finishedMaps, int finishedReduces,
int failedMaps, int failedReduces,
Counters counters){
if (!disableHistory){
// close job file for this job
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobId;
ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
new Keys[] {Keys.JOBID, Keys.FINISH_TIME,
Keys.JOB_STATUS, Keys.FINISHED_MAPS,
Keys.FINISHED_REDUCES,
Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
Keys.COUNTERS},
new String[] {jobId.toString(), Long.toString(finishTime),
Values.SUCCESS.name(),
String.valueOf(finishedMaps),
String.valueOf(finishedReduces),
String.valueOf(failedMaps),
String.valueOf(failedReduces),
counters.makeCompactString()});
for (PrintWriter out : writer) {
out.close();
}
openJobs.remove(logFileKey);
}
Thread historyCleaner = new Thread(new HistoryCleaner());
historyCleaner.start();
}
}
@Deprecated
public static void logFailed(String jobid, long timestamp,
int finishedMaps, int finishedReduces) {
logFailed(JobID.forName(jobid), timestamp, finishedMaps, finishedReduces);
}
/**
* Logs job failed event. Closes the job history log file.
* @param jobid job id
* @param timestamp time when job failure was detected in ms.
* @param finishedMaps number of finished map tasks.
* @param finishedReduces number of finished reduce tasks.
*/
public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
if (!disableHistory){
String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
if (null != writer){
JobHistory.log(writer, RecordTypes.Job,
new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
new String[] {jobid.toString(), String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps),
String.valueOf(finishedReduces)});
for (PrintWriter out : writer) {
out.close();
}
openJobs.remove(logFileKey);
}
}
}
}
/**
* Helper class for logging or reading back events related to Task's start, finish or failure.
* All events logged by this class are logged in a separate file per job in
* job tracker history. These events map to TIPs in jobtracker.
*/
public static class Task extends KeyValuePair{
private Map <String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>();
@Deprecated
public static void logStarted(String jobId, String taskId, String taskType,
long startTime) {
logStarted(TaskID.forName(taskId), taskType, startTime, "n/a");
}
/**
* Log start time of task (TIP).
* @param taskId task id
* @param taskType MAP or REDUCE
* @param startTime start time of the tip.
* @param splitLocations split locations of the task
*/
public static void logStarted(TaskID taskId, String taskType,
long startTime, String splitLocations) {
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
Keys.START_TIME, Keys.SPLITS},
new String[]{taskId.toString(), taskType,
String.valueOf(startTime),
splitLocations});
}
}
}
@Deprecated
public static void logFinished(String jobid, String taskid, String taskType,
long finishTime, Counters counters) {
logFinished(TaskID.forName(taskid), taskType, finishTime, counters);
}
/**
* Log finish time of task.
* @param taskId task id
* @param taskType MAP or REDUCE
* @param finishTime finish time of task in ms
* @param counters counters for the task
*/
public static void logFinished(TaskID taskId, String taskType,
long finishTime, Counters counters){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
Keys.TASK_STATUS, Keys.FINISH_TIME,
Keys.COUNTERS},
new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(),
String.valueOf(finishTime),
counters.makeCompactString()});
}
}
}
@Deprecated
public static void logFailed(String jobid, String taskid, String taskType,
long time, String error) {
logFailed(TaskID.forName(taskid), taskType, time, error);
}
/**
* Log a task failed event.
* @param taskId task id
* @param taskType MAP or REDUCE.
* @param time timestamp when the task failure was detected.
* @param error error message for the failure.
*/
public static void logFailed(TaskID taskId, String taskType, long time, String error){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.Task,
new Keys[]{Keys.TASKID, Keys.TASK_TYPE,
Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR},
new String[]{ taskId.toString(), taskType, Values.FAILED.name(), String.valueOf(time) , error});
}
}
}
/**
* Returns all task attempts for this task, as a map of task attempt id to TaskAttempt.
*/
public Map<String, TaskAttempt> getTaskAttempts(){
return this.taskAttempts;
}
}
/**
* Base class for Map and Reduce TaskAttempts.
*/
public static class TaskAttempt extends Task{}
/**
* Helper class for logging or reading back events related to start, finish or failure of
* a Map Attempt on a node.
*/
public static class MapAttempt extends TaskAttempt{
@Deprecated
public static void logStarted(String jobid, String taskid, String attemptid,
long startTime, String hostName) {
logStarted(TaskAttemptID.forName(attemptid), startTime, hostName);
}
/**
* Log start time of this map task attempt.
* @param taskAttemptId task attempt id
* @param startTime start time of task attempt as reported by task tracker.
* @param hostName host name of the task attempt.
*/
public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
Keys.HOSTNAME},
new String[]{Values.MAP.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), String.valueOf(startTime), hostName});
}
}
}
@Deprecated
public static void logFinished(String jobid, String taskid,
String attemptid, long time, String host) {
logFinished(TaskAttemptID.forName(attemptid), time, host);
}
/**
* Log finish time of map task attempt.
* @param taskAttemptId task attempt id
* @param finishTime finish time
* @param hostName host name
*/
public static void logFinished(TaskAttemptID taskAttemptId, long finishTime,
String hostName){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME},
new String[]{Values.MAP.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.SUCCESS.name(),
String.valueOf(finishTime), hostName});
}
}
}
@Deprecated
public static void logFailed(String jobid, String taskid,
String attemptid, long timestamp, String host,
String err) {
logFailed(TaskAttemptID.forName(attemptid), timestamp, host, err);
}
/**
* Log task attempt failed event.
* @param taskAttemptId task attempt id
* @param timestamp timestamp
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
*/
public static void logFailed(TaskAttemptID taskAttemptId,
long timestamp, String hostName, String error){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.FAILED.name(),
String.valueOf(timestamp), hostName, error});
}
}
}
@Deprecated
public static void logKilled(String jobid, String taskid, String attemptid,
long timestamp, String hostname, String error){
logKilled(TaskAttemptID.forName(attemptid), timestamp, hostname, error);
}
/**
* Log task attempt killed event.
* @param taskAttemptId task attempt id
* @param timestamp timestamp
* @param hostName hostname of this task attempt.
* @param error error message if any for this task attempt.
*/
public static void logKilled(TaskAttemptID taskAttemptId,
long timestamp, String hostName, String error){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.MapAttempt,
new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.KILLED.name(),
String.valueOf(timestamp), hostName, error});
}
}
}
}
/**
* Helper class for logging or reading back events related to start, finish or failure of
* a Reduce Attempt on a node.
*/
public static class ReduceAttempt extends TaskAttempt{
@Deprecated
public static void logStarted(String jobid, String taskid, String attemptid,
long startTime, String hostName) {
logStarted(TaskAttemptID.forName(attemptid), startTime, hostName);
}
/**
* Log start time of Reduce task attempt.
* @param taskAttemptId task attempt id
* @param startTime start time
* @param hostName host name
*/
public static void logStarted(TaskAttemptID taskAttemptId,
long startTime, String hostName){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
new String[]{Values.REDUCE.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), String.valueOf(startTime), hostName});
}
}
}
@Deprecated
public static void logFinished(String jobid, String taskid, String attemptid,
long shuffleFinished, long sortFinished,
long finishTime, String hostname) {
logFinished(TaskAttemptID.forName(attemptid), shuffleFinished,
sortFinished, finishTime, hostname);
}
/**
* Log finished event of this task.
* @param taskAttemptId task attempt id
* @param shuffleFinished shuffle finish time
* @param sortFinished sort finish time
* @param finishTime finish time of task
* @param hostName host name where task attempt executed
*/
public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished,
long sortFinished, long finishTime,
String hostName){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
Keys.FINISH_TIME, Keys.HOSTNAME},
new String[]{Values.REDUCE.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.SUCCESS.name(),
String.valueOf(shuffleFinished), String.valueOf(sortFinished),
String.valueOf(finishTime), hostName});
}
}
}
@Deprecated
public static void logFailed(String jobid, String taskid, String attemptid,
long timestamp, String hostname, String error){
logFailed(TaskAttemptID.forName(attemptid), timestamp, hostname, error);
}
/**
* Log failed reduce task attempt.
* @param taskAttemptId task attempt id
* @param timestamp time stamp when task failed
* @param hostName host name of the task attempt.
* @param error error message of the task.
*/
public static void logFailed(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.FAILED.name(),
String.valueOf(timestamp), hostName, error });
}
}
}
@Deprecated
public static void logKilled(String jobid, String taskid, String attemptid,
long timestamp, String hostname, String error){
logKilled(TaskAttemptID.forName(attemptid), timestamp, hostname, error);
}
/**
* Log killed reduce task attempt.
* @param taskAttemptId task attempt id
* @param timestamp time stamp when task failed
* @param hostName host name of the task attempt.
* @param error error message of the task.
*/
public static void logKilled(TaskAttemptID taskAttemptId, long timestamp,
String hostName, String error){
if (!disableHistory){
ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING
+ taskAttemptId.getJobID());
if (null != writer){
JobHistory.log(writer, RecordTypes.ReduceAttempt,
new Keys[]{ Keys.TASK_TYPE, Keys.TASKID,
Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
Keys.FINISH_TIME, Keys.HOSTNAME,
Keys.ERROR },
new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(), Values.KILLED.name(),
String.valueOf(timestamp), hostName, error });
}
}
}
}
/**
* Callback interface for reading back log events from JobHistory. This interface
* should be implemented and passed to JobHistory.parseHistoryFromFS().
*
*/
public static interface Listener{
/**
* Callback method for history parser.
* @param recType type of record, which is the first entry in the line.
* @param values a map of key-value pairs as they appear in history.
* @throws IOException
*/
public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException;
}
/**
* Delete history files older than one month, and update the master index by
* removing all jobs older than one month. If a job tracker has had no jobs in
* the last month, the reference to that job tracker is removed as well.
*
*/
public static class HistoryCleaner implements Runnable{
static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;
private long now;
private static boolean isRunning = false;
private static long lastRan;
/**
* Cleans up history data.
*/
public void run(){
if (isRunning){
return;
}
now = System.currentTimeMillis();
// clean history at most once a day; lastRan == 0 means this is the first run
if (lastRan != 0 && (now - lastRan) < ONE_DAY_IN_MS){
return;
}
lastRan = now;
isRunning = true;
// LOG_DIR may be a URI such as file:///...; strip any scheme before handing it to java.io.File
File[] oldFiles = new File(new Path(LOG_DIR).toUri().getPath()).listFiles(new FileFilter(){
public boolean accept(File file){
// delete if older than 30 days
if (now - file.lastModified() > THIRTY_DAYS_IN_MS){
return true;
}
return false;
}
});
// listFiles returns null if the directory does not exist
if (oldFiles != null) {
for (File f : oldFiles) {
f.delete();
LOG.info("Deleting old history file : " + f.getName());
}
}
isRunning = false;
}
}
}