/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
/** Implementation of the Job interface. Maintains the state machine of the Job.
 * Read and write calls use a ReadWriteLock for concurrency.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
EventHandler<JobEvent> {
private static final TaskAttemptCompletionEvent[]
EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];
private static final Log LOG = LogFactory.getLog(JobImpl.class);
//The maximum fraction of fetch failures allowed for a map
private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
// Maximum no. of fetch-failure notifications after which a map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
//final fields
private final ApplicationAttemptId applicationAttemptId;
private final Clock clock;
private final JobACLsManager aclsManager;
private final String username;
private final OutputCommitter committer;
private final Map<JobACL, AccessControlList> jobACLs;
private float setupWeight = 0.05f;
private float cleanupWeight = 0.05f;
private float mapWeight = 0.0f;
private float reduceWeight = 0.0f;
private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private final List<AMInfo> amInfos;
private final Lock readLock;
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Object tasksSyncHandle = new Object();
private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
private final String queueName;
private final long appSubmitTime;
private boolean lazyTasksCopyNeeded = false;
volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = new Counters();
private Object fullCountersLock = new Object();
private Counters fullCounters = null;
private Counters finalMapCounters = null;
private Counters finalReduceCounters = null;
// FIXME:
//
// Can then replace task-level uber counters (MR-2424) with job-level ones
// sent from LocalContainerLauncher, and eventually including a count
// of uber-AM attempts (probably sent from MRAppMaster).
public JobConf conf;
//fields initialized in init
private FileSystem fs;
private Path remoteJobSubmitDir;
public Path remoteJobConfFile;
private JobContext jobContext;
private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
private final List<String> diagnostics = new ArrayList<String>();
//task/attempt related data structures
private final Map<TaskId, Integer> successAttemptCompletionEventNoMap =
new HashMap<TaskId, Integer>();
private final Map<TaskAttemptId, Integer> fetchFailuresMapping =
new HashMap<TaskAttemptId, Integer>();
private static final DiagnosticsUpdateTransition
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
new CounterUpdateTransition();
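// State machine for the job lifecycle: NEW -> INITED -> RUNNING and then one
// of SUCCEEDED, FAILED or KILLED; KILL_WAIT is entered while a running job's
// tasks are being killed, and ERROR is the sink for INTERNAL_ERROR events.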
protected static final
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
(JobState.NEW)
// Transitions from NEW state
.addTransition(JobState.NEW, JobState.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.NEW, JobState.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobState.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED),
JobEventType.JOB_INIT,
new InitTransition())
.addTransition(JobState.NEW, JobState.KILLED,
JobEventType.JOB_KILL,
new KillNewJobTransition())
.addTransition(JobState.NEW, JobState.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from INITED state
.addTransition(JobState.INITED, JobState.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.INITED, JobState.INITED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobState.INITED, JobState.RUNNING,
JobEventType.JOB_START,
new StartTransition())
.addTransition(JobState.INITED, JobState.KILLED,
JobEventType.JOB_KILL,
new KillInitedJobTransition())
.addTransition(JobState.INITED, JobState.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
(JobState.RUNNING,
EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition
(JobState.RUNNING,
EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
JobEventType.JOB_COMPLETED,
new JobNoTasksCompletedTransition())
.addTransition(JobState.RUNNING, JobState.KILL_WAIT,
JobEventType.JOB_KILL, new KillTasksTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
new MapTaskRescheduledTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
new TaskAttemptFetchFailureTransition())
.addTransition(
JobState.RUNNING,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from KILL_WAIT state.
.addTransition
(JobState.KILL_WAIT,
EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition())
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.KILL_WAIT,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from SUCCEEDED state
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.SUCCEEDED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from FAILED state
.addTransition(JobState.FAILED, JobState.FAILED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.FAILED, JobState.FAILED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.FAILED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.FAILED, JobState.FAILED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from KILLED state
.addTransition(JobState.KILLED, JobState.KILLED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.KILLED, JobState.KILLED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.KILLED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.KILLED, JobState.KILLED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
JobState.ERROR,
JobState.ERROR,
EnumSet.of(JobEventType.JOB_INIT,
JobEventType.JOB_KILL,
JobEventType.JOB_TASK_COMPLETED,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
// create the topology tables
.installTopology();
private final StateMachine<JobState, JobEventType, JobEvent> stateMachine;
//fields that change while the job is running
private int numMapTasks;
private int numReduceTasks;
private int completedTaskCount = 0;
private int succeededMapTaskCount = 0;
private int succeededReduceTaskCount = 0;
private int failedMapTaskCount = 0;
private int failedReduceTaskCount = 0;
private int killedMapTaskCount = 0;
private int killedReduceTaskCount = 0;
private long startTime;
private long finishTime;
private float setupProgress;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress;
private boolean isUber = false;
private Credentials fsTokens;
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
this.conf = new JobConf(conf);
this.metrics = metrics;
this.clock = clock;
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
this.amInfos = amInfos;
this.userName = userName;
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
this.oldJobId = TypeConverter.fromYarn(jobId);
this.newApiCommitter = newApiCommitter;
this.taskAttemptListener = taskAttemptListener;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.fsTokens = fsTokenCredentials;
this.jobTokenSecretManager = jobTokenSecretManager;
this.committer = committer;
this.aclsManager = new JobACLsManager(conf);
this.username = System.getProperty("user.name");
this.jobACLs = aclsManager.constructJobACLs(conf);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
}
protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
return stateMachine;
}
@Override
public JobId getID() {
return jobId;
}
// Getter methods that make unit testing easier (package-scoped)
OutputCommitter getCommitter() {
return this.committer;
}
EventHandler getEventHandler() {
return this.eventHandler;
}
JobContext getJobContext() {
return this.jobContext;
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
AccessControlList jobACL = jobACLs.get(jobOperation);
return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
}
@Override
public Task getTask(TaskId taskID) {
readLock.lock();
try {
return tasks.get(taskID);
} finally {
readLock.unlock();
}
}
@Override
public int getCompletedMaps() {
readLock.lock();
try {
return succeededMapTaskCount + failedMapTaskCount + killedMapTaskCount;
} finally {
readLock.unlock();
}
}
@Override
public int getCompletedReduces() {
readLock.lock();
try {
return succeededReduceTaskCount + failedReduceTaskCount
+ killedReduceTaskCount;
} finally {
readLock.unlock();
}
}
@Override
public boolean isUber() {
return isUber;
}
@Override
public Counters getAllCounters() {
readLock.lock();
try {
JobState state = getState();
if (state == JobState.ERROR || state == JobState.FAILED
|| state == JobState.KILLED || state == JobState.SUCCEEDED) {
this.mayBeConstructFinalFullCounters();
return fullCounters;
}
Counters counters = new Counters();
counters.incrAllCounters(jobCounters);
return incrTaskCounters(counters, tasks.values());
} finally {
readLock.unlock();
}
}
public static Counters incrTaskCounters(
Counters counters, Collection<Task> tasks) {
for (Task task : tasks) {
counters.incrAllCounters(task.getCounters());
}
return counters;
}
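// Returns at most maxEvents completion events starting at fromEventId;
// callers page through the full list by advancing fromEventId.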
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock();
try {
if (taskAttemptCompletionEvents.size() > fromEventId) {
int actualMax = Math.min(maxEvents,
(taskAttemptCompletionEvents.size() - fromEventId));
events = taskAttemptCompletionEvents.subList(fromEventId,
actualMax + fromEventId).toArray(events);
}
return events;
} finally {
readLock.unlock();
}
}
@Override
public List<String> getDiagnostics() {
readLock.lock();
try {
return diagnostics;
} finally {
readLock.unlock();
}
}
@Override
public JobReport getReport() {
readLock.lock();
try {
JobState state = getState();
// jobFile can be null if the job is not yet inited.
String jobFile =
remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, jobFile, amInfos, isUber);
}
computeProgress();
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
this.mapProgress, this.reduceProgress,
cleanupProgress, jobFile, amInfos, isUber);
} finally {
readLock.unlock();
}
}
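// Overall progress is a weighted sum of the four phases: setup and cleanup
// each carry a fixed 5% weight, and the remaining 90% is split between map
// and reduce according to the weights assigned in InitTransition.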
@Override
public float getProgress() {
this.readLock.lock();
try {
computeProgress();
return (this.setupProgress * this.setupWeight + this.cleanupProgress
* this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress
* this.reduceWeight);
} finally {
this.readLock.unlock();
}
}
private void computeProgress() {
this.readLock.lock();
try {
float mapProgress = 0f;
float reduceProgress = 0f;
for (Task task : this.tasks.values()) {
if (task.getType() == TaskType.MAP) {
mapProgress += task.getProgress();
} else {
reduceProgress += task.getProgress();
}
}
if (this.numMapTasks != 0) {
mapProgress = mapProgress / this.numMapTasks;
}
if (this.numReduceTasks != 0) {
reduceProgress = reduceProgress / this.numReduceTasks;
}
this.mapProgress = mapProgress;
this.reduceProgress = reduceProgress;
} finally {
this.readLock.unlock();
}
}
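// Hands out an unmodifiable view of the live tasks map and flags that any
// subsequent addTask() must first copy the map (copy-on-write), so holders
// of this view never see it mutate underneath them.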
@Override
public Map<TaskId, Task> getTasks() {
synchronized (tasksSyncHandle) {
lazyTasksCopyNeeded = true;
return Collections.unmodifiableMap(tasks);
}
}
@Override
public Map<TaskId,Task> getTasks(TaskType taskType) {
Map<TaskId, Task> localTasksCopy = tasks;
Map<TaskId, Task> result = new HashMap<TaskId, Task>();
Set<TaskId> tasksOfGivenType = null;
readLock.lock();
try {
if (TaskType.MAP == taskType) {
tasksOfGivenType = mapTasks;
} else {
tasksOfGivenType = reduceTasks;
}
for (TaskId taskID : tasksOfGivenType)
result.put(taskID, localTasksCopy.get(taskID));
return result;
} finally {
readLock.unlock();
}
}
@Override
public JobState getState() {
readLock.lock();
try {
return getStateMachine().getCurrentState();
} finally {
readLock.unlock();
}
}
protected void scheduleTasks(Set<TaskId> taskIDs) {
for (TaskId taskID : taskIDs) {
eventHandler.handle(new TaskEvent(taskID,
TaskEventType.T_SCHEDULE));
}
}
@Override
/**
* The only entry point to change the Job.
*/
public void handle(JobEvent event) {
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
try {
writeLock.lock();
JobState oldState = getState();
try {
getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
addDiagnostic("Invalid event " + event.getType() +
" on Job " + this.jobId);
eventHandler.handle(new JobEvent(this.jobId,
JobEventType.INTERNAL_ERROR));
}
//notify the eventhandler of state change
if (oldState != getState()) {
LOG.info(jobId + " Job Transitioned from " + oldState + " to "
+ getState());
}
}
finally {
writeLock.unlock();
}
}
//helpful in testing
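// If getTasks() has handed out a view of the current map, copy it first
// (copy-on-write) so callers of getTasks() keep a stable snapshot.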
protected void addTask(Task task) {
synchronized (tasksSyncHandle) {
if (lazyTasksCopyNeeded) {
Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
newTasks.putAll(tasks);
tasks = newTasks;
lazyTasksCopyNeeded = false;
}
}
tasks.put(task.getID(), task);
if (task.getType() == TaskType.MAP) {
mapTasks.add(task.getID());
} else if (task.getType() == TaskType.REDUCE) {
reduceTasks.add(task.getID());
}
metrics.waitingTask(task);
}
void setFinishTime() {
finishTime = clock.getTime();
}
void logJobHistoryFinishedEvent() {
this.setFinishTime();
JobFinishedEvent jfe = createJobFinishedEvent(this);
LOG.info("Calling handler for JobFinishedEvent ");
this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
}
/**
* Create the default file system for this job.
* @param conf the configuration object
* @return the default filesystem for this job
* @throws IOException
*/
protected FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(conf);
}
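// When every task has reported completion, commit the job output via the
// OutputCommitter, log the JobFinishedEvent and move to SUCCEEDED;
// returns null while tasks are still outstanding.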
static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success
if (job.completedTaskCount == job.tasks.size()) {
try {
// Commit job & do cleanup
job.getCommitter().commitJob(job.getJobContext());
} catch (IOException e) {
LOG.warn("Could not do commit for Job", e);
}
job.logJobHistoryFinishedEvent();
return job.finished(JobState.SUCCEEDED);
}
return null;
}
JobState finished(JobState finalState) {
if (getState() == JobState.RUNNING) {
metrics.endRunningJob(this);
}
if (finishTime == 0) setFinishTime();
eventHandler.handle(new JobFinishEvent(jobId));
switch (finalState) {
case KILLED:
metrics.killedJob(this);
break;
case FAILED:
metrics.failedJob(this);
break;
case SUCCEEDED:
metrics.completedJob(this);
}
return finalState;
}
@Override
public String getUserName() {
return userName;
}
@Override
public String getQueueName() {
return queueName;
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
*/
@Override
public Path getConfFile() {
return remoteJobConfFile;
}
@Override
public String getName() {
return jobName;
}
@Override
public int getTotalMaps() {
return mapTasks.size(); //FIXME: why indirection? return numMapTasks...
// unless race? how soon can this get called?
}
@Override
public int getTotalReduces() {
return reduceTasks.size(); //FIXME: why indirection? return numReduceTasks
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs()
*/
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.unmodifiableMap(jobACLs);
}
@Override
public List<AMInfo> getAMInfos() {
return amInfos;
}
/**
* Decide whether job can be run in uber mode based on various criteria.
* @param dataInputLength Total length for all splits
*/
private void makeUberDecision(long dataInputLength) {
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported,
// must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
//FIXME: handling multiple reduces within a single AM does not seem to
//work.
// int sysMaxReduces =
// job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
int sysMaxReduces = 1;
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from
// [File?]InputFormat and default block size
// from that
long sysMemSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
boolean uberEnabled =
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
boolean smallInput = (dataInputLength <= sysMaxBytes);
// ignoring overhead due to UberAM and statics as negligible here:
boolean smallMemory =
( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
<= sysMemSizeForUberSlot)
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
boolean notChainJob = !isChainJob(conf);
// User has overall veto power over uberization, or user can modify
// limits (overriding system settings and potentially shooting
// themselves in the head). Note that ChainMapper/Reducer are
// fundamentally incompatible with MR-1220; they employ a blocking
// queue between the maps/reduces and thus require parallel execution,
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
// and thus requires sequential execution.
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
&& smallInput && smallMemory && notChainJob;
if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+ numReduceTasks + "r tasks (" + dataInputLength
+ " input bytes) will run sequentially on single node.");
// make sure reduces are scheduled only after all maps are completed
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(jobId).append(" because:");
if (!uberEnabled)
msg.append(" not enabled;");
if (!smallNumMapTasks)
msg.append(" too many maps;");
if (!smallNumReduceTasks)
msg.append(" too many reduces;");
if (!smallInput)
msg.append(" too much input;");
if (!smallMemory)
msg.append(" too much RAM;");
if (!notChainJob)
msg.append(" chainjob");
LOG.info(msg.toString());
}
}
/**
* ChainMapper and ChainReducer must execute in parallel, so they're not
* compatible with uberization/LocalContainerLauncher (100% sequential).
*/
private boolean isChainJob(Configuration conf) {
boolean isChainJob = false;
try {
String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
if (mapClassName != null) {
Class<?> mapClass = Class.forName(mapClassName);
if (ChainMapper.class.isAssignableFrom(mapClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainMapper
}
try {
String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
if (reduceClassName != null) {
Class<?> reduceClass = Class.forName(reduceClassName);
if (ChainReducer.class.isAssignableFrom(reduceClass))
isChainJob = true;
}
} catch (ClassNotFoundException cnfe) {
// don't care; assume it's not derived from ChainReducer
}
return isChainJob;
}
/*
private int getBlockSize() {
String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
if (inputClassName != null) {
Class<?> inputClass = Class.forName(inputClassName);
if (FileInputFormat.class.isAssignableFrom(inputClass)) {
// derive the default block size from the input format's file system
}
}
}
*/
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
/**
* Note that this transition method is called directly (and synchronously)
* by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
* just plain sequential call within AM context), so we can trigger
* modifications in AM state from here (at least, if AM is written that
* way; MR version is).
*/
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
job.makeUberDecision(inputLength);
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
job.allowedReduceFailuresPercent =
job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// do the setup
job.committer.setupJob(job.jobContext);
job.setupProgress = 1.0f;
// create the Tasks but don't start them yet
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobState.INITED;
//TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
} catch (IOException e) {
LOG.warn("Job init failed", e);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
job.metrics.endPreparingJob(job);
return job.finished(JobState.FAILED);
}
}
protected void setup(JobImpl job) throws IOException {
String oldJobIDString = job.oldJobId.toString();
String user =
UserGroupInformation.getCurrentUser().getShortUserName();
Path path = MRApps.getStagingAreaDir(job.conf, user);
if(LOG.isDebugEnabled()) {
LOG.debug("startJobs: parent=" + path + " child=" + oldJobIDString);
}
job.remoteJobSubmitDir =
FileSystem.get(job.conf).makeQualified(
new Path(path, oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
// Prepare the TaskAttemptListener server for authentication of Containers
// TaskAttemptListener gets the information via jobTokenSecretManager.
JobTokenIdentifier identifier =
new JobTokenIdentifier(new Text(oldJobIDString));
job.jobToken =
new Token<JobTokenIdentifier>(identifier, job.jobTokenSecretManager);
job.jobToken.setService(identifier.getJobId());
// Add it to the jobTokenSecretManager so that TaskAttemptListener server
// can authenticate containers(tasks)
job.jobTokenSecretManager.addTokenForJob(oldJobIDString, job.jobToken);
LOG.info("Adding job token for " + oldJobIDString
+ " to jobTokenSecretManager");
// Upload the jobTokens onto the remote FS so that ContainerManager can
// localize it to be used by the Containers(tasks)
Credentials tokenStorage = new Credentials();
TokenCache.setJobToken(job.jobToken, tokenStorage);
if (UserGroupInformation.isSecurityEnabled()) {
tokenStorage.addAll(job.fsTokens);
}
}
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens.getAllTokens(),
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.addTask(task);
}
LOG.info("Input size for job " + job.jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
}
private void createReduceTasks(JobImpl job) {
for (int i = 0; i < job.numReduceTasks; i++) {
TaskImpl task =
new ReduceTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.committer, job.jobToken,
job.fsTokens.getAllTokens(), job.clock,
job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics);
job.addTask(task);
}
LOG.info("Number of reduces for job " + job.jobId + " = "
+ job.numReduceTasks);
}
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
TaskSplitMetaInfo[] allTaskSplitMetaInfo;
try {
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
job.oldJobId, job.fs,
job.conf,
job.remoteJobSubmitDir);
} catch (IOException e) {
throw new YarnException(e);
}
return allTaskSplitMetaInfo;
}
/**
* If the number of tasks is greater than the configured value,
* throw an exception that will fail job initialization.
*/
private void checkTaskLimits() {
// no code, for now
}
} // end of InitTransition
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
/**
* This transition executes in the event-dispatcher thread, though it's
* triggered in MRAppMaster's startJobs() method.
*/
@Override
public void transition(JobImpl job, JobEvent event) {
job.startTime = job.clock.getTime();
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
job.scheduleTasks(job.reduceTasks);
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
job.numMapTasks, job.numReduceTasks,
job.getState().toString(),
job.isUber()); //Will transition to state running. Currently in INITED
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED));
}
}
}
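// Abort the job output via the OutputCommitter and record a
// JobUnsuccessfulCompletionEvent in the job history with the given final
// state (FAILED or KILLED).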
private void abortJob(
org.apache.hadoop.mapreduce.JobStatus.State finalState) {
try {
committer.abortJob(jobContext, finalState);
} catch (IOException e) {
LOG.warn("Could not abortJob", e);
}
if (finishTime == 0) setFinishTime();
cleanupProgress = 1.0f;
JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
new JobUnsuccessfulCompletionEvent(oldJobId,
finishTime,
succeededMapTaskCount,
succeededReduceTaskCount,
finalState.toString());
eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
}
// JobFinishedEvent triggers the move of the history file out of the staging
// area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc.
private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
job.mayBeConstructFinalFullCounters();
JobFinishedEvent jfe = new JobFinishedEvent(
job.oldJobId, job.finishTime,
job.succeededMapTaskCount, job.succeededReduceTaskCount,
job.failedMapTaskCount, job.failedReduceTaskCount,
job.finalMapCounters,
job.finalReduceCounters,
job.fullCounters);
return jfe;
}
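// Build the final full/map/reduce counters exactly once per job, guarded by
// fullCountersLock; later calls return the already-constructed counters.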
private void mayBeConstructFinalFullCounters() {
// Calculating full-counters. This should happen only once for the job.
synchronized (this.fullCountersLock) {
if (this.fullCounters != null) {
// Already constructed. Just return.
return;
}
this.constructFinalFullcounters();
}
}
@Private
public void constructFinalFullcounters() {
this.fullCounters = new Counters();
this.finalMapCounters = new Counters();
this.finalReduceCounters = new Counters();
this.fullCounters.incrAllCounters(jobCounters);
for (Task t : this.tasks.values()) {
Counters counters = t.getCounters();
switch (t.getType()) {
case MAP:
this.finalMapCounters.incrAllCounters(counters);
break;
case REDUCE:
this.finalReduceCounters.incrAllCounters(counters);
break;
}
this.fullCounters.incrAllCounters(counters);
}
}
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
private static class KillNewJobTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
JobState.KILLED.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobState.KILLED);
}
}
private static class KillInitedJobTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.addDiagnostic("Job received Kill in INITED state.");
job.finished(JobState.KILLED);
}
}
private static class KillTasksTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.addDiagnostic("Job received Kill while in RUNNING state.");
for (Task task : job.tasks.values()) {
job.eventHandler.handle(
new TaskEvent(task.getID(), TaskEventType.T_KILL));
}
job.metrics.endRunningJob(job);
}
}
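// Records a task-attempt completion event; its eventId is its index in the
// taskAttemptCompletionEvents list. A new event for a task marks that task's
// earlier successful event OBSOLETE, and successful events are remembered
// per task in successAttemptCompletionEventNoMap.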
private static class TaskAttemptCompletedEventTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
TaskAttemptCompletionEvent tce =
((JobTaskAttemptCompletedEvent) event).getCompletionEvent();
// Add the TaskAttemptCompletionEvent
//eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
//mark the previous successful completion event as obsolete if it exists
Object successEventNo =
job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId());
if (successEventNo != null) {
TaskAttemptCompletionEvent successEvent =
job.taskAttemptCompletionEvents.get((Integer) successEventNo);
successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
}
if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(),
tce.getEventId());
}
}
}
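// Tracks fetch-failure notifications per map attempt. Once an attempt has
// accumulated MAX_FETCH_FAILURES_NOTIFICATIONS failures and the failure rate
// (failures / currently running reduces) reaches
// MAX_ALLOWED_FETCH_FAILURES_FRACTION, the attempt is sent
// TA_TOO_MANY_FETCH_FAILURE and its entry is cleared from the mapping.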
private static class TaskAttemptFetchFailureTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
JobTaskAttemptFetchFailureEvent fetchfailureEvent =
(JobTaskAttemptFetchFailureEvent) event;
for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
fetchfailureEvent.getMaps()) {
Integer fetchFailures = job.fetchFailuresMapping.get(mapId);
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
job.fetchFailuresMapping.put(mapId, fetchFailures);
//get number of running reduces
int runningReduceTasks = 0;
for (TaskId taskId : job.reduceTasks) {
if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
runningReduceTasks++;
}
}
float failureRate = (float) fetchFailures / runningReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
job.fetchFailuresMapping.remove(mapId);
}
}
}
}
private static class TaskCompletedTransition implements
MultipleArcTransition<JobImpl, JobEvent, JobState> {
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.completedTaskCount++;
LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event;
Task task = job.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
taskSucceeded(job, task);
} else if (taskEvent.getState() == TaskState.FAILED) {
taskFailed(job, task);
} else if (taskEvent.getState() == TaskState.KILLED) {
taskKilled(job, task);
}
return checkJobForCompletion(job);
}
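// The job fails once failed maps or reduces exceed their allowed percentage,
// i.e. failedCount * 100 > allowedPercent * totalCount; otherwise it
// succeeds when all tasks have completed, or stays in its current state.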
protected JobState checkJobForCompletion(JobImpl job) {
//check for Job failure
if (job.failedMapTaskCount*100 >
job.allowedMapFailuresPercent*job.numMapTasks ||
job.failedReduceTaskCount*100 >
job.allowedReduceFailuresPercent*job.numReduceTasks) {
job.setFinishTime();
String diagnosticMsg = "Job failed as tasks failed. " +
"failedMaps:" + job.failedMapTaskCount +
" failedReduces:" + job.failedReduceTaskCount;
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobState.FAILED);
}
JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
//return the current state, Job not finished yet
return job.getState();
}
private void taskSucceeded(JobImpl job, Task task) {
if (task.getType() == TaskType.MAP) {
job.succeededMapTaskCount++;
} else {
job.succeededReduceTaskCount++;
}
job.metrics.completedTask(task);
}
private void taskFailed(JobImpl job, Task task) {
if (task.getType() == TaskType.MAP) {
job.failedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
job.failedReduceTaskCount++;
}
job.addDiagnostic("Task failed " + task.getID());
job.metrics.failedTask(task);
}
private void taskKilled(JobImpl job, Task task) {
if (task.getType() == TaskType.MAP) {
job.killedMapTaskCount++;
} else if (task.getType() == TaskType.REDUCE) {
job.killedReduceTaskCount++;
}
job.metrics.killedTask(task);
}
}
// Transition class for handling jobs with no tasks
static class JobNoTasksCompletedTransition implements
MultipleArcTransition<JobImpl, JobEvent, JobState> {
@Override
public JobState transition(JobImpl job, JobEvent event) {
JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) {
return jobCompleteSuccess;
}
// Return the current state, Job not finished yet
return job.getState();
}
}
private static class MapTaskRescheduledTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
//a previously succeeded map task is being rescheduled, so back out its counts
job.completedTaskCount--;
job.succeededMapTaskCount--;
}
}
private static class KillWaitTaskCompletedTransition extends
TaskCompletedTransition {
@Override
protected JobState checkJobForCompletion(JobImpl job) {
if (job.completedTaskCount == job.tasks.size()) {
job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
return job.finished(JobState.KILLED);
}
//return the current state, Job not finished yet
return job.getState();
}
}
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
private static class DiagnosticsUpdateTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.addDiagnostic(((JobDiagnosticsUpdateEvent) event)
.getDiagnosticUpdate());
}
}
private static class CounterUpdateTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
.getCounterUpdates()) {
job.jobCounters.findCounter(ci.getCounterKey()).increment(
ci.getIncrementValue());
}
}
}
private static class InternalErrorTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
//TODO Is this JH event required?
job.setFinishTime();
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
JobState.ERROR.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobState.ERROR);
}
}
}