/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapreduce.v2.app.recover;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;

/**
 * Recovers the completed tasks from the previous life of the Application
 * Master. The completed tasks are deciphered from the history file of the
 * previous life. The recovery service intercepts and replays the events for
 * completed tasks. While recovery is in progress, the scheduling of new tasks
 * is delayed by buffering the task schedule events. The recovery service
 * controls the clock while recovery is in progress.
 */
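
/*
 * Illustrative wiring (a sketch only; the exact call site in the MR app
 * master may differ, and conf, appID and startCount are assumed to be in
 * scope):
 *
 *   Recovery recovery = new RecoveryService(appID, new SystemClock(), startCount);
 *   ((Service) recovery).init(conf);     // parses the old history file
 *   Dispatcher dispatcher = recovery.getDispatcher(); // intercepting dispatcher
 *   Clock clock = recovery.getClock();                // controlled clock
 *   Set<TaskId> toRecover = recovery.getCompletedTasks();
 */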
//TODO:
//task cleanup for all non completed tasks
//change job output committer to have
// - atomic job output promotion
// - recover output of completed tasks
public class RecoveryService extends CompositeService implements Recovery {
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationId appID;
private final Dispatcher dispatcher;
private final ControlledClock clock;
private final int startCount;
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
new HashMap<TaskId, TaskInfo>();
private final List<TaskEvent> pendingTaskScheduleEvents =
new ArrayList<TaskEvent>();
private volatile boolean recoveryMode = false;
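
  /**
   * @param appID id of the application being recovered
   * @param clock the real clock; it is wrapped in a ControlledClock that the
   *          recovery service drives while replaying history
   * @param startCount the number of this application attempt; the history
   *          file of attempt (startCount - 1) is parsed for completed tasks
   */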
public RecoveryService(ApplicationId appID, Clock clock, int startCount) {
super("RecoveringDispatcher");
this.appID = appID;
this.startCount = startCount;
this.dispatcher = new RecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
}
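
  /**
   * Parses the history file of the previous attempt and switches into
   * recovery mode if any completed tasks were found; on a parse failure the
   * job simply starts afresh with no recovery.
   */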
@Override
public void init(Configuration conf) {
super.init(conf);
// parse the history file
try {
parse();
      if (!completedTasks.isEmpty()) {
        recoveryMode = true;
        LOG.info("Setting the recovery mode to true. Number of completed "
            + "tasks to recover: " + completedTasks.size());
        LOG.info("Job launch time " + jobInfo.getLaunchTime());
        clock.setTime(jobInfo.getLaunchTime());
      }
    } catch (IOException e) {
      LOG.warn("Could not parse the old history file. Aborting recovery. "
          + "Starting afresh.", e);
    }
}
@Override
public Dispatcher getDispatcher() {
return dispatcher;
}
@Override
public Clock getClock() {
return clock;
}
@Override
public Set<TaskId> getCompletedTasks() {
return completedTasks.keySet();
}
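
  /**
   * Reads the staging-directory history file written by the previous attempt
   * (startCount - 1) and records every task whose status is SUCCEEDED in
   * {@link #completedTasks}.
   */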
private void parse() throws IOException {
// TODO: parse history file based on startCount
String jobName = TypeConverter.fromYarn(appID).toString();
String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
        new Path(jobhistoryDir));
    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
        getConfig());
    // read the history file of the previous application attempt
    Path historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
        histDirPath, jobName, startCount - 1));
    FSDataInputStream in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
    for (TaskInfo taskInfo : taskInfos.values()) {
      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
        TaskId yarnTaskId = TypeConverter.toYarn(taskInfo.getTaskId());
        completedTasks.put(yarnTaskId, taskInfo);
        LOG.info("Read task " + yarnTaskId + " from history");
      }
    }
LOG.info("Read completed tasks from history "
+ completedTasks.size());
}
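
  /**
   * An AsyncDispatcher that exposes an intercepting event handler while
   * recovery is in progress, and that moves the controlled clock to the
   * recorded timestamps of the attempts being replayed.
   */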
class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler;
private final EventHandler handler;
RecoveryDispatcher() {
actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler);
}
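
    /**
     * In recovery mode, sets the controlled clock to the recorded start or
     * finish time of the attempt the event refers to, and retires completed
     * tasks as their attempts are replayed. Once every completed task has
     * been recovered, leaves recovery mode, resets the clock and flushes the
     * buffered task schedule events.
     */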
@Override
public void dispatch(Event event) {
if (recoveryMode) {
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
LOG.info("Attempt start time " + attInfo.getStartTime());
clock.setTime(attInfo.getStartTime());
} else if (event.getType() == TaskAttemptEventType.TA_DONE
|| event.getType() == TaskAttemptEventType.TA_FAILMSG
|| event.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
LOG.info("Attempt finish time " + attInfo.getFinishTime());
clock.setTime(attInfo.getFinishTime());
      } else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
|| event.getType() == TaskEventType.T_ATTEMPT_KILLED
|| event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
.getTaskId());
taskInfo.getAllTaskAttempts().remove(
TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
// remove the task info from completed tasks if all attempts are
// recovered
        if (taskInfo.getAllTaskAttempts().isEmpty()) {
          completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
          // check whether recovery is complete
          LOG.info("Outstanding completed tasks to recover: "
              + completedTasks.size());
          if (completedTasks.isEmpty()) {
            recoveryMode = false;
            clock.reset();
            LOG.info("Setting the recovery mode to false. "
                + "Recovery is complete!");
// send all pending tasks schedule events
for (TaskEvent tEv : pendingTaskScheduleEvents) {
actualHandler.handle(tEv);
}
}
}
}
}
super.dispatch(event);
}
@Override
public EventHandler getEventHandler() {
return handler;
}
}
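
  /** Looks up the recorded history information for a completed task attempt. */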
private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
TaskInfo taskInfo = completedTasks.get(id.getTaskId());
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
}
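
  /**
   * Handler installed while recovery is in progress: buffers T_SCHEDULE
   * events for tasks that still have attempts to recover, and answers
   * container allocation, launch and cleanup events with synthesized
   * completion events built from history. Everything else is passed through
   * to the actual handler.
   */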
private class InterceptingEventHandler implements EventHandler {
EventHandler actualHandler;
InterceptingEventHandler(EventHandler actualHandler) {
this.actualHandler = actualHandler;
}
@Override
public void handle(Event event) {
      if (!recoveryMode) {
        // not in recovery: delegate straight to the actual handler
        actualHandler.handle(event);
        return;
      } else if (event.getType() == TaskEventType.T_SCHEDULE) {
TaskEvent taskEvent = (TaskEvent) event;
// delay the scheduling of new tasks till previous ones are recovered
if (completedTasks.get(taskEvent.getTaskID()) == null) {
LOG.debug("Adding to pending task events "
+ taskEvent.getTaskID());
pendingTaskScheduleEvents.add(taskEvent);
return;
}
      } else if (event.getType()
          == ContainerAllocator.EventType.CONTAINER_REQ) {
TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
LOG.debug("CONTAINER_REQ " + aId);
sendAssignedEvent(aId, attInfo);
return;
      } else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
        LOG.debug("TASK_CLEAN for attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CLEANUP_DONE));
return;
      } else if (event.getType()
          == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
// send the status update event
sendStatusUpdateEvent(aId, attInfo);
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
switch (state) {
case SUCCEEDED:
// send the done event
LOG.info("Sending done event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_DONE));
break;
case KILLED:
LOG.info("Sending kill event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_KILL));
break;
default:
LOG.info("Sending fail event to " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_FAILMSG));
break;
}
return;
}
// delegate to the actual handler
actualHandler.handle(event);
}
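
    /**
     * Replays a final status update for the attempt: progress 1.0 plus the
     * state string and counters recorded in history.
     */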
private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending status update event to " + yarnAttemptID);
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = yarnAttemptID;
taskAttemptStatus.progress = 1.0f;
taskAttemptStatus.diagnosticInfo = "";
taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
// taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
taskAttemptStatus.phase = Phase.CLEANUP;
org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
if (cntrs == null) {
taskAttemptStatus.counters = null;
} else {
taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo
.getCounters());
}
actualHandler.handle(new TaskAttemptStatusUpdateEvent(
taskAttemptStatus.id, taskAttemptStatus));
}
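
    /**
     * Synthesizes a container-assigned event for the attempt. The container
     * is a dummy record; only the node HTTP address is populated, from the
     * hostname and HTTP port recorded in history.
     */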
private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = recordFactory
.newRecordInstance(ContainerId.class);
Container container = recordFactory
.newRecordInstance(Container.class);
container.setId(cId);
container.setNodeId(recordFactory
.newRecordInstance(NodeId.class));
container.setContainerToken(null);
container.setNodeHttpAddress(attemptInfo.getHostname() + ":" +
attemptInfo.getHttpPort());
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container));
}
}
}