blob: dd002a4342a23e2f1d7d58f401300d6fc1290063 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.rumen;
import java.text.ParseException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
/**
 * A {@link HistoryEventEmitter} that turns parsed task lines into task-level
 * history events.
 *
 * <p>Four single-event emitters are registered once, at class-initialization
 * time: the "started" and "updated" emitters are exposed through
 * {@link #nonFinalSEEs()}, the "finished" and "failed" emitters through
 * {@link #finalSEEs()}.
 *
 * <p>Each instance remembers the start time and task type seen by the
 * "started" emitter so that the "finished" and "failed" emitters can reuse the
 * task type later.
 */
public class Task20LineHistoryEventEmitter extends HistoryEventEmitter {

  /** Emitters handed out by {@link #nonFinalSEEs()}. */
  static List<SingleEventEmitter> nonFinals =
      new LinkedList<SingleEventEmitter>();

  /** Emitters handed out by {@link #finalSEEs()}. */
  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();

  /** Start time captured by the "started" emitter; {@code null} until then. */
  Long originalStartTime = null;

  /** Task type captured by the "started" emitter; {@code null} until then. */
  TaskType originalTaskType = null;

  static {
    nonFinals.add(new TaskStartedEventEmitter());
    nonFinals.add(new TaskUpdatedEventEmitter());

    finals.add(new TaskFinishedEventEmitter());
    finals.add(new TaskFailedEventEmitter());
  }

  protected Task20LineHistoryEventEmitter() {
    super();
  }

  /**
   * Emits a {@link TaskStartedEvent} for lines that carry both a
   * {@code START_TIME} and a {@code TASK_TYPE}, and records those two values
   * on the owning emitter.
   */
  private static class TaskStartedEventEmitter extends SingleEventEmitter {
    /**
     * @param line the parsed line to inspect
     * @param taskIDName textual task id; when {@code null} nothing is emitted
     * @param thatg the owning emitter; cast to
     *        {@link Task20LineHistoryEventEmitter} only when an event is built
     * @return the started event, or {@code null} when the line does not qualify
     */
    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
        HistoryEventEmitter thatg) {
      if (taskIDName == null) {
        return null;
      }

      TaskID id = TaskID.forName(taskIDName);

      String typeText = line.get("TASK_TYPE");
      String startText = line.get("START_TIME");
      String splitsText = line.get("SPLITS");

      if (startText == null || typeText == null) {
        return null;
      }

      Task20LineHistoryEventEmitter owner =
          (Task20LineHistoryEventEmitter) thatg;

      // Remember both values on the owner; later emitters read the task type.
      owner.originalStartTime = Long.parseLong(startText);
      owner.originalTaskType =
          Version20LogInterfaceUtils.get20TaskType(typeText);

      return new TaskStartedEvent(id, owner.originalStartTime,
          owner.originalTaskType, splitsText);
    }
  }

  /**
   * Emits a {@link TaskUpdatedEvent} for any line that carries a
   * {@code FINISH_TIME}.
   */
  private static class TaskUpdatedEventEmitter extends SingleEventEmitter {
    /**
     * @param line the parsed line to inspect
     * @param taskIDName textual task id; when {@code null} nothing is emitted
     * @param thatg the owning emitter (not used here)
     * @return the updated event, or {@code null} when there is no finish time
     */
    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
        HistoryEventEmitter thatg) {
      if (taskIDName == null) {
        return null;
      }

      TaskID id = TaskID.forName(taskIDName);
      String finishText = line.get("FINISH_TIME");

      if (finishText == null) {
        return null;
      }

      return new TaskUpdatedEvent(id, Long.parseLong(finishText));
    }
  }

  /**
   * Emits a {@link TaskFinishedEvent} for lines that have a
   * {@code FINISH_TIME}, no {@code ERROR}, and a {@code TASK_STATUS} equal to
   * "success" (case-insensitive).
   */
  private static class TaskFinishedEventEmitter extends SingleEventEmitter {
    /**
     * @param line the parsed line to inspect
     * @param taskIDName textual task id; when {@code null} nothing is emitted
     * @param thatg the owning emitter, which supplies the recorded task type
     * @return the finished event, or {@code null} when the line does not
     *         qualify or no task type has been recorded on the owner
     */
    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
        HistoryEventEmitter thatg) {
      if (taskIDName == null) {
        return null;
      }

      TaskID id = TaskID.forName(taskIDName);

      String statusText = line.get("TASK_STATUS");
      String finishText = line.get("FINISH_TIME");
      String errorText = line.get("ERROR");
      String countersText = line.get("COUNTERS");

      // equalsIgnoreCase on the literal is false for a null status.
      boolean succeeded = finishText != null && errorText == null
          && "success".equalsIgnoreCase(statusText);
      if (!succeeded) {
        return null;
      }

      Counters parsedCounters = maybeParseCounters(countersText);

      Task20LineHistoryEventEmitter owner =
          (Task20LineHistoryEventEmitter) thatg;

      // Without a recorded task type there is nothing to report.
      if (owner.originalTaskType == null) {
        return null;
      }

      return new TaskFinishedEvent(id, null, Long.parseLong(finishText),
          owner.originalTaskType, statusText, parsedCounters);
    }
  }

  /**
   * Emits a {@link TaskFailedEvent} for lines that have a {@code FINISH_TIME}
   * and either an {@code ERROR} or a {@code TASK_STATUS} other than "success"
   * (case-insensitive).
   */
  private static class TaskFailedEventEmitter extends SingleEventEmitter {
    /**
     * @param line the parsed line to inspect
     * @param taskIDName textual task id; when {@code null} nothing is emitted
     * @param thatg the owning emitter, which supplies the recorded task type
     *        when one exists
     * @return the failed event, or {@code null} when the line does not qualify
     */
    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
        HistoryEventEmitter thatg) {
      if (taskIDName == null) {
        return null;
      }

      TaskID id = TaskID.forName(taskIDName);

      String statusText = line.get("TASK_STATUS");
      String finishText = line.get("FINISH_TIME");
      String typeText = line.get("TASK_TYPE");
      String errorText = line.get("ERROR");

      if (finishText == null) {
        return null;
      }

      boolean failed = errorText != null
          || (statusText != null && !statusText.equalsIgnoreCase("success"));
      if (!failed) {
        return null;
      }

      Task20LineHistoryEventEmitter owner =
          (Task20LineHistoryEventEmitter) thatg;

      // Prefer the type recorded at task start; otherwise fall back to the
      // TASK_TYPE carried by this line.
      TaskType resolvedType = owner.originalTaskType;
      if (resolvedType == null) {
        resolvedType = Version20LogInterfaceUtils.get20TaskType(typeText);
      }

      return new TaskFailedEvent(id, Long.parseLong(finishText),
          resolvedType, errorText, statusText, null);
    }
  }

  /** @return the shared list holding the finished and failed emitters */
  @Override
  List<SingleEventEmitter> finalSEEs() {
    return finals;
  }

  /** @return the shared list holding the started and updated emitters */
  @Override
  List<SingleEventEmitter> nonFinalSEEs() {
    return nonFinals;
  }
}