blob: 1f9ea90c7cc11fb660e023505ce0607a89598e79 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MapReduceChildJVM;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
import org.apache.hadoop.mapreduce.v2.api.records.Locality;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of TaskAttempt interface.
*/
@SuppressWarnings({ "rawtypes" })
public abstract class TaskAttemptImpl implements
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
EventHandler<TaskAttemptEvent> {
@VisibleForTesting
protected final static Map<TaskType, Resource> RESOURCE_REQUEST_CACHE
= new HashMap<>();
static final Counters EMPTY_COUNTERS = new Counters();
private static final Logger LOG =
LoggerFactory.getLogger(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final JobConf conf;
protected final Path jobFile;
protected final int partition;
protected EventHandler eventHandler;
private final TaskAttemptId attemptId;
private final Clock clock;
private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private Resource resourceCapability;
protected Set<String> dataLocalHosts;
protected Set<String> dataLocalRacks;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
private final AppContext appContext;
private Credentials credentials;
private Token<JobTokenIdentifier> jobToken;
private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
private static String initialClasspath = null;
private static String initialAppClasspath = null;
private static Object commonContainerSpecLock = new Object();
private static ContainerLaunchContext commonContainerSpec = null;
private static final Object classpathLock = new Object();
private long launchTime;
private long finishTime;
private WrappedProgressSplitsBlock progressSplitBlock;
private int shufflePort = -1;
private String trackerName;
private int httpPort;
private Locality locality;
private Avataar avataar;
private boolean rescheduleNextAttempt = false;
private boolean failFast = false;
private static final CleanupContainerTransition
CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
private static final MoveContainerToSucceededFinishingTransition
SUCCEEDED_FINISHING_TRANSITION =
new MoveContainerToSucceededFinishingTransition();
private static final MoveContainerToFailedFinishingTransition
FAILED_FINISHING_TRANSITION =
new MoveContainerToFailedFinishingTransition();
private static final ExitFinishingOnTimeoutTransition
FINISHING_ON_TIMEOUT_TRANSITION =
new ExitFinishingOnTimeoutTransition();
private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION =
new FinalizeFailedTransition();
private static final DiagnosticInformationUpdater
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION
= new DiagnosticInformationUpdater();
private static final EnumSet<TaskAttemptEventType>
FAILED_KILLED_STATE_IGNORED_EVENTS = EnumSet.of(
TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
private static final StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory
= new StateMachineFactory
<TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptStateInternal.NEW)
// Transitions from the NEW state.
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
.addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
EnumSet.of(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED,
new ContainerAssignedTransition())
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
TaskAttemptStateInternal.KILLED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
new LaunchedContainerTransition())
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
CLEANUP_CONTAINER_TRANSITION)
// Transitions from RUNNING state.
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// If no commit is required, task goes to finishing state
// This will give a chance for the container to exit by itself
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
// If commit is required, task goes through commit pending state.
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
// Failure handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION)
//for handling container exit without sending the done or fail msg
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
FINALIZE_FAILED_TRANSITION)
// Timeout handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
// Transitions from SUCCESS_FINISHING_CONTAINER state
// When the container exits by itself, the notification of container
// completed event will be routed via NM -> RM -> AM.
// After MRAppMaster gets notification from RM, it will generate
// TA_CONTAINER_COMPLETED event.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
new ExitFinishingOnContainerCompletedTransition())
// Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to
// SUCCESS_FINISHING_CONTAINER, it is possible to receive the event
// TA_CONTAINER_CLEANED in the following scenario.
// 1. It is the last task for the job.
// 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job.
// 3. Job will be marked completed.
// 4. As part of MRAppMaster's shutdown, all containers will be killed.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
new ExitFinishingOnContainerCleanedupTransition())
// The client wants to kill the task. Given the task is in finishing
// state, it could go to succeeded state or killed state. If it is a
// reducer, it will go to succeeded state;
// otherwise, it goes to killed state.
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP),
TaskAttemptEventType.TA_KILL,
new KilledAfterSucceededFinishingTransition())
// The attempt stays in finishing state for too long
// Let us clean up the container
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
// ignore-able events
.addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
// Transitions from FAIL_FINISHING_CONTAINER state
// When the container exits by itself, the notification of container
// completed event will be routed via NM -> RM -> AM.
// After MRAppMaster gets notification from RM, it will generate
// TA_CONTAINER_COMPLETED event.
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
new ExitFinishingOnContainerCompletedTransition())
// Given TA notifies task T_ATTEMPT_FAILED when it transitions to
// FAIL_FINISHING_CONTAINER, it is possible to receive the event
// TA_CONTAINER_CLEANED in the following scenario.
// 1. It is the last task attempt for the task.
// 2. After the task receives T_ATTEMPT_FAILED, it will notify job.
// 3. Job will be marked failed.
// 4. As part of MRAppMaster's shutdown, all containers will be killed.
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
new ExitFinishingOnContainerCleanedupTransition())
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// ignore-able events
.addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
new StatusUpdater())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// AM is likely to receive duplicate TA_COMMIT_PENDINGs as the task attempt
// will re-send the commit message until it doesn't encounter any
// IOException and succeeds in delivering the commit message.
// Ignoring the duplicate commit message is a short-term fix. In long term,
// we need to make use of retry cache to help this and other MR protocol
// APIs that can be considered as @AtMostOnce.
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING)
// Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container
.addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_CONTAINER_CLEANED)
.addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
.addTransition(
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAIL_CONTAINER_CLEANUP state.
.addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
.addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from KILL_CONTAINER_CLEANUP
.addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
.addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from FAIL_TASK_CLEANUP
// run the task cleanup
.addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
new FailedTransition())
.addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from KILL_TASK_CLEANUP
.addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
new KilledTransition())
.addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events
.addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptStateInternal.KILL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE,
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_PREEMPTED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from SUCCEEDED
.addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
TaskAttemptEventType.TA_KILL,
new KilledAfterSuccessTransition())
.addTransition(
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for SUCCEEDED state
.addTransition(TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
// TaskAttemptFinishingMonitor might time out the attempt right
// after the attempt receives TA_CONTAINER_COMPLETED.
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAILED state
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for FAILED state
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
FAILED_KILLED_STATE_IGNORED_EVENTS)
// Transitions from KILLED state
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for KILLED state
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
FAILED_KILLED_STATE_IGNORED_EVENTS)
// create the topology tables
.installTopology();
private final StateMachine
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachine;
@VisibleForTesting
public Container container;
private String nodeRackName;
private WrappedJvmID jvmID;
//this takes good amount of memory ~ 30KB. Instantiate it lazily
//and make it null once task is launched.
private org.apache.hadoop.mapred.Task remoteTask;
//this is the last status reported by the REMOTE running attempt
private TaskAttemptStatus reportedStatus;
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
public TaskAttemptImpl(TaskId taskId, int i,
EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, String[] dataLocalHosts,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext) {
oldJobId = TypeConverter.fromYarn(taskId.getJobId());
this.conf = conf;
this.clock = clock;
attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
attemptId.setTaskId(taskId);
attemptId.setId(i);
this.taskAttemptListener = taskAttemptListener;
this.appContext = appContext;
// Initialize reportedStatus
reportedStatus = new TaskAttemptStatus();
initTaskAttemptStatus(reportedStatus);
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.credentials = credentials;
this.jobToken = jobToken;
this.eventHandler = eventHandler;
this.jobFile = jobFile;
this.partition = partition;
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
populateResourceCapability(taskId.getTaskType());
this.dataLocalHosts = resolveHosts(dataLocalHosts);
RackResolver.init(conf);
this.dataLocalRacks = new HashSet<String>();
for (String host : this.dataLocalHosts) {
this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
}
locality = Locality.OFF_SWITCH;
avataar = Avataar.VIRGIN;
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
}
private void populateResourceCapability(TaskType taskType) {
String resourceTypePrefix =
getResourceTypePrefix(taskType);
boolean memorySet = false;
boolean cpuVcoresSet = false;
if (RESOURCE_REQUEST_CACHE.get(taskType) != null) {
resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType);
return;
}
if (resourceTypePrefix != null) {
List<ResourceInformation> resourceRequests =
ResourceUtils.getRequestedResourcesFromConfig(conf,
resourceTypePrefix);
for (ResourceInformation resourceRequest : resourceRequests) {
String resourceName = resourceRequest.getName();
if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
resourceName)) {
if (memorySet) {
throw new IllegalArgumentException(
"Only one of the following keys " +
"can be specified for a single job: " +
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
}
String units = isEmpty(resourceRequest.getUnits()) ?
ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
resourceRequest.getUnits();
this.resourceCapability.setMemorySize(
UnitsConversionUtil.convert(units, "Mi",
resourceRequest.getValue()));
memorySet = true;
String memoryKey = getMemoryKey(taskType);
if (memoryKey != null && conf.get(memoryKey) != null) {
LOG.warn("Configuration " + resourceTypePrefix + resourceName +
"=" + resourceRequest.getValue() + resourceRequest.getUnits() +
" is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
" configuration");
}
} else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
resourceName)) {
this.resourceCapability.setVirtualCores(
(int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
resourceRequest.getValue()));
cpuVcoresSet = true;
String cpuKey = getCpuVcoresKey(taskType);
if (cpuKey != null && conf.get(cpuKey) != null) {
LOG.warn("Configuration " + resourceTypePrefix +
MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
resourceRequest.getValue() + resourceRequest.getUnits() +
" is overriding the " + cpuKey + "=" +
conf.get(cpuKey) + " configuration");
}
} else {
ResourceInformation resourceInformation =
this.resourceCapability.getResourceInformation(resourceName);
resourceInformation.setUnits(resourceRequest.getUnits());
resourceInformation.setValue(resourceRequest.getValue());
this.resourceCapability.setResourceInformation(resourceName,
resourceInformation);
}
}
}
if (!memorySet) {
this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
}
if (!cpuVcoresSet) {
this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
}
RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability);
LOG.info("Resource capability of task type {} is set to {}",
taskType, resourceCapability);
}
private String getCpuVcoresKey(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_CPU_VCORES;
case REDUCE:
return MRJobConfig.REDUCE_CPU_VCORES;
default:
return null;
}
}
private String getMemoryKey(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_MEMORY_MB;
case REDUCE:
return MRJobConfig.REDUCE_MEMORY_MB;
default:
return null;
}
}
private Integer getCpuVcoreDefault(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
case REDUCE:
return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
default:
return null;
}
}
private int getMemoryRequired(JobConf conf, TaskType taskType) {
return conf.getMemoryRequired(TypeConverter.fromYarn(taskType));
}
private int getCpuRequired(Configuration conf, TaskType taskType) {
int vcores = 1;
String cpuVcoreKey = getCpuVcoresKey(taskType);
if (cpuVcoreKey != null) {
Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
if (null == defaultCpuVcores) {
defaultCpuVcores = vcores;
}
vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
}
return vcores;
}
private String getResourceTypePrefix(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
case REDUCE:
return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
default:
LOG.info("TaskType " + taskType +
" does not support custom resource types - this support can be " +
"added in " + getClass().getSimpleName());
return null;
}
}
/**
* Create a {@link LocalResource} record with all the given parameters.
* The NM that hosts AM container will upload resources to shared cache.
* Thus there is no need to ask task container's NM to upload the
* resources to shared cache. Set the shared cache upload policy to
* false.
*/
private static LocalResource createLocalResource(FileSystem fc, Path file,
String fileSymlink, LocalResourceType type,
LocalResourceVisibility visibility) throws IOException {
FileStatus fstat = fc.getFileStatus(file);
// We need to be careful when converting from path to URL to add a fragment
// so that the symlink name when localized will be correct.
Path qualifiedPath = fc.resolvePath(fstat.getPath());
URI uriWithFragment = null;
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
try {
if (useFragment) {
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
} else {
uriWithFragment = qualifiedPath.toUri();
}
} catch (URISyntaxException e) {
throw new IOException(
"Error parsing local resource path."
+ " Path was not able to be converted to a URI: " + qualifiedPath,
e);
}
URL resourceURL = URL.fromURI(uriWithFragment);
long resourceSize = fstat.getLen();
long resourceModificationTime = fstat.getModificationTime();
return LocalResource.newInstance(resourceURL, type, visibility,
resourceSize, resourceModificationTime, false);
}
/**
* Lock this on initialClasspath so that there is only one fork in the AM for
* getting the initial class-path. TODO: We already construct
* a parent CLC and use it for all the containers, so this should go away
* once the mr-generated-classpath stuff is gone.
*/
private static String getInitialClasspath(Configuration conf) throws IOException {
synchronized (classpathLock) {
if (initialClasspathFlag.get()) {
return initialClasspath;
}
Map<String, String> env = new HashMap<String, String>();
MRApps.setClasspath(env, conf);
initialClasspath = env.get(Environment.CLASSPATH.name());
initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
initialClasspathFlag.set(true);
return initialClasspath;
}
}
/**
* Create the common {@link ContainerLaunchContext} for all attempts.
* @param applicationACLs
*/
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
Token<JobTokenIdentifier> jobToken,
final org.apache.hadoop.mapred.JobID oldJobId,
Credentials credentials) {
// Application resources
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
// Application environment
Map<String, String> environment;
// Service data
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
// Tokens
ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{});
try {
configureJobJar(conf, localResources);
configureJobConf(conf, localResources, oldJobId);
// Setup DistributedCache
MRApps.setupDistributedCache(conf, localResources);
taskCredentialsBuffer =
configureTokens(jobToken, credentials, serviceData);
addExternalShuffleProviders(conf, serviceData);
environment = configureEnv(conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
ContainerLaunchContext container =
ContainerLaunchContext.newInstance(localResources, environment, null,
serviceData, taskCredentialsBuffer, applicationACLs);
return container;
}
private static Map<String, String> configureEnv(Configuration conf)
throws IOException {
Map<String, String> environment = new HashMap<String, String>();
MRApps.addToEnvironment(environment, Environment.CLASSPATH.name(),
getInitialClasspath(conf), conf);
if (initialAppClasspath != null) {
MRApps.addToEnvironment(environment, Environment.APP_CLASSPATH.name(),
initialAppClasspath, conf);
}
// Shell
environment.put(Environment.SHELL.name(), conf
.get(MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
// Add pwd to LD_LIBRARY_PATH, add this before adding anything else
MRApps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
// Add the env variables passed by the admin
MRApps.setEnvFromInputProperty(environment,
MRJobConfig.MAPRED_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV, conf);
return environment;
}
private static void configureJobJar(Configuration conf,
Map<String, LocalResource> localResources) throws IOException {
// Set up JobJar to be localized properly on the remote NM.
String jobJar = conf.get(MRJobConfig.JAR);
if (jobJar != null) {
final Path jobJarPath = new Path(jobJar);
final FileSystem jobJarFs = FileSystem.get(jobJarPath.toUri(), conf);
Path remoteJobJar = jobJarPath.makeQualified(jobJarFs.getUri(),
jobJarFs.getWorkingDirectory());
LocalResourceVisibility jobJarViz =
conf.getBoolean(MRJobConfig.JOBJAR_VISIBILITY,
MRJobConfig.JOBJAR_VISIBILITY_DEFAULT)
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.APPLICATION;
// We hard code the job.jar localized symlink in the container directory.
// This is because the mapreduce app expects the job.jar to be named
// accordingly. Additionally we set the shared cache upload policy to
// false. Resources are uploaded by the AM if necessary.
LocalResource rc =
createLocalResource(jobJarFs, remoteJobJar, MRJobConfig.JOB_JAR,
LocalResourceType.PATTERN, jobJarViz);
String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);
localResources.put(MRJobConfig.JOB_JAR, rc);
LOG.info("The job-jar file on the remote FS is "
+ remoteJobJar.toUri().toASCIIString());
} else {
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.
LOG.info("Job jar is not present. "
+ "Not adding any jar to the list of resources.");
}
}
private static void configureJobConf(Configuration conf,
Map<String, LocalResource> localResources,
final org.apache.hadoop.mapred.JobID oldJobId) throws IOException {
// Set up JobConf to be localized properly on the remote NM.
Path path = MRApps.getStagingAreaDir(conf,
UserGroupInformation.getCurrentUser().getShortUserName());
Path remoteJobSubmitDir = new Path(path, oldJobId.toString());
Path remoteJobConfPath =
new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
FileSystem remoteFS = FileSystem.get(conf);
// There is no point to ask task container's NM to upload the resource
// to shared cache (job conf is not shared). Therefore, createLocalResource
// will set the shared cache upload policy to false
localResources.put(MRJobConfig.JOB_CONF_FILE,
createLocalResource(remoteFS, remoteJobConfPath, null,
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
LOG.info("The job-conf file on the remote FS is "
+ remoteJobConfPath.toUri().toASCIIString());
}
private static ByteBuffer configureTokens(Token<JobTokenIdentifier> jobToken,
Credentials credentials,
Map<String, ByteBuffer> serviceData) throws IOException {
// Setup up task credentials buffer
LOG.info("Adding #" + credentials.numberOfTokens() + " tokens and #"
+ credentials.numberOfSecretKeys()
+ " secret keys for NM use for launching container");
Credentials taskCredentials = new Credentials(credentials);
// LocalStorageToken is needed irrespective of whether security is enabled
// or not.
TokenCache.setJobToken(jobToken, taskCredentials);
DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
LOG.info(
"Size of containertokens_dob is " + taskCredentials.numberOfTokens());
taskCredentials.writeTokenStorageToStream(containerTokens_dob);
ByteBuffer taskCredentialsBuffer =
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
containerTokens_dob.getLength());
// Add shuffle secret key
// The secret key is converted to a JobToken to preserve backwards
// compatibility with an older ShuffleHandler running on an NM.
LOG.info("Putting shuffle token in serviceData");
byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials);
if (shuffleSecret == null) {
LOG.warn("Cannot locate shuffle secret in credentials."
+ " Using job token as shuffle secret.");
shuffleSecret = jobToken.getPassword();
}
Token<JobTokenIdentifier> shuffleToken =
new Token<JobTokenIdentifier>(jobToken.getIdentifier(), shuffleSecret,
jobToken.getKind(), jobToken.getService());
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(shuffleToken));
return taskCredentialsBuffer;
}
private static void addExternalShuffleProviders(Configuration conf,
Map<String, ByteBuffer> serviceData) {
// add external shuffle-providers - if any
Collection<String> shuffleProviders = conf.getStringCollection(
MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
if (!shuffleProviders.isEmpty()) {
Collection<String> auxNames =
conf.getStringCollection(YarnConfiguration.NM_AUX_SERVICES);
for (final String shuffleProvider : shuffleProviders) {
if (shuffleProvider
.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
continue; // skip built-in shuffle-provider that was already inserted
// with shuffle secret key
}
if (auxNames.contains(shuffleProvider)) {
LOG.info("Adding ShuffleProvider Service: " + shuffleProvider
+ " to serviceData");
// This only serves for INIT_APP notifications
// The shuffle service needs to be able to work with the host:port
// information provided by the AM
// (i.e. shuffle services which require custom location / other
// configuration are not supported)
serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
} else {
throw new YarnRuntimeException("ShuffleProvider Service: "
+ shuffleProvider
+ " was NOT found in the list of aux-services that are "
+ "available in this NM. You may need to specify this "
+ "ShuffleProvider as an aux-service in your yarn-site.xml");
}
}
}
}
static ContainerLaunchContext createContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
final org.apache.hadoop.mapred.JobID oldJobId,
WrappedJvmID jvmID,
TaskAttemptListener taskAttemptListener,
Credentials credentials) {
synchronized (commonContainerSpecLock) {
if (commonContainerSpec == null) {
commonContainerSpec = createCommonContainerLaunchContext(
applicationACLs, conf, jobToken, oldJobId, credentials);
}
}
// Fill in the fields needed per-container that are missing in the common
// spec.
boolean userClassesTakesPrecedence =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
// Setup environment by cloning from common env.
Map<String, String> env = commonContainerSpec.getEnvironment();
Map<String, String> myEnv = new HashMap<String, String>(env.size());
myEnv.putAll(env);
if (userClassesTakesPrecedence) {
myEnv.put(Environment.CLASSPATH_PREPEND_DISTCACHE.name(), "true");
}
MapReduceChildJVM.setVMEnv(myEnv, remoteTask);
// Set up the launch command
List<String> commands = MapReduceChildJVM.getVMCommand(
taskAttemptListener.getAddress(), remoteTask, jvmID);
// Duplicate the ByteBuffers for access by multiple containers.
Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
for (Entry<String, ByteBuffer> entry : commonContainerSpec
.getServiceData().entrySet()) {
myServiceData.put(entry.getKey(), entry.getValue().duplicate());
}
// Construct the actual Container
ContainerLaunchContext container = ContainerLaunchContext.newInstance(
commonContainerSpec.getLocalResources(), myEnv, commands,
myServiceData, commonContainerSpec.getTokens().duplicate(),
applicationACLs);
return container;
}
@Override
public ContainerId getAssignedContainerID() {
readLock.lock();
try {
return container == null ? null : container.getId();
} finally {
readLock.unlock();
}
}
@Override
public String getAssignedContainerMgrAddress() {
readLock.lock();
try {
return container == null ? null : StringInterner.weakIntern(container
.getNodeId().toString());
} finally {
readLock.unlock();
}
}
@Override
public long getLaunchTime() {
readLock.lock();
try {
return launchTime;
} finally {
readLock.unlock();
}
}
@Override
public long getFinishTime() {
readLock.lock();
try {
return finishTime;
} finally {
readLock.unlock();
}
}
@Override
public long getShuffleFinishTime() {
readLock.lock();
try {
return this.reportedStatus.shuffleFinishTime;
} finally {
readLock.unlock();
}
}
@Override
public long getSortFinishTime() {
readLock.lock();
try {
return this.reportedStatus.sortFinishTime;
} finally {
readLock.unlock();
}
}
@Override
public int getShufflePort() {
readLock.lock();
try {
return shufflePort;
} finally {
readLock.unlock();
}
}
@Override
public NodeId getNodeId() {
readLock.lock();
try {
return container == null ? null : container.getNodeId();
} finally {
readLock.unlock();
}
}
/**If container Assigned then return the node's address, otherwise null.
*/
@Override
public String getNodeHttpAddress() {
readLock.lock();
try {
return container == null ? null : container.getNodeHttpAddress();
} finally {
readLock.unlock();
}
}
/**
* If container Assigned then return the node's rackname, otherwise null.
*/
@Override
public String getNodeRackName() {
this.readLock.lock();
try {
return this.nodeRackName;
} finally {
this.readLock.unlock();
}
}
protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
@Override
public TaskAttemptId getID() {
return attemptId;
}
@Override
public boolean isFinished() {
readLock.lock();
try {
// TODO: Use stateMachine level method?
return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED ||
getInternalState() == TaskAttemptStateInternal.FAILED ||
getInternalState() == TaskAttemptStateInternal.KILLED);
} finally {
readLock.unlock();
}
}
@Override
public TaskAttemptReport getReport() {
TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class);
readLock.lock();
try {
result.setTaskAttemptId(attemptId);
//take the LOCAL state of attempt
//DO NOT take from reportedStatus
result.setTaskAttemptState(getState());
result.setProgress(reportedStatus.progress);
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(TypeConverter.toYarn(getCounters()));
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
if (this.container != null) {
result.setNodeManagerPort(this.container.getNodeId().getPort());
}
return result;
} finally {
readLock.unlock();
}
}
@Override
public List<String> getDiagnostics() {
List<String> result = new ArrayList<String>();
readLock.lock();
try {
result.addAll(diagnostics);
return result;
} finally {
readLock.unlock();
}
}
@Override
public Counters getCounters() {
readLock.lock();
try {
Counters counters = reportedStatus.counters;
if (counters == null) {
counters = EMPTY_COUNTERS;
}
return counters;
} finally {
readLock.unlock();
}
}
@Override
public float getProgress() {
readLock.lock();
try {
return reportedStatus.progress;
} finally {
readLock.unlock();
}
}
@Override
public Phase getPhase() {
readLock.lock();
try {
return reportedStatus.phase;
} finally {
readLock.unlock();
}
}
@Override
public TaskAttemptState getState() {
readLock.lock();
try {
return getExternalState(stateMachine.getCurrentState());
} finally {
readLock.unlock();
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+ event.getType());
}
writeLock.lock();
try {
final TaskAttemptStateInternal oldState = getInternalState() ;
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state for "
+ this.attemptId, e);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
" on TaskAttempt " + this.attemptId));
eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
if (oldState != getInternalState()) {
if (getInternalState() == TaskAttemptStateInternal.FAILED) {
String nodeId = null == this.container ? "Not-assigned"
: this.container.getNodeId().toString();
LOG.info(attemptId + " transitioned from state " + oldState + " to "
+ getInternalState() + ", event type is " + event.getType()
+ " and nodeId=" + nodeId);
} else {
LOG.info(attemptId + " TaskAttempt Transitioned from " + oldState
+ " to " + getInternalState());
}
}
} finally {
writeLock.unlock();
}
}
@VisibleForTesting
public TaskAttemptStateInternal getInternalState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
public Locality getLocality() {
return locality;
}
public void setLocality(Locality locality) {
this.locality = locality;
}
public Avataar getAvataar()
{
return avataar;
}
public void setAvataar(Avataar avataar) {
this.avataar = avataar;
}
public void setTaskFailFast(boolean failFast) {
this.failFast = failFast;
}
public boolean isTaskFailFast() {
return failFast;
}
@SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
OutputCommitter committer, boolean recoverOutput) {
ContainerId containerId = taInfo.getContainerId();
NodeId containerNodeId = NodeId.fromString(
taInfo.getHostname() + ":" + taInfo.getPort());
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ taInfo.getHttpPort());
// Resource/Priority/Tokens are only needed while launching the container on
// an NM, these are already completed tasks, so setting them to null
container =
Container.newInstance(containerId, containerNodeId,
nodeHttpAddress, null, null, null);
computeRackAndLocality();
launchTime = taInfo.getStartTime();
finishTime = (taInfo.getFinishTime() != -1) ?
taInfo.getFinishTime() : clock.getTime();
shufflePort = taInfo.getShufflePort();
trackerName = taInfo.getHostname();
httpPort = taInfo.getHttpPort();
sendLaunchedEvents();
reportedStatus.id = attemptId;
reportedStatus.progress = 1.0f;
reportedStatus.counters = taInfo.getCounters();
reportedStatus.stateString = taInfo.getState();
reportedStatus.phase = Phase.CLEANUP;
reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
addDiagnosticInfo(taInfo.getError());
boolean needToClean = false;
String recoveredState = taInfo.getTaskStatus();
if (recoverOutput
&& TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptId));
try {
committer.recoverTask(tac);
LOG.info("Recovered output from task attempt " + attemptId);
} catch (Exception e) {
LOG.error("Unable to recover task attempt " + attemptId, e);
LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
recoveredState = TaskAttemptState.KILLED.toString();
needToClean = true;
}
}
TaskAttemptStateInternal attemptState;
if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
attemptState = TaskAttemptStateInternal.SUCCEEDED;
reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
logAttemptFinishedEvent(attemptState);
} else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
attemptState = TaskAttemptStateInternal.FAILED;
reportedStatus.taskState = TaskAttemptState.FAILED;
eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(this,
TaskAttemptStateInternal.FAILED);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
} else {
if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
if (String.valueOf(recoveredState).isEmpty()) {
LOG.info("TaskAttempt" + attemptId
+ " had not completed, recovering as KILLED");
} else {
LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+ recoveredState + ", recovering as KILLED");
}
addDiagnosticInfo("Killed during application recovery");
needToClean = true;
}
attemptState = TaskAttemptStateInternal.KILLED;
reportedStatus.taskState = TaskAttemptState.KILLED;
eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(this,
TaskAttemptStateInternal.KILLED);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
}
if (needToClean) {
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptId));
try {
committer.abortTask(tac);
} catch (Exception e) {
LOG.warn("Task cleanup failed for attempt " + attemptId, e);
}
}
return attemptState;
}
protected static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
case ASSIGNED:
case UNASSIGNED:
return TaskAttemptState.STARTING;
case COMMIT_PENDING:
return TaskAttemptState.COMMIT_PENDING;
case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP:
case FAIL_FINISHING_CONTAINER:
case FAILED:
return TaskAttemptState.FAILED;
case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP:
case KILLED:
return TaskAttemptState.KILLED;
case RUNNING:
return TaskAttemptState.RUNNING;
case NEW:
return TaskAttemptState.NEW;
case SUCCESS_CONTAINER_CLEANUP:
case SUCCESS_FINISHING_CONTAINER:
case SUCCEEDED:
return TaskAttemptState.SUCCEEDED;
default:
throw new YarnRuntimeException("Attempt to convert invalid "
+ "stateMachineTaskAttemptState to externalTaskAttemptState: "
+ smState);
}
}
// check whether the attempt is assigned if container is not null
boolean isContainerAssigned() {
return container != null;
}
//always called in write lock
private boolean getRescheduleNextAttempt() {
return rescheduleNextAttempt;
}
//always called in write lock
private void setRescheduleNextAttempt(boolean reschedule) {
rescheduleNextAttempt = reschedule;
}
//always called in write lock
private void setFinishTime() {
//set the finish time only if launch time is set
if (launchTime != 0) {
finishTime = clock.getTime();
}
}
private void computeRackAndLocality() {
NodeId containerNodeId = container.getNodeId();
nodeRackName = RackResolver.resolve(
containerNodeId.getHost()).getNetworkLocation();
locality = Locality.OFF_SWITCH;
if (dataLocalHosts.size() > 0) {
String cHost = resolveHost(containerNodeId.getHost());
if (dataLocalHosts.contains(cHost)) {
locality = Locality.NODE_LOCAL;
}
}
if (locality == Locality.OFF_SWITCH) {
if (dataLocalRacks.contains(nodeRackName)) {
locality = Locality.RACK_LOCAL;
}
}
}
private static void updateMillisCounters(JobCounterUpdateEvent jce,
TaskAttemptImpl taskAttempt) {
// if container/resource if not allocated, do not update
if (null == taskAttempt.container ||
null == taskAttempt.container.getResource()) {
return;
}
long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
Resource allocatedResource = taskAttempt.container.getResource();
int mbAllocated = (int) allocatedResource.getMemorySize();
int vcoresAllocated = allocatedResource.getVirtualCores();
int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int simSlotsAllocated = minSlotMemSize == 0 ? 0 :
(int) Math.ceil((float) mbAllocated / minSlotMemSize);
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS,
simSlotsAllocated * duration);
jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbAllocated);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS,
duration * vcoresAllocated);
jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
} else {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES,
simSlotsAllocated * duration);
jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES,
duration * mbAllocated);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES,
duration * vcoresAllocated);
jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
}
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
updateMillisCounters(jce, taskAttempt);
return jce;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
} else {
jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
}
if (!taskAlreadyCompleted) {
updateMillisCounters(jce, taskAttempt);
}
return jce;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
} else {
jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
}
if (!taskAlreadyCompleted) {
updateMillisCounters(jce, taskAttempt);
}
return jce;
}
private static
TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
TaskAttemptStateInternal attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
taskAttempt.container == null ? "UNKNOWN"
: taskAttempt.container.getNodeId().getHost(),
taskAttempt.container == null ? -1
: taskAttempt.container.getNodeId().getPort(),
taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName,
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
.getProgressSplitBlock().burst(), taskAttempt.launchTime);
return tauce;
}
private static void
sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
if (null == taskAttempt.container) {
return;
}
taskAttempt.launchTime = taskAttempt.clock.getTime();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents();
}
@SuppressWarnings("unchecked")
private void sendLaunchedEvents() {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
.getJobId());
jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + attemptId
+ "] using containerId: [" + container.getId() + " on NM: ["
+ StringInterner.weakIntern(container.getNodeId().toString()) + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
launchTime, trackerName, httpPort, shufflePort, container.getId(),
locality.toString(), avataar.toString());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
}
private WrappedProgressSplitsBlock getProgressSplitBlock() {
readLock.lock();
try {
if (progressSplitBlock == null) {
progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
}
return progressSplitBlock;
} finally {
readLock.unlock();
}
}
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
private static void finalizeProgress(TaskAttemptImpl taskAttempt) {
// unregister it to TaskAttemptListener so that it stops listening
taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID);
taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits();
}
static class RequestContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final boolean rescheduled;
public RequestContainerTransition(boolean rescheduled) {
this.rescheduled = rescheduled;
}
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// Tell any speculator that we're requesting a container
taskAttempt.eventHandler.handle
(new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
//request for container
if (rescheduled) {
taskAttempt.eventHandler.handle(
ContainerRequestEvent.createContainerRequestEventForFailedContainer(
taskAttempt.attemptId,
taskAttempt.resourceCapability));
} else {
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
taskAttempt.attemptId, taskAttempt.resourceCapability,
taskAttempt.dataLocalHosts.toArray(
new String[taskAttempt.dataLocalHosts.size()]),
taskAttempt.dataLocalRacks.toArray(
new String[taskAttempt.dataLocalRacks.size()])));
}
}
}
protected Set<String> resolveHosts(String[] src) {
Set<String> result = new HashSet<String>();
if (src != null) {
for (int i = 0; i < src.length; i++) {
if (src[i] == null) {
continue;
} else if (isIP(src[i])) {
result.add(resolveHost(src[i]));
} else {
result.add(src[i]);
}
}
}
return result;
}
protected String resolveHost(String src) {
String result = src; // Fallback in case of failure.
try {
InetAddress addr = InetAddress.getByName(src);
result = addr.getHostName();
} catch (UnknownHostException e) {
LOG.warn("Failed to resolve address: " + src
+ ". Continuing to use the same.");
}
return result;
}
private static final Pattern ipPattern = // Pattern for matching ip
Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}");
protected boolean isIP(String src) {
return ipPattern.matcher(src).matches();
}
private static class ContainerAssignedTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@SuppressWarnings({ "unchecked" })
@Override
public TaskAttemptStateInternal transition(
final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event;
Container container = cEvent.getContainer();
taskAttempt.container = container;
// this is a _real_ Task (classic Hadoop mapred flavor):
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
taskAttempt.jvmID =
new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
taskAttempt.remoteTask.isMapTask(),
taskAttempt.container.getId().getContainerId());
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
taskAttempt.computeRackAndLocality();
if (cEvent.getShufflePort() == -1) {
// launch the container
// create the container object to be launched for a given Task attempt
ContainerLaunchContext launchContext = createContainerLaunchContext(
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
taskAttempt.taskAttemptListener, taskAttempt.credentials);
taskAttempt.eventHandler
.handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
launchContext, container, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler
.handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
return TaskAttemptStateInternal.ASSIGNED;
} else {
taskAttempt.onContainerLaunch(cEvent.getShufflePort());
return TaskAttemptStateInternal.RUNNING;
}
}
}
private static class DeallocateContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final TaskAttemptStateInternal finalState;
private final boolean withdrawsContainerRequest;
DeallocateContainerTransition
(TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
this.finalState = finalState;
this.withdrawsContainerRequest = withdrawsContainerRequest;
}
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
//send the deallocate event to ContainerAllocator
taskAttempt.eventHandler.handle(
new ContainerAllocatorEvent(taskAttempt.attemptId,
ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
// send event to speculator that we withdraw our container needs, if
// we're transitioning out of UNASSIGNED
if (withdrawsContainerRequest) {
taskAttempt.eventHandler.handle
(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
}
switch(finalState) {
case FAILED:
boolean fastFail = false;
if (event instanceof TaskAttemptFailEvent) {
fastFail = ((TaskAttemptFailEvent) event).isFastFail();
}
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId, fastFail));
break;
case KILLED:
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, false));
break;
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
}
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState);
if(finalState == TaskAttemptStateInternal.FAILED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
} else if(finalState == TaskAttemptStateInternal.KILLED) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
}
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}
}
private static class LaunchedContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent evnt) {
TaskAttemptContainerLaunchedEvent event =
(TaskAttemptContainerLaunchedEvent) evnt;
taskAttempt.onContainerLaunch(event.getShufflePort());
}
}
@SuppressWarnings("unchecked")
private void onContainerLaunch(int shufflePortParam) {
// set the launch time
launchTime = clock.getTime();
this.shufflePort = shufflePortParam;
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttemptListener.registerLaunchedTask(attemptId, jvmID);
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(container.getNodeHttpAddress());
trackerName = nodeHttpInetAddr.getHostName();
httpPort = nodeHttpInetAddr.getPort();
sendLaunchedEvents();
eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime()));
// make remoteTask reference as null as it is no more needed
// and free up the memory
remoteTask = null;
// tell the Task that attempt has started
eventHandler.handle(
new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED));
}
private static class CommitPendingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_COMMIT_PENDING));
}
}
private static class TaskCleanupTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));
taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent(
taskAttempt.attemptId, taskContext));
}
}
/**
* Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
* state upon receiving TA_CONTAINER_COMPLETED event
*/
private static class ExitFinishingOnContainerCompletedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
sendContainerCompleted(taskAttempt);
}
}
private static class ExitFinishingOnContainerCleanedupTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
}
}
private static class FailedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// set the finish time
taskAttempt.setFinishTime();
notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast());
}
}
private static class FinalizeFailedTransition extends FailedTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
sendContainerCompleted(taskAttempt);
super.transition(taskAttempt, event);
}
}
@SuppressWarnings("unchecked")
private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) {
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_COMPLETED));
}
private static class RecoverTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
return taskAttempt.recover(tare.getTaskAttemptInfo(),
tare.getCommitter(), tare.getRecoverOutput());
}
}
@SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.
if (getLaunchTime() == 0) return;
String containerHostName = this.container == null ? "UNKNOWN"
: this.container.getNodeId().getHost();
int containerNodePort =
this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
} else {
ReduceAttemptFinishedEvent rfe =
new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
state.toString(),
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime,
containerHostName,
containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
getProgressSplitBlock().burst(), launchTime);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
}
}
private static class TooManyFetchFailureTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
if (taskAttempt.getInternalState() ==
TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER) {
sendContainerCleanup(taskAttempt, event);
}
TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
(TaskAttemptTooManyFetchFailureEvent) event;
// too many fetch failure can only happen for map tasks
Preconditions
.checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
//add to diagnostic
taskAttempt.addDiagnosticInfo("Too many fetch failures."
+ " Failing the attempt. Last failure reported by " +
fetchFailureEvent.getReduceId() +
" from host " + fetchFailureEvent.getReduceHost());
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId));
}
}
private static class KilledAfterSuccessTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@SuppressWarnings("unchecked")
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
// after a reduce task has succeeded, its outputs are in safe in HDFS.
// logically such a task should not be killed. we only come here when
// there is a race condition in the event queue. E.g. some logic sends
// a kill request to this attempt when the successful completion event
// for this task is already in the event queue. so the kill event will
// get executed immediately after the attempt is marked successful and
// result in this transition being exercised.
// ignore this for reduce tasks
LOG.info("Ignoring killed event for successful reduce task attempt" +
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCEEDED;
}
if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
&& taskAttempt.conf.getNumReduceTasks() == 0) {
// same reason as above for map only job after map task has succeeded.
// ignore this for map only tasks
LOG.info("Ignoring killed event for successful map only task attempt" +
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCEEDED;
}
if(event instanceof TaskAttemptKillEvent) {
TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
//add to diagnostic
taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
}
// not setting a finish time since it was set on success
assert (taskAttempt.getFinishTime() != 0);
assert (taskAttempt.getLaunchTime() != 0);
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
taskAttempt, TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
.getTaskId().getJobId(), tauce));
boolean rescheduleNextTaskAttempt = false;
if (event instanceof TaskAttemptKillEvent) {
rescheduleNextTaskAttempt =
((TaskAttemptKillEvent)event).getRescheduleAttempt();
}
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, rescheduleNextTaskAttempt));
return TaskAttemptStateInternal.KILLED;
}
}
private static class KilledAfterSucceededFinishingTransition
implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@SuppressWarnings("unchecked")
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
sendContainerCleanup(taskAttempt, event);
if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
// after a reduce task has succeeded, its outputs are in safe in HDFS.
// logically such a task should not be killed. we only come here when
// there is a race condition in the event queue. E.g. some logic sends
// a kill request to this attempt when the successful completion event
// for this task is already in the event queue. so the kill event will
// get executed immediately after the attempt is marked successful and
// result in this transition being exercised.
// ignore this for reduce tasks
LOG.info("Ignoring killed event for successful reduce task attempt" +
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
} else if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
&& taskAttempt.conf.getNumReduceTasks() == 0) {
// same reason as above for map only job after map task has succeeded.
// ignore this for map only tasks
LOG.info("Ignoring killed event for successful map only task attempt" +
taskAttempt.getID().toString());
return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
} else {
// Store reschedule flag so that after clean up is completed, new
// attempt is scheduled/rescheduled based on it.
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
}
return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
}
}
}
private static class KilledTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
//set the finish time
taskAttempt.setFinishTime();
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt()));
}
}
private static class PreemptedTransition implements
SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.setFinishTime();
taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID);
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP, false));
taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent(
taskAttempt.attemptId, false));
}
}
/**
* Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
* state upon receiving TA_TIMED_OUT event
*/
private static class ExitFinishingOnTimeoutTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
taskAttempt.attemptId);
// The attempt stays in finishing state for too long
String msg = "Task attempt " + taskAttempt.getID() + " is done from " +
"TaskUmbilicalProtocol's point of view. However, it stays in " +
"finishing state for too long";
LOG.warn(msg);
taskAttempt.addDiagnosticInfo(msg);
sendContainerCleanup(taskAttempt, event);
}
}
/**
* Finish and clean up the container
*/
private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// unregister it to TaskAttemptListener so that it stops listening
// for it.
finalizeProgress(taskAttempt);
sendContainerCleanup(taskAttempt, event);
// Store reschedule flag so that after clean up is completed, new
// attempt is scheduled/rescheduled based on it.
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.setRescheduleNextAttempt(
((TaskAttemptKillEvent)event).getRescheduleAttempt());
} else if (event instanceof TaskAttemptFailEvent) {
taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail());
}
}
}
@SuppressWarnings("unchecked")
private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
//send the cleanup event to containerLauncher
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP,
event.getType() == TaskAttemptEventType.TA_TIMED_OUT));
}
/**
* Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event
*/
private static class MoveContainerToSucceededFinishingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
// set the finish time
taskAttempt.setFinishTime();
// notify job history
taskAttempt.eventHandler.handle(
createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
//notify the task even though the container might not have exited yet.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
}
}
/**
* Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event
*/
private static class MoveContainerToFailedFinishingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
finalizeProgress(taskAttempt);
// register it to finishing state
taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
taskAttempt.attemptId);
notifyTaskAttemptFailed(taskAttempt, false);
}
}
@SuppressWarnings("unchecked")
private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt,
boolean fastFail) {
if (taskAttempt.getLaunchTime() == 0) {
sendJHStartEventForAssignedFailTask(taskAttempt);
}
// set the finish time
taskAttempt.setFinishTime();
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(
taskAttempt.attemptId, fastFail));
}
private void addDiagnosticInfo(String diag) {
if (diag != null && !diag.equals("")) {
diagnostics.add(diag);
}
}
private static class StatusUpdater
implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptStatusUpdateEvent statusEvent =
((TaskAttemptStatusUpdateEvent)event);
AtomicReference<TaskAttemptStatus> taskAttemptStatusRef =
statusEvent.getTaskAttemptStatusRef();
TaskAttemptStatus newReportedStatus =
taskAttemptStatusRef.getAndSet(null);
// Now switch the information in the reportedStatus
taskAttempt.reportedStatus = newReportedStatus;
taskAttempt.reportedStatus.taskState = taskAttempt.getState();
// send event to speculator about the reported status
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
taskAttempt.updateProgressSplits();
//if fetch failures are present, send the fetch failure event to job
//this only will happen in reduce attempt type
if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
String hostname = taskAttempt.container == null ? "UNKNOWN"
: taskAttempt.container.getNodeId().getHost();
taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps,
hostname));
}
}
}
private static class DiagnosticInformationUpdater
implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptDiagnosticsUpdateEvent diagEvent =
(TaskAttemptDiagnosticsUpdateEvent) event;
LOG.info("Diagnostics report from " + taskAttempt.attemptId + ": "
+ diagEvent.getDiagnosticInfo());
taskAttempt.addDiagnosticInfo(diagEvent.getDiagnosticInfo());
}
}
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
result.phase = Phase.STARTING;
result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW;
Counters counters = EMPTY_COUNTERS;
result.counters = counters;
}
}