| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.app.job.impl; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobContext; |
| import org.apache.hadoop.mapred.MapReduceChildJVM; |
| import org.apache.hadoop.mapred.ShuffleHandler; |
| import org.apache.hadoop.mapred.Task; |
| import org.apache.hadoop.mapred.TaskAttemptContextImpl; |
| import org.apache.hadoop.mapred.WrappedJvmID; |
| import org.apache.hadoop.mapred.WrappedProgressSplitsBlock; |
| import org.apache.hadoop.mapreduce.Counter; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.mapreduce.JobCounter; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskCounter; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent; |
| import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; |
| import org.apache.hadoop.mapreduce.security.TokenCache; |
| import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; |
| import org.apache.hadoop.mapreduce.v2.api.records.Phase; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskId; |
| import org.apache.hadoop.mapreduce.v2.api.records.TaskType; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; |
| import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; |
| import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; |
| import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; |
| import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; |
| import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; |
| import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; |
| import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; |
| import org.apache.hadoop.mapreduce.v2.util.MRApps; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.Clock; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; |
| import org.apache.hadoop.yarn.api.records.ContainerToken; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.LocalResourceType; |
| import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.URL; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitonException; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Apps; |
| import org.apache.hadoop.yarn.util.BuilderUtils; |
| import org.apache.hadoop.yarn.util.ConverterUtils; |
| import org.apache.hadoop.yarn.util.RackResolver; |
| |
| /** |
| * Implementation of TaskAttempt interface. |
| */ |
| @SuppressWarnings({ "rawtypes" }) |
| public abstract class TaskAttemptImpl implements |
| org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, |
| EventHandler<TaskAttemptEvent> { |
| |
| static final Counters EMPTY_COUNTERS = new Counters(); |
| private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class); |
| private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable? |
| private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| |
| protected final JobConf conf; |
| protected final Path jobFile; |
| protected final int partition; |
| protected EventHandler eventHandler; |
| private final TaskAttemptId attemptId; |
| private final Clock clock; |
| private final org.apache.hadoop.mapred.JobID oldJobId; |
| private final TaskAttemptListener taskAttemptListener; |
| private final OutputCommitter committer; |
| private final Resource resourceCapability; |
| private final String[] dataLocalHosts; |
| private final List<String> diagnostics = new ArrayList<String>(); |
| private final Lock readLock; |
| private final Lock writeLock; |
| private final AppContext appContext; |
| private Credentials credentials; |
| private Token<JobTokenIdentifier> jobToken; |
| private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); |
| private static String initialClasspath = null; |
| private static Object commonContainerSpecLock = new Object(); |
| private static ContainerLaunchContext commonContainerSpec = null; |
| private static final Object classpathLock = new Object(); |
| private long launchTime; |
| private long finishTime; |
| private WrappedProgressSplitsBlock progressSplitBlock; |
| private int shufflePort = -1; |
| private String trackerName; |
| private int httpPort; |
| |
| private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = |
| new CleanupContainerTransition(); |
| |
| private static final DiagnosticInformationUpdater |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION |
| = new DiagnosticInformationUpdater(); |
| |
| private static final StateMachineFactory |
| <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> |
| stateMachineFactory |
| = new StateMachineFactory |
| <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> |
| (TaskAttemptState.NEW) |
| |
| // Transitions from the NEW state. |
| .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, |
| TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false)) |
| .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, |
| TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true)) |
| .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_KILL, new KilledTransition()) |
| .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED, |
| TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) |
| |
| // Transitions from the UNASSIGNED state. |
| .addTransition(TaskAttemptState.UNASSIGNED, |
| TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, |
| new ContainerAssignedTransition()) |
| .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( |
| TaskAttemptState.KILLED, true)) |
| .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED, |
| TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( |
| TaskAttemptState.FAILED, true)) |
| |
| // Transitions from the ASSIGNED state. |
| .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| new LaunchedContainerTransition()) |
| .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, |
| new DeallocateContainerTransition(TaskAttemptState.FAILED, false)) |
| .addTransition(TaskAttemptState.ASSIGNED, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| CLEANUP_CONTAINER_TRANSITION) |
| // ^ If RM kills the container due to expiry, preemption etc. |
| .addTransition(TaskAttemptState.ASSIGNED, |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) |
| .addTransition(TaskAttemptState.ASSIGNED, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) |
| |
| // Transitions from RUNNING state. |
| .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, |
| TaskAttemptEventType.TA_UPDATE, new StatusUpdater()) |
| .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // If no commit is required, task directly goes to success |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) |
| // If commit is required, task goes through commit pending state. |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) |
| // Failure handling while RUNNING |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) |
| //for handling container exit without sending the done or fail msg |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| CLEANUP_CONTAINER_TRANSITION) |
| // Timeout handling while RUNNING |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) |
| // if container killed by AM shutting down |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) |
| // Kill handling |
| .addTransition(TaskAttemptState.RUNNING, |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, |
| CLEANUP_CONTAINER_TRANSITION) |
| |
| // Transitions from COMMIT_PENDING state |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, |
| new StatusUpdater()) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, |
| CLEANUP_CONTAINER_TRANSITION) |
| // if container killed by AM shutting down |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| CLEANUP_CONTAINER_TRANSITION) |
| .addTransition(TaskAttemptState.COMMIT_PENDING, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) |
| |
| // Transitions from SUCCESS_CONTAINER_CLEANUP state |
| // kill and cleanup the container |
| .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| new SucceededTransition()) |
| .addTransition( |
| TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events |
| .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_TIMED_OUT, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED)) |
| |
| // Transitions from FAIL_CONTAINER_CLEANUP state. |
| .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptState.FAIL_TASK_CLEANUP, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) |
| .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events |
| .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| TaskAttemptState.FAIL_CONTAINER_CLEANUP, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| // Container launch events can arrive late |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_TIMED_OUT)) |
| |
| // Transitions from KILL_CONTAINER_CLEANUP |
| .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| TaskAttemptState.KILL_TASK_CLEANUP, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) |
| .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events |
| .addTransition( |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| TaskAttemptState.KILL_CONTAINER_CLEANUP, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_TIMED_OUT)) |
| |
| // Transitions from FAIL_TASK_CLEANUP |
| // run the task cleanup |
| .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, |
| TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE, |
| new FailedTransition()) |
| .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, |
| TaskAttemptState.FAIL_TASK_CLEANUP, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events |
| .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, |
| TaskAttemptState.FAIL_TASK_CLEANUP, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| // Container launch events can arrive late |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) |
| |
| // Transitions from KILL_TASK_CLEANUP |
| .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, |
| TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE, |
| new KilledTransition()) |
| .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, |
| TaskAttemptState.KILL_TASK_CLEANUP, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events |
| .addTransition(TaskAttemptState.KILL_TASK_CLEANUP, |
| TaskAttemptState.KILL_TASK_CLEANUP, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| // Container launch events can arrive late |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) |
| |
| // Transitions from SUCCEEDED |
| .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts |
| TaskAttemptState.FAILED, |
| TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE, |
| new TooManyFetchFailureTransition()) |
| .addTransition( |
| TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_KILL, |
| new KilledAfterSuccessTransition()) |
| .addTransition( |
| TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events for SUCCEEDED state |
| .addTransition(TaskAttemptState.SUCCEEDED, |
| TaskAttemptState.SUCCEEDED, |
| EnumSet.of(TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED)) |
| |
| // Transitions from FAILED state |
| .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events for FAILED state |
| .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_ASSIGNED, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| // Container launch events can arrive late |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG, |
| TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)) |
| |
| // Transitions from KILLED state |
| .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, |
| TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, |
| DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) |
| // Ignore-able events for KILLED state |
| .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, |
| EnumSet.of(TaskAttemptEventType.TA_KILL, |
| TaskAttemptEventType.TA_ASSIGNED, |
| TaskAttemptEventType.TA_CONTAINER_COMPLETED, |
| TaskAttemptEventType.TA_UPDATE, |
| // Container launch events can arrive late |
| TaskAttemptEventType.TA_CONTAINER_LAUNCHED, |
| TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, |
| TaskAttemptEventType.TA_CONTAINER_CLEANED, |
| TaskAttemptEventType.TA_COMMIT_PENDING, |
| TaskAttemptEventType.TA_DONE, |
| TaskAttemptEventType.TA_FAILMSG)) |
| |
| // create the topology tables |
| .installTopology(); |
| |
| private final StateMachine |
| <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> |
| stateMachine; |
| |
| private ContainerId containerID; |
| private NodeId containerNodeId; |
| private String containerMgrAddress; |
| private String nodeHttpAddress; |
| private String nodeRackName; |
| private WrappedJvmID jvmID; |
| private ContainerToken containerToken; |
| private Resource assignedCapability; |
| |
| //this takes good amount of memory ~ 30KB. Instantiate it lazily |
| //and make it null once task is launched. |
| private org.apache.hadoop.mapred.Task remoteTask; |
| |
| //this is the last status reported by the REMOTE running attempt |
| private TaskAttemptStatus reportedStatus; |
| |
| private static final String LINE_SEPARATOR = System |
| .getProperty("line.separator"); |
| |
| public TaskAttemptImpl(TaskId taskId, int i, |
| EventHandler eventHandler, |
| TaskAttemptListener taskAttemptListener, Path jobFile, int partition, |
| JobConf conf, String[] dataLocalHosts, OutputCommitter committer, |
| Token<JobTokenIdentifier> jobToken, |
| Credentials credentials, Clock clock, |
| AppContext appContext) { |
| oldJobId = TypeConverter.fromYarn(taskId.getJobId()); |
| this.conf = conf; |
| this.clock = clock; |
| attemptId = recordFactory.newRecordInstance(TaskAttemptId.class); |
| attemptId.setTaskId(taskId); |
| attemptId.setId(i); |
| this.taskAttemptListener = taskAttemptListener; |
| this.appContext = appContext; |
| |
| // Initialize reportedStatus |
| reportedStatus = new TaskAttemptStatus(); |
| initTaskAttemptStatus(reportedStatus); |
| |
| ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); |
| readLock = readWriteLock.readLock(); |
| writeLock = readWriteLock.writeLock(); |
| |
| this.credentials = credentials; |
| this.jobToken = jobToken; |
| this.eventHandler = eventHandler; |
| this.committer = committer; |
| this.jobFile = jobFile; |
| this.partition = partition; |
| |
| //TODO:create the resource reqt for this Task attempt |
| this.resourceCapability = recordFactory.newRecordInstance(Resource.class); |
| this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType())); |
| this.dataLocalHosts = dataLocalHosts; |
| RackResolver.init(conf); |
| |
| // This "this leak" is okay because the retained pointer is in an |
| // instance variable. |
| stateMachine = stateMachineFactory.make(this); |
| } |
| |
| private int getMemoryRequired(Configuration conf, TaskType taskType) { |
| int memory = 1024; |
| if (taskType == TaskType.MAP) { |
| memory = |
| conf.getInt(MRJobConfig.MAP_MEMORY_MB, |
| MRJobConfig.DEFAULT_MAP_MEMORY_MB); |
| } else if (taskType == TaskType.REDUCE) { |
| memory = |
| conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, |
| MRJobConfig.DEFAULT_REDUCE_MEMORY_MB); |
| } |
| |
| return memory; |
| } |
| |
| /** |
| * Create a {@link LocalResource} record with all the given parameters. |
| */ |
| private static LocalResource createLocalResource(FileSystem fc, Path file, |
| LocalResourceType type, LocalResourceVisibility visibility) |
| throws IOException { |
| FileStatus fstat = fc.getFileStatus(file); |
| URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat |
| .getPath())); |
| long resourceSize = fstat.getLen(); |
| long resourceModificationTime = fstat.getModificationTime(); |
| |
| return BuilderUtils.newLocalResource(resourceURL, type, visibility, |
| resourceSize, resourceModificationTime); |
| } |
| |
| /** |
| * Lock this on initialClasspath so that there is only one fork in the AM for |
| * getting the initial class-path. TODO: We already construct |
| * a parent CLC and use it for all the containers, so this should go away |
| * once the mr-generated-classpath stuff is gone. |
| */ |
| private static String getInitialClasspath(Configuration conf) throws IOException { |
| synchronized (classpathLock) { |
| if (initialClasspathFlag.get()) { |
| return initialClasspath; |
| } |
| Map<String, String> env = new HashMap<String, String>(); |
| MRApps.setClasspath(env, conf); |
| initialClasspath = env.get(Environment.CLASSPATH.name()); |
| initialClasspathFlag.set(true); |
| return initialClasspath; |
| } |
| } |
| |
| |
| /** |
| * Create the common {@link ContainerLaunchContext} for all attempts. |
| * @param applicationACLs |
| */ |
| private static ContainerLaunchContext createCommonContainerLaunchContext( |
| Map<ApplicationAccessType, String> applicationACLs, Configuration conf, |
| Token<JobTokenIdentifier> jobToken, |
| final org.apache.hadoop.mapred.JobID oldJobId, |
| Credentials credentials) { |
| |
| // Application resources |
| Map<String, LocalResource> localResources = |
| new HashMap<String, LocalResource>(); |
| |
| // Application environment |
| Map<String, String> environment = new HashMap<String, String>(); |
| |
| // Service data |
| Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>(); |
| |
| // Tokens |
| ByteBuffer taskCredentialsBuffer = ByteBuffer.wrap(new byte[]{}); |
| try { |
| FileSystem remoteFS = FileSystem.get(conf); |
| |
| // //////////// Set up JobJar to be localized properly on the remote NM. |
| String jobJar = conf.get(MRJobConfig.JAR); |
| if (jobJar != null) { |
| Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS |
| .getUri(), remoteFS.getWorkingDirectory()); |
| LocalResource rc = createLocalResource(remoteFS, remoteJobJar, |
| LocalResourceType.PATTERN, LocalResourceVisibility.APPLICATION); |
| String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, |
| JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); |
| rc.setPattern(pattern); |
| localResources.put(MRJobConfig.JOB_JAR, rc); |
| LOG.info("The job-jar file on the remote FS is " |
| + remoteJobJar.toUri().toASCIIString()); |
| } else { |
| // Job jar may be null. For e.g, for pipes, the job jar is the hadoop |
| // mapreduce jar itself which is already on the classpath. |
| LOG.info("Job jar is not present. " |
| + "Not adding any jar to the list of resources."); |
| } |
| // //////////// End of JobJar setup |
| |
| // //////////// Set up JobConf to be localized properly on the remote NM. |
| Path path = |
| MRApps.getStagingAreaDir(conf, UserGroupInformation |
| .getCurrentUser().getShortUserName()); |
| Path remoteJobSubmitDir = |
| new Path(path, oldJobId.toString()); |
| Path remoteJobConfPath = |
| new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE); |
| localResources.put( |
| MRJobConfig.JOB_CONF_FILE, |
| createLocalResource(remoteFS, remoteJobConfPath, |
| LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)); |
| LOG.info("The job-conf file on the remote FS is " |
| + remoteJobConfPath.toUri().toASCIIString()); |
| // //////////// End of JobConf setup |
| |
| // Setup DistributedCache |
| MRApps.setupDistributedCache(conf, localResources); |
| |
| // Setup up task credentials buffer |
| Credentials taskCredentials = new Credentials(); |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| LOG.info("Adding #" + credentials.numberOfTokens() |
| + " tokens and #" + credentials.numberOfSecretKeys() |
| + " secret keys for NM use for launching container"); |
| taskCredentials.addAll(credentials); |
| } |
| |
| // LocalStorageToken is needed irrespective of whether security is enabled |
| // or not. |
| TokenCache.setJobToken(jobToken, taskCredentials); |
| |
| DataOutputBuffer containerTokens_dob = new DataOutputBuffer(); |
| LOG.info("Size of containertokens_dob is " |
| + taskCredentials.numberOfTokens()); |
| taskCredentials.writeTokenStorageToStream(containerTokens_dob); |
| taskCredentialsBuffer = |
| ByteBuffer.wrap(containerTokens_dob.getData(), 0, |
| containerTokens_dob.getLength()); |
| |
| // Add shuffle token |
| LOG.info("Putting shuffle token in serviceData"); |
| serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, |
| ShuffleHandler.serializeServiceData(jobToken)); |
| |
| Apps.addToEnvironment( |
| environment, |
| Environment.CLASSPATH.name(), |
| getInitialClasspath(conf)); |
| } catch (IOException e) { |
| throw new YarnException(e); |
| } |
| |
| // Shell |
| environment.put( |
| Environment.SHELL.name(), |
| conf.get( |
| MRJobConfig.MAPRED_ADMIN_USER_SHELL, |
| MRJobConfig.DEFAULT_SHELL) |
| ); |
| |
| // Add pwd to LD_LIBRARY_PATH, add this before adding anything else |
| Apps.addToEnvironment( |
| environment, |
| Environment.LD_LIBRARY_PATH.name(), |
| Environment.PWD.$()); |
| |
| // Add the env variables passed by the admin |
| Apps.setEnvFromInputString( |
| environment, |
| conf.get( |
| MRJobConfig.MAPRED_ADMIN_USER_ENV, |
| MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV) |
| ); |
| |
| // Construct the actual Container |
| // The null fields are per-container and will be constructed for each |
| // container separately. |
| ContainerLaunchContext container = BuilderUtils |
| .newContainerLaunchContext(null, conf |
| .get(MRJobConfig.USER_NAME), null, localResources, |
| environment, null, serviceData, taskCredentialsBuffer, |
| applicationACLs); |
| |
| return container; |
| } |
| |
| static ContainerLaunchContext createContainerLaunchContext( |
| Map<ApplicationAccessType, String> applicationACLs, |
| ContainerId containerID, Configuration conf, |
| Token<JobTokenIdentifier> jobToken, Task remoteTask, |
| final org.apache.hadoop.mapred.JobID oldJobId, |
| Resource assignedCapability, WrappedJvmID jvmID, |
| TaskAttemptListener taskAttemptListener, |
| Credentials credentials) { |
| |
| synchronized (commonContainerSpecLock) { |
| if (commonContainerSpec == null) { |
| commonContainerSpec = createCommonContainerLaunchContext( |
| applicationACLs, conf, jobToken, oldJobId, credentials); |
| } |
| } |
| |
| // Fill in the fields needed per-container that are missing in the common |
| // spec. |
| |
| // Setup environment by cloning from common env. |
| Map<String, String> env = commonContainerSpec.getEnvironment(); |
| Map<String, String> myEnv = new HashMap<String, String>(env.size()); |
| myEnv.putAll(env); |
| MapReduceChildJVM.setVMEnv(myEnv, remoteTask); |
| |
| // Set up the launch command |
| List<String> commands = MapReduceChildJVM.getVMCommand( |
| taskAttemptListener.getAddress(), remoteTask, jvmID); |
| |
| // Duplicate the ByteBuffers for access by multiple containers. |
| Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>(); |
| for (Entry<String, ByteBuffer> entry : commonContainerSpec |
| .getServiceData().entrySet()) { |
| myServiceData.put(entry.getKey(), entry.getValue().duplicate()); |
| } |
| |
| // Construct the actual Container |
| ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext( |
| containerID, commonContainerSpec.getUser(), assignedCapability, |
| commonContainerSpec.getLocalResources(), myEnv, commands, |
| myServiceData, commonContainerSpec.getContainerTokens().duplicate(), |
| applicationACLs); |
| |
| return container; |
| } |
| |
| @Override |
| public ContainerId getAssignedContainerID() { |
| readLock.lock(); |
| try { |
| return containerID; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getAssignedContainerMgrAddress() { |
| readLock.lock(); |
| try { |
| return containerMgrAddress; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getLaunchTime() { |
| readLock.lock(); |
| try { |
| return launchTime; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getFinishTime() { |
| readLock.lock(); |
| try { |
| return finishTime; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getShuffleFinishTime() { |
| readLock.lock(); |
| try { |
| return this.reportedStatus.shuffleFinishTime; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getSortFinishTime() { |
| readLock.lock(); |
| try { |
| return this.reportedStatus.sortFinishTime; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getShufflePort() { |
| readLock.lock(); |
| try { |
| return shufflePort; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public NodeId getNodeId() { |
| readLock.lock(); |
| try { |
| return containerNodeId; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /**If container Assigned then return the node's address, otherwise null. |
| */ |
| @Override |
| public String getNodeHttpAddress() { |
| readLock.lock(); |
| try { |
| return nodeHttpAddress; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * If container Assigned then return the node's rackname, otherwise null. |
| */ |
| @Override |
| public String getNodeRackName() { |
| this.readLock.lock(); |
| try { |
| return this.nodeRackName; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
/**
 * Creates the task object for this attempt; supplied by concrete subclasses.
 *
 * <p>Within this class the result is stored as {@code remoteTask} when a
 * container is assigned and is then used to build the container launch
 * context.
 *
 * @return the task to be run for this attempt
 */
protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
| |
| @Override |
| public TaskAttemptId getID() { |
| return attemptId; |
| } |
| |
| @Override |
| public boolean isFinished() { |
| readLock.lock(); |
| try { |
| // TODO: Use stateMachine level method? |
| return (getState() == TaskAttemptState.SUCCEEDED || |
| getState() == TaskAttemptState.FAILED || |
| getState() == TaskAttemptState.KILLED); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TaskAttemptReport getReport() { |
| TaskAttemptReport result = recordFactory.newRecordInstance(TaskAttemptReport.class); |
| readLock.lock(); |
| try { |
| result.setTaskAttemptId(attemptId); |
| //take the LOCAL state of attempt |
| //DO NOT take from reportedStatus |
| |
| result.setTaskAttemptState(getState()); |
| result.setProgress(reportedStatus.progress); |
| result.setStartTime(launchTime); |
| result.setFinishTime(finishTime); |
| result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime); |
| result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics())); |
| result.setPhase(reportedStatus.phase); |
| result.setStateString(reportedStatus.stateString); |
| result.setCounters(TypeConverter.toYarn(getCounters())); |
| result.setContainerId(this.getAssignedContainerID()); |
| result.setNodeManagerHost(trackerName); |
| result.setNodeManagerHttpPort(httpPort); |
| if (this.containerNodeId != null) { |
| result.setNodeManagerPort(this.containerNodeId.getPort()); |
| } |
| return result; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<String> getDiagnostics() { |
| List<String> result = new ArrayList<String>(); |
| readLock.lock(); |
| try { |
| result.addAll(diagnostics); |
| return result; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Counters getCounters() { |
| readLock.lock(); |
| try { |
| Counters counters = reportedStatus.counters; |
| if (counters == null) { |
| counters = EMPTY_COUNTERS; |
| // counters.groups = new HashMap<String, CounterGroup>(); |
| } |
| return counters; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public float getProgress() { |
| readLock.lock(); |
| try { |
| return reportedStatus.progress; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public TaskAttemptState getState() { |
| readLock.lock(); |
| try { |
| return stateMachine.getCurrentState(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void handle(TaskAttemptEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing " + event.getTaskAttemptID() + " of type " |
| + event.getType()); |
| } |
| writeLock.lock(); |
| try { |
| final TaskAttemptState oldState = getState(); |
| try { |
| stateMachine.doTransition(event.getType(), event); |
| } catch (InvalidStateTransitonException e) { |
| LOG.error("Can't handle this event at current state for " |
| + this.attemptId, e); |
| eventHandler.handle(new JobDiagnosticsUpdateEvent( |
| this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() + |
| " on TaskAttempt " + this.attemptId)); |
| eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(), |
| JobEventType.INTERNAL_ERROR)); |
| } |
| if (oldState != getState()) { |
| LOG.info(attemptId + " TaskAttempt Transitioned from " |
| + oldState + " to " |
| + getState()); |
| } |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| //always called in write lock |
| private void setFinishTime() { |
| //set the finish time only if launch time is set |
| if (launchTime != 0) { |
| finishTime = clock.getTime(); |
| } |
| } |
| |
| private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { |
| TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); |
| int slotMemoryReq = |
| taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); |
| |
| int minSlotMemSize = |
| taskAttempt.appContext.getClusterInfo().getMinContainerCapability() |
| .getMemory(); |
| |
| int simSlotsRequired = |
| minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq |
| / minSlotMemSize); |
| |
| long slotMillisIncrement = |
| simSlotsRequired |
| * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime()); |
| return slotMillisIncrement; |
| } |
| |
| private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed( |
| TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { |
| TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); |
| JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); |
| |
| long slotMillisIncrement = computeSlotMillis(taskAttempt); |
| |
| if (taskType == TaskType.MAP) { |
| jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1); |
| if(!taskAlreadyCompleted) { |
| // dont double count the elapsed time |
| jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); |
| } |
| } else { |
| jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1); |
| if(!taskAlreadyCompleted) { |
| // dont double count the elapsed time |
| jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); |
| } |
| } |
| return jce; |
| } |
| |
| private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled( |
| TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) { |
| TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); |
| JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId()); |
| |
| long slotMillisIncrement = computeSlotMillis(taskAttempt); |
| |
| if (taskType == TaskType.MAP) { |
| jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1); |
| if(!taskAlreadyCompleted) { |
| // dont double count the elapsed time |
| jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement); |
| } |
| } else { |
| jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1); |
| if(!taskAlreadyCompleted) { |
| // dont double count the elapsed time |
| jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement); |
| } |
| } |
| return jce; |
| } |
| |
| private static |
| TaskAttemptUnsuccessfulCompletionEvent |
| createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt, |
| TaskAttemptState attemptState) { |
| TaskAttemptUnsuccessfulCompletionEvent tauce = |
| new TaskAttemptUnsuccessfulCompletionEvent( |
| TypeConverter.fromYarn(taskAttempt.attemptId), |
| TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId() |
| .getTaskType()), attemptState.toString(), |
| taskAttempt.finishTime, |
| taskAttempt.containerNodeId == null ? "UNKNOWN" |
| : taskAttempt.containerNodeId.getHost(), |
| taskAttempt.containerNodeId == null ? -1 |
| : taskAttempt.containerNodeId.getPort(), |
| taskAttempt.nodeRackName == null ? "UNKNOWN" |
| : taskAttempt.nodeRackName, |
| StringUtils.join( |
| LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt |
| .getProgressSplitBlock().burst()); |
| return tauce; |
| } |
| |
| private WrappedProgressSplitsBlock getProgressSplitBlock() { |
| readLock.lock(); |
| try { |
| if (progressSplitBlock == null) { |
| progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt( |
| MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS, |
| MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS)); |
| } |
| return progressSplitBlock; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private void updateProgressSplits() { |
| double newProgress = reportedStatus.progress; |
| newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D); |
| Counters counters = reportedStatus.counters; |
| if (counters == null) |
| return; |
| |
| WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock(); |
| if (splitsBlock != null) { |
| long now = clock.getTime(); |
| long start = getLaunchTime(); // TODO Ensure not 0 |
| |
| if (start != 0 && now - start <= Integer.MAX_VALUE) { |
| splitsBlock.getProgressWallclockTime().extend(newProgress, |
| (int) (now - start)); |
| } |
| |
| Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); |
| if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) { |
| splitsBlock.getProgressCPUTime().extend(newProgress, |
| (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below |
| } |
| |
| Counter virtualBytes = counters |
| .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES); |
| if (virtualBytes != null) { |
| splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress, |
| (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); |
| } |
| |
| Counter physicalBytes = counters |
| .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES); |
| if (physicalBytes != null) { |
| splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress, |
| (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION))); |
| } |
| } |
| } |
| |
| static class RequestContainerTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| private final boolean rescheduled; |
| public RequestContainerTransition(boolean rescheduled) { |
| this.rescheduled = rescheduled; |
| } |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| // Tell any speculator that we're requesting a container |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1)); |
| //request for container |
| if (rescheduled) { |
| taskAttempt.eventHandler.handle( |
| ContainerRequestEvent.createContainerRequestEventForFailedContainer( |
| taskAttempt.attemptId, |
| taskAttempt.resourceCapability)); |
| } else { |
| Set<String> racks = new HashSet<String>(); |
| for (String host : taskAttempt.dataLocalHosts) { |
| racks.add(RackResolver.resolve(host).getNetworkLocation()); |
| } |
| taskAttempt.eventHandler.handle(new ContainerRequestEvent( |
| taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt |
| .resolveHosts(taskAttempt.dataLocalHosts), racks |
| .toArray(new String[racks.size()]))); |
| } |
| } |
| } |
| |
| protected String[] resolveHosts(String[] src) { |
| String[] result = new String[src.length]; |
| for (int i = 0; i < src.length; i++) { |
| if (isIP(src[i])) { |
| result[i] = resolveHost(src[i]); |
| } else { |
| result[i] = src[i]; |
| } |
| } |
| return result; |
| } |
| |
| protected String resolveHost(String src) { |
| String result = src; // Fallback in case of failure. |
| try { |
| InetAddress addr = InetAddress.getByName(src); |
| result = addr.getHostName(); |
| } catch (UnknownHostException e) { |
| LOG.warn("Failed to resolve address: " + src |
| + ". Continuing to use the same."); |
| } |
| return result; |
| } |
| |
| private static final Pattern ipPattern = // Pattern for matching ip |
| Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}"); |
| |
| protected boolean isIP(String src) { |
| return ipPattern.matcher(src).matches(); |
| } |
| |
| private static class ContainerAssignedTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings({ "unchecked" }) |
| @Override |
| public void transition(final TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| final TaskAttemptContainerAssignedEvent cEvent = |
| (TaskAttemptContainerAssignedEvent) event; |
| taskAttempt.containerID = cEvent.getContainer().getId(); |
| taskAttempt.containerNodeId = cEvent.getContainer().getNodeId(); |
| taskAttempt.containerMgrAddress = taskAttempt.containerNodeId |
| .toString(); |
| taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress(); |
| taskAttempt.nodeRackName = RackResolver.resolve( |
| taskAttempt.containerNodeId.getHost()).getNetworkLocation(); |
| taskAttempt.containerToken = cEvent.getContainer().getContainerToken(); |
| taskAttempt.assignedCapability = cEvent.getContainer().getResource(); |
| // this is a _real_ Task (classic Hadoop mapred flavor): |
| taskAttempt.remoteTask = taskAttempt.createRemoteTask(); |
| taskAttempt.jvmID = new WrappedJvmID( |
| taskAttempt.remoteTask.getTaskID().getJobID(), |
| taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); |
| taskAttempt.taskAttemptListener.registerPendingTask( |
| taskAttempt.remoteTask, taskAttempt.jvmID); |
| |
| //launch the container |
| //create the container object to be launched for a given Task attempt |
| ContainerLaunchContext launchContext = createContainerLaunchContext( |
| cEvent.getApplicationACLs(), taskAttempt.containerID, |
| taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask, |
| taskAttempt.oldJobId, taskAttempt.assignedCapability, |
| taskAttempt.jvmID, taskAttempt.taskAttemptListener, |
| taskAttempt.credentials); |
| taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent( |
| taskAttempt.attemptId, taskAttempt.containerID, |
| taskAttempt.containerMgrAddress, taskAttempt.containerToken, |
| launchContext, taskAttempt.remoteTask)); |
| |
| // send event to speculator that our container needs are satisfied |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); |
| } |
| } |
| |
| private static class DeallocateContainerTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| private final TaskAttemptState finalState; |
| private final boolean withdrawsContainerRequest; |
| DeallocateContainerTransition |
| (TaskAttemptState finalState, boolean withdrawsContainerRequest) { |
| this.finalState = finalState; |
| this.withdrawsContainerRequest = withdrawsContainerRequest; |
| } |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| //set the finish time |
| taskAttempt.setFinishTime(); |
| //send the deallocate event to ContainerAllocator |
| taskAttempt.eventHandler.handle( |
| new ContainerAllocatorEvent(taskAttempt.attemptId, |
| ContainerAllocator.EventType.CONTAINER_DEALLOCATE)); |
| |
| // send event to speculator that we withdraw our container needs, if |
| // we're transitioning out of UNASSIGNED |
| if (withdrawsContainerRequest) { |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); |
| } |
| |
| switch(finalState) { |
| case FAILED: |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_FAILED)); |
| break; |
| case KILLED: |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_KILLED)); |
| break; |
| } |
| if (taskAttempt.getLaunchTime() != 0) { |
| TaskAttemptUnsuccessfulCompletionEvent tauce = |
| createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, |
| finalState); |
| if(finalState == TaskAttemptState.FAILED) { |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); |
| } else if(finalState == TaskAttemptState.KILLED) { |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); |
| } |
| taskAttempt.eventHandler.handle(new JobHistoryEvent( |
| taskAttempt.attemptId.getTaskId().getJobId(), tauce)); |
| } else { |
| LOG.debug("Not generating HistoryFinish event since start event not " + |
| "generated for taskAttempt: " + taskAttempt.getID()); |
| } |
| } |
| } |
| |
| private static class LaunchedContainerTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent evnt) { |
| |
| TaskAttemptContainerLaunchedEvent event = |
| (TaskAttemptContainerLaunchedEvent) evnt; |
| |
| //set the launch time |
| taskAttempt.launchTime = taskAttempt.clock.getTime(); |
| taskAttempt.shufflePort = event.getShufflePort(); |
| |
| // register it to TaskAttemptListener so that it can start monitoring it. |
| taskAttempt.taskAttemptListener |
| .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); |
| //TODO Resolve to host / IP in case of a local address. |
| InetSocketAddress nodeHttpInetAddr = |
| NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: |
| // Costly? |
| taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); |
| taskAttempt.httpPort = nodeHttpInetAddr.getPort(); |
| JobCounterUpdateEvent jce = |
| new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId() |
| .getJobId()); |
| jce.addCounterUpdate( |
| taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? |
| JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES |
| , 1); |
| taskAttempt.eventHandler.handle(jce); |
| |
| LOG.info("TaskAttempt: [" + taskAttempt.attemptId |
| + "] using containerId: [" + taskAttempt.containerID + " on NM: [" |
| + taskAttempt.containerMgrAddress + "]"); |
| TaskAttemptStartedEvent tase = |
| new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), |
| TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), |
| taskAttempt.launchTime, |
| nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), |
| taskAttempt.shufflePort, taskAttempt.containerID); |
| taskAttempt.eventHandler.handle |
| (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase)); |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent |
| (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); |
| //make remoteTask reference as null as it is no more needed |
| //and free up the memory |
| taskAttempt.remoteTask = null; |
| |
| //tell the Task that attempt has started |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_LAUNCHED)); |
| } |
| } |
| |
| private static class CommitPendingTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_COMMIT_PENDING)); |
| } |
| } |
| |
| private static class TaskCleanupTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| TaskAttemptContext taskContext = |
| new TaskAttemptContextImpl(taskAttempt.conf, |
| TypeConverter.fromYarn(taskAttempt.attemptId)); |
| taskAttempt.eventHandler.handle(new TaskCleanupEvent( |
| taskAttempt.attemptId, |
| taskAttempt.committer, |
| taskContext)); |
| } |
| } |
| |
| private static class SucceededTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| //set the finish time |
| taskAttempt.setFinishTime(); |
| long slotMillis = computeSlotMillis(taskAttempt); |
| TaskId taskId = taskAttempt.attemptId.getTaskId(); |
| JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId()); |
| jce.addCounterUpdate( |
| taskId.getTaskType() == TaskType.MAP ? |
| JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, |
| slotMillis); |
| taskAttempt.eventHandler.handle(jce); |
| taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED); |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_SUCCEEDED)); |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent |
| (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); |
| } |
| } |
| |
| private static class FailedTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { |
| // set the finish time |
| taskAttempt.setFinishTime(); |
| |
| if (taskAttempt.getLaunchTime() != 0) { |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); |
| TaskAttemptUnsuccessfulCompletionEvent tauce = |
| createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, |
| TaskAttemptState.FAILED); |
| taskAttempt.eventHandler.handle(new JobHistoryEvent( |
| taskAttempt.attemptId.getTaskId().getJobId(), tauce)); |
| // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not |
| // handling failed map/reduce events. |
| }else { |
| LOG.debug("Not generating HistoryFinish event since start event not " + |
| "generated for taskAttempt: " + taskAttempt.getID()); |
| } |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); |
| } |
| } |
| |
| @SuppressWarnings({ "unchecked" }) |
| private void logAttemptFinishedEvent(TaskAttemptState state) { |
| //Log finished events only if an attempt started. |
| if (getLaunchTime() == 0) return; |
| if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { |
| MapAttemptFinishedEvent mfe = |
| new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), |
| TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), |
| state.toString(), |
| this.reportedStatus.mapFinishTime, |
| finishTime, |
| this.containerNodeId == null ? "UNKNOWN" |
| : this.containerNodeId.getHost(), |
| this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), |
| this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, |
| this.reportedStatus.stateString, |
| getCounters(), |
| getProgressSplitBlock().burst()); |
| eventHandler.handle( |
| new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); |
| } else { |
| ReduceAttemptFinishedEvent rfe = |
| new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), |
| TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), |
| state.toString(), |
| this.reportedStatus.shuffleFinishTime, |
| this.reportedStatus.sortFinishTime, |
| finishTime, |
| this.containerNodeId == null ? "UNKNOWN" |
| : this.containerNodeId.getHost(), |
| this.containerNodeId == null ? -1 : this.containerNodeId.getPort(), |
| this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, |
| this.reportedStatus.stateString, |
| getCounters(), |
| getProgressSplitBlock().burst()); |
| eventHandler.handle( |
| new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); |
| } |
| } |
| |
| private static class TooManyFetchFailureTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { |
| //add to diagnostic |
| taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt"); |
| //set the finish time |
| taskAttempt.setFinishTime(); |
| |
| if (taskAttempt.getLaunchTime() != 0) { |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); |
| TaskAttemptUnsuccessfulCompletionEvent tauce = |
| createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, |
| TaskAttemptState.FAILED); |
| taskAttempt.eventHandler.handle(new JobHistoryEvent( |
| taskAttempt.attemptId.getTaskId().getJobId(), tauce)); |
| }else { |
| LOG.debug("Not generating HistoryFinish event since start event not " + |
| "generated for taskAttempt: " + taskAttempt.getID()); |
| } |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); |
| } |
| } |
| |
| private static class KilledAfterSuccessTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; |
| //add to diagnostic |
| taskAttempt.addDiagnosticInfo(msgEvent.getMessage()); |
| |
| // not setting a finish time since it was set on success |
| assert (taskAttempt.getFinishTime() != 0); |
| |
| assert (taskAttempt.getLaunchTime() != 0); |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); |
| TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( |
| taskAttempt, TaskAttemptState.KILLED); |
| taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId |
| .getTaskId().getJobId(), tauce)); |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); |
| } |
| } |
| |
| private static class KilledTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| //set the finish time |
| taskAttempt.setFinishTime(); |
| if (taskAttempt.getLaunchTime() != 0) { |
| taskAttempt.eventHandler |
| .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); |
| TaskAttemptUnsuccessfulCompletionEvent tauce = |
| createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, |
| TaskAttemptState.KILLED); |
| taskAttempt.eventHandler.handle(new JobHistoryEvent( |
| taskAttempt.attemptId.getTaskId().getJobId(), tauce)); |
| }else { |
| LOG.debug("Not generating HistoryFinish event since start event not " + |
| "generated for taskAttempt: " + taskAttempt.getID()); |
| } |
| // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure. |
| taskAttempt.eventHandler.handle(new TaskTAttemptEvent( |
| taskAttempt.attemptId, |
| TaskEventType.T_ATTEMPT_KILLED)); |
| } |
| } |
| |
| private static class CleanupContainerTransition implements |
| SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| // unregister it to TaskAttemptListener so that it stops listening |
| // for it |
| taskAttempt.taskAttemptListener.unregister( |
| taskAttempt.attemptId, taskAttempt.jvmID); |
| taskAttempt.reportedStatus.progress = 1.0f; |
| taskAttempt.updateProgressSplits(); |
| //send the cleanup event to containerLauncher |
| taskAttempt.eventHandler.handle(new ContainerLauncherEvent( |
| taskAttempt.attemptId, |
| taskAttempt.containerID, taskAttempt.containerMgrAddress, |
| taskAttempt.containerToken, |
| ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); |
| } |
| } |
| |
| private void addDiagnosticInfo(String diag) { |
| if (diag != null && !diag.equals("")) { |
| diagnostics.add(diag); |
| } |
| } |
| |
| private static class StatusUpdater |
| implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| // Status update calls don't really change the state of the attempt. |
| TaskAttemptStatus newReportedStatus = |
| ((TaskAttemptStatusUpdateEvent) event) |
| .getReportedTaskAttemptStatus(); |
| // Now switch the information in the reportedStatus |
| taskAttempt.reportedStatus = newReportedStatus; |
| taskAttempt.reportedStatus.taskState = taskAttempt.getState(); |
| |
| // send event to speculator about the reported status |
| taskAttempt.eventHandler.handle |
| (new SpeculatorEvent |
| (taskAttempt.reportedStatus, taskAttempt.clock.getTime())); |
| |
| taskAttempt.updateProgressSplits(); |
| |
| //if fetch failures are present, send the fetch failure event to job |
| //this only will happen in reduce attempt type |
| if (taskAttempt.reportedStatus.fetchFailedMaps != null && |
| taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) { |
| taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent( |
| taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps)); |
| } |
| } |
| } |
| |
| private static class DiagnosticInformationUpdater |
| implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { |
| @Override |
| public void transition(TaskAttemptImpl taskAttempt, |
| TaskAttemptEvent event) { |
| TaskAttemptDiagnosticsUpdateEvent diagEvent = |
| (TaskAttemptDiagnosticsUpdateEvent) event; |
| LOG.info("Diagnostics report from " + taskAttempt.attemptId + ": " |
| + diagEvent.getDiagnosticInfo()); |
| taskAttempt.addDiagnosticInfo(diagEvent.getDiagnosticInfo()); |
| } |
| } |
| |
| private void initTaskAttemptStatus(TaskAttemptStatus result) { |
| result.progress = 0.0f; |
| result.phase = Phase.STARTING; |
| result.stateString = "NEW"; |
| result.taskState = TaskAttemptState.NEW; |
| Counters counters = EMPTY_COUNTERS; |
| // counters.groups = new HashMap<String, CounterGroup>(); |
| result.counters = counters; |
| } |
| |
| } |