| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; |
| |
| import static org.apache.hadoop.yarn.util.StringHelper.pjoin; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import javax.crypto.SecretKey; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.StringInterner; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; |
| import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; |
| import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; |
| |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels |
| .RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; |
| import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitionException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Apps; |
| import org.apache.hadoop.yarn.util.BoundedAppender; |
| import org.apache.hadoop.yarn.webapp.util.WebAppUtils; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; |
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { |
// Message templates with %s placeholders. DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE
// is formatted in getDiagnosticsLimitKCOrThrow(); the use sites of the other
// two are outside this part of the file.
| private static final String STATE_CHANGE_MESSAGE = |
| "%s State change from %s to %s on event = %s"; |
| private static final String RECOVERY_MESSAGE = |
| "Recovering attempt: %s with final state = %s"; |
| private static final String DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE = |
| "The value of %s should be a positive integer: %s"; |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(RMAppAttemptImpl.class); |
| |
| private static final RecordFactory recordFactory = RecordFactoryProvider |
| .getRecordFactory(null); |
| |
// Priority record created through recordFactory; its value is set to 0 by the
// static initializer that follows.
| public final static Priority AM_CONTAINER_PRIORITY = recordFactory |
| .newRecordInstance(Priority.class); |
| |
| static { |
| AM_CONTAINER_PRIORITY.setPriority(0); |
| } |
| |
// Per-instance state machine, obtained from stateMachineFactory.make(this)
// in the constructor.
| private final StateMachine<RMAppAttemptState, |
| RMAppAttemptEventType, |
| RMAppAttemptEvent> stateMachine; |
| |
| private final RMContext rmContext; |
| private final EventHandler eventHandler; |
| private final YarnScheduler scheduler; |
| private final ApplicationMasterService masterService; |
| private final RMApp rmApp; |
| |
// Read and write views of the single ReentrantReadWriteLock created in the
// constructor.
| private final ReadLock readLock; |
| private final WriteLock writeLock; |
| |
| private final ApplicationAttemptId applicationAttemptId; |
| private final ApplicationSubmissionContext submissionContext; |
| private Token<AMRMTokenIdentifier> amrmToken = null; |
// Cached key id decoded from amrmToken: reset to null by setAMRMToken() and
// filled in lazily by getAMRMTokenKeyId().
| private volatile Integer amrmTokenKeyId = null; |
| private SecretKey clientTokenMasterKey = null; |
| |
// Finished container statuses grouped by node.
// NOTE(review): presumably these are the statuses not yet handed to the AM;
// confirm against pullJustFinishedContainers().
| private ConcurrentMap<NodeId, List<ContainerStatus>> |
| justFinishedContainers = new ConcurrentHashMap<>(); |
| // Tracks the previous finished containers that are waiting to be |
| // verified as received by the AM. If the AM sends the next allocate |
| // request it implicitly acks this list. |
| private ConcurrentMap<NodeId, List<ContainerStatus>> |
| finishedContainersSentToAM = new ConcurrentHashMap<>(); |
| private volatile Container masterContainer; |
| |
// Initial values. "N/A" and -1 are also the values that
// invalidateAMHostAndPort() restores for host and rpcPort.
| private float progress = 0; |
| private String host = "N/A"; |
| private int rpcPort = -1; |
| private String originalTrackingUrl = "N/A"; |
| private String proxiedTrackingUrl = "N/A"; |
| private long startTime = 0; |
| private long finishTime = 0; |
| private long launchAMStartTime = 0; |
| private long launchAMEndTime = 0; |
| private long scheduledTime = 0; |
| private long containerAllocatedTime = 0; |
| private boolean nonWorkPreservingAMContainerFinished = false; |
| |
| // Set to null initially. Will eventually get set |
| // if an RMAppAttemptUnregistrationEvent occurs |
| private FinalApplicationStatus finalStatus = null; |
// Bounded diagnostics buffer; its size is derived in the constructor from the
// configured APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC value.
| private final BoundedAppender diagnostics; |
| private int amContainerExitStatus = ContainerExitStatus.INVALID; |
| |
| private Configuration conf; |
// Shared transition instances. They are declared ahead of stateMachineFactory
// because its initializer references them.
| private static final ExpiredTransition EXPIRED_TRANSITION = |
| new ExpiredTransition(); |
| private static final AttemptFailedTransition FAILED_TRANSITION = |
| new AttemptFailedTransition(); |
| private static final AMRegisteredTransition REGISTERED_TRANSITION = |
| new AMRegisteredTransition(); |
| private static final AMLaunchedTransition LAUNCHED_TRANSITION = |
| new AMLaunchedTransition(); |
// NOTE(review): the next five fields look like bookkeeping for the
// FINAL_SAVING state (the event and target state to resume with once the
// store update is acknowledged); the transitions that use them are outside
// this part of the file - confirm there.
| private RMAppAttemptEvent eventCausingFinalSaving; |
| private RMAppAttemptState targetedFinalState; |
| private RMAppAttemptState recoveredFinalState; |
| private RMAppAttemptState stateBeforeFinalSaving; |
| private Object transitionTodo; |
| |
| private RMAppAttemptMetrics attemptMetrics = null; |
| private List<ResourceRequest> amReqs = null; |
| private BlacklistManager blacklistedNodesForAM = null; |
| |
// Returned by getDiagnostics() while the diagnostics buffer is still empty.
| private String amLaunchDiagnostics; |
| |
/**
 * Transition table for an application attempt. The factory is built once,
 * statically, with NEW as the initial state; each RMAppAttemptImpl instance
 * obtains its own state machine from it via make(this) in the constructor.
 *
 * Entries that pass a transition object run that object when the event
 * arrives. Entries that only list event types map a state onto itself, so
 * those events are accepted without changing state.
 *
 * Many paths towards FAILED/KILLED go through FINAL_SAVING first: a
 * FinalSavingTransition is given the transition to run and the state to end
 * in, and FINAL_SAVING is left on ATTEMPT_UPDATE_SAVED.
 */
| private static final StateMachineFactory<RMAppAttemptImpl, |
| RMAppAttemptState, |
| RMAppAttemptEventType, |
| RMAppAttemptEvent> |
| stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl, |
| RMAppAttemptState, |
| RMAppAttemptEventType, |
| RMAppAttemptEvent>(RMAppAttemptState.NEW) |
| |
| // Transitions from NEW State |
| .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, |
| RMAppAttemptEventType.START, new AttemptStartedTransition()) |
| .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new BaseFinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.REGISTERED, |
| new FinalSavingTransition( |
| new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) |
| .addTransition( RMAppAttemptState.NEW, |
| EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED, |
| RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED), |
| RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) |
| |
| // Transitions from SUBMITTED state |
| .addTransition(RMAppAttemptState.SUBMITTED, |
| EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, |
| RMAppAttemptState.SCHEDULED), |
| RMAppAttemptEventType.ATTEMPT_ADDED, |
| new ScheduleTransition()) |
| .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new BaseFinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.REGISTERED, |
| new FinalSavingTransition( |
| new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) |
| |
| // Transitions from SCHEDULED State |
| .addTransition(RMAppAttemptState.SCHEDULED, |
| EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING, |
| RMAppAttemptState.SCHEDULED), |
| RMAppAttemptEventType.CONTAINER_ALLOCATED, |
| new AMContainerAllocatedTransition()) |
| .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new BaseFinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.SCHEDULED, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new FinalSavingTransition( |
| new AMContainerCrashedBeforeRunningTransition(), |
| RMAppAttemptState.FAILED)) |
| |
| // Transitions from ALLOCATED_SAVING State |
| .addTransition(RMAppAttemptState.ALLOCATED_SAVING, |
| RMAppAttemptState.ALLOCATED, |
| RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) |
| |
| // App could be killed by the client. So need to handle this. |
| .addTransition(RMAppAttemptState.ALLOCATED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new BaseFinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.ALLOCATED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new FinalSavingTransition( |
| new AMContainerCrashedBeforeRunningTransition(), |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.ALLOCATED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| |
| // Transitions from LAUNCHED_UNMANAGED_SAVING State |
| .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, |
| RMAppAttemptState.LAUNCHED, |
| RMAppAttemptEventType.ATTEMPT_NEW_SAVED, |
| new UnmanagedAMAttemptSavedTransition()) |
| // attempt should not try to register in this state |
| .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.REGISTERED, |
| new FinalSavingTransition( |
| new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) |
| // App could be killed by the client. So need to handle this. |
| .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new BaseFinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, |
| RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| |
| // Transitions from ALLOCATED State |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, |
| RMAppAttemptEventType.LAUNCHED, LAUNCHED_TRANSITION) |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.LAUNCH_FAILED, |
| new FinalSavingTransition(new LaunchFailedTransition(), |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition( |
| new KillAllocatedAMTransition(), RMAppAttemptState.KILLED)) |
| |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.RUNNING, |
| RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION) |
| .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new FinalSavingTransition( |
| new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) |
| |
| // Transitions from LAUNCHED State |
| .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, |
| RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION) |
| .addTransition(RMAppAttemptState.LAUNCHED, |
| EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedTransition( |
| new AMContainerCrashedBeforeRunningTransition(), |
| RMAppAttemptState.LAUNCHED)) |
| .addTransition( |
| RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.EXPIRE, |
| new FinalSavingTransition(EXPIRED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new FinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| |
| // Transitions from RUNNING State |
| .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, |
| EnumSet.of( |
| RMAppAttemptEventType.LAUNCHED, |
| // Valid only for UAM restart |
| RMAppAttemptEventType.REGISTERED)) |
| .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition()) |
| .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, |
| RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) |
| .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED) |
| .addTransition( |
| RMAppAttemptState.RUNNING, |
| EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedTransition( |
| new AMContainerCrashedAtRunningTransition(), |
| RMAppAttemptState.RUNNING)) |
| .addTransition( |
| RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.EXPIRE, |
| new FinalSavingTransition(EXPIRED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| .addTransition( |
| RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.KILL, |
| new FinalSavingTransition(new FinalTransition( |
| RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) |
| .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.FAIL, |
| new FinalSavingTransition(FAILED_TRANSITION, |
| RMAppAttemptState.FAILED)) |
| |
| // Transitions from FINAL_SAVING State |
| .addTransition(RMAppAttemptState.FINAL_SAVING, |
| EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FAILED, |
| RMAppAttemptState.KILLED, RMAppAttemptState.FINISHED), |
| RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED, |
| new FinalStateSavedTransition()) |
| .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedAtFinalSavingTransition()) |
| .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING, |
| RMAppAttemptEventType.EXPIRE, |
| new AMExpiredAtFinalSavingTransition()) |
| .addTransition(RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING, |
| EnumSet.of( |
| RMAppAttemptEventType.UNREGISTERED, |
| RMAppAttemptEventType.STATUS_UPDATE, |
| RMAppAttemptEventType.LAUNCHED, |
| RMAppAttemptEventType.LAUNCH_FAILED, |
| // TODO: the scheduler should reject container allocation requests |
| // while the attempt is in FINAL_SAVING; until then the event is ignored. |
| RMAppAttemptEventType.CONTAINER_ALLOCATED, |
| RMAppAttemptEventType.ATTEMPT_NEW_SAVED, |
| RMAppAttemptEventType.KILL, |
| RMAppAttemptEventType.FAIL, |
| RMAppAttemptEventType.ATTEMPT_ADDED)) |
| |
| // Transitions from FAILED State |
| // For work-preserving AM restart, a failed attempt still captures |
| // CONTAINER_FINISHED events and records the finished containers for |
| // use by the next new attempt. |
| .addTransition(RMAppAttemptState.FAILED, RMAppAttemptState.FAILED, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedAtFinalStateTransition()) |
| .addTransition( |
| RMAppAttemptState.FAILED, |
| RMAppAttemptState.FAILED, |
| EnumSet.of( |
| RMAppAttemptEventType.LAUNCHED, |
| RMAppAttemptEventType.LAUNCH_FAILED, |
| RMAppAttemptEventType.EXPIRE, |
| RMAppAttemptEventType.KILL, |
| RMAppAttemptEventType.FAIL, |
| RMAppAttemptEventType.REGISTERED, |
| RMAppAttemptEventType.UNREGISTERED, |
| RMAppAttemptEventType.STATUS_UPDATE, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED)) |
| |
| // Transitions from FINISHING State |
| .addTransition(RMAppAttemptState.FINISHING, |
| EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED), |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new AMFinishingContainerFinishedTransition()) |
| .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED, |
| RMAppAttemptEventType.EXPIRE, |
| new FinalTransition(RMAppAttemptState.FINISHED)) |
| .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING, |
| EnumSet.of( |
| RMAppAttemptEventType.LAUNCHED, |
| RMAppAttemptEventType.UNREGISTERED, |
| RMAppAttemptEventType.STATUS_UPDATE, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED, |
| // ignore Kill as we have already saved the final Finished state in |
| // state store. |
| RMAppAttemptEventType.KILL, |
| RMAppAttemptEventType.FAIL)) |
| |
| // Transitions from FINISHED State |
| .addTransition( |
| RMAppAttemptState.FINISHED, |
| RMAppAttemptState.FINISHED, |
| EnumSet.of( |
| RMAppAttemptEventType.LAUNCHED, |
| RMAppAttemptEventType.EXPIRE, |
| RMAppAttemptEventType.UNREGISTERED, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED, |
| RMAppAttemptEventType.KILL, |
| RMAppAttemptEventType.FAIL)) |
| .addTransition(RMAppAttemptState.FINISHED, |
| RMAppAttemptState.FINISHED, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedAtFinalStateTransition()) |
| |
| // Transitions from KILLED State |
| .addTransition( |
| RMAppAttemptState.KILLED, |
| RMAppAttemptState.KILLED, |
| EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED, |
| RMAppAttemptEventType.LAUNCHED, |
| RMAppAttemptEventType.LAUNCH_FAILED, |
| RMAppAttemptEventType.EXPIRE, |
| RMAppAttemptEventType.REGISTERED, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED, |
| RMAppAttemptEventType.UNREGISTERED, |
| RMAppAttemptEventType.KILL, |
| RMAppAttemptEventType.FAIL, |
| RMAppAttemptEventType.STATUS_UPDATE)) |
| .addTransition(RMAppAttemptState.KILLED, |
| RMAppAttemptState.KILLED, |
| RMAppAttemptEventType.CONTAINER_FINISHED, |
| new ContainerFinishedAtFinalStateTransition()) |
| .installTopology(); |
| |
/**
 * Convenience constructor: forwards every argument to the nine-argument
 * constructor and supplies a new {@link DisabledBlacklistManager} as the
 * AM blacklist manager.
 */
public RMAppAttemptImpl(
    ApplicationAttemptId appAttemptId,
    RMContext rmContext,
    YarnScheduler scheduler,
    ApplicationMasterService masterService,
    ApplicationSubmissionContext submissionContext,
    Configuration conf,
    List<ResourceRequest> amReqs,
    RMApp rmApp) {
  this(
      appAttemptId,
      rmContext,
      scheduler,
      masterService,
      submissionContext,
      conf,
      amReqs,
      rmApp,
      new DisabledBlacklistManager());
}
| |
/**
 * Creates an application attempt.
 *
 * <p>Besides storing its arguments, the constructor takes the event handler
 * from {@code rmContext}'s dispatcher, creates the read/write lock pair,
 * resolves the proxied tracking URL, instantiates this attempt's state
 * machine and metrics, and sizes the diagnostics buffer from configuration.
 *
 * @param appAttemptId id of this attempt
 * @param rmContext resource manager context
 * @param scheduler scheduler reference kept by the attempt
 * @param masterService application master service reference kept by the
 *     attempt
 * @param submissionContext submission context of the owning application
 * @param conf configuration; must hold a positive integer for
 *     {@code YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC}
 * @param amReqs resource requests for the AM container
 * @param rmApp owning application
 * @param amBlacklistManager blacklist manager to use for the AM
 * @throws YarnRuntimeException if the diagnostics limit is not a positive
 *     integer
 */
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
    RMContext rmContext, YarnScheduler scheduler,
    ApplicationMasterService masterService,
    ApplicationSubmissionContext submissionContext,
    Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp,
    BlacklistManager amBlacklistManager) {
  this.conf = conf;
  this.applicationAttemptId = appAttemptId;
  this.rmContext = rmContext;
  this.eventHandler = rmContext.getDispatcher().getEventHandler();
  this.submissionContext = submissionContext;
  this.scheduler = scheduler;
  this.masterService = masterService;

  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  this.readLock = lock.readLock();
  this.writeLock = lock.writeLock();

  this.proxiedTrackingUrl = rmContext.getAppProxyUrl(conf,
      appAttemptId.getApplicationId());
  this.stateMachine = stateMachineFactory.make(this);

  this.attemptMetrics =
      new RMAppAttemptMetrics(applicationAttemptId, rmContext);

  this.amReqs = amReqs;
  this.blacklistedNodesForAM = amBlacklistManager;

  final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf);

  LOG.debug("{} : {}", YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC,
      diagnosticsLimitKC);

  // The limit is configured in units of 1024 characters. Multiply in long
  // arithmetic and clamp, because an int multiplication silently wraps for
  // values above Integer.MAX_VALUE / 1024 and would yield a negative or far
  // too small buffer limit.
  final int diagnosticsLimitChars = (int) Math.min(
      diagnosticsLimitKC * 1024L, Integer.MAX_VALUE);
  this.diagnostics = new BoundedAppender(diagnosticsLimitChars);
  this.rmApp = rmApp;
}
| |
| private int getDiagnosticsLimitKCOrThrow(final Configuration configuration) { |
| try { |
| final int diagnosticsLimitKC = configuration.getInt( |
| YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, |
| YarnConfiguration.DEFAULT_APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC); |
| |
| if (diagnosticsLimitKC <= 0) { |
| final String message = |
| String.format(DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE, |
| YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, |
| diagnosticsLimitKC); |
| LOG.error(message); |
| |
| throw new YarnRuntimeException(message); |
| } |
| |
| return diagnosticsLimitKC; |
| } catch (final NumberFormatException ignored) { |
| final String diagnosticsLimitKCString = configuration |
| .get(YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC); |
| final String message = |
| String.format(DIAGNOSTIC_LIMIT_CONFIG_ERROR_MESSAGE, |
| YarnConfiguration.APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC, |
| diagnosticsLimitKCString); |
| LOG.error(message); |
| |
| throw new YarnRuntimeException(message); |
| } |
| } |
| |
| @Override |
| public ApplicationAttemptId getAppAttemptId() { |
| return this.applicationAttemptId; |
| } |
| |
| @Override |
| public ApplicationSubmissionContext getSubmissionContext() { |
| return this.submissionContext; |
| } |
| |
| @Override |
| public FinalApplicationStatus getFinalApplicationStatus() { |
| this.readLock.lock(); |
| try { |
| return this.finalStatus; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RMAppAttemptState getAppAttemptState() { |
| this.readLock.lock(); |
| try { |
| return this.stateMachine.getCurrentState(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getHost() { |
| this.readLock.lock(); |
| |
| try { |
| return this.host; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getRpcPort() { |
| this.readLock.lock(); |
| |
| try { |
| return this.rpcPort; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getTrackingUrl() { |
| this.readLock.lock(); |
| try { |
| return (getSubmissionContext().getUnmanagedAM()) ? |
| this.originalTrackingUrl : this.proxiedTrackingUrl; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getOriginalTrackingUrl() { |
| this.readLock.lock(); |
| try { |
| return this.originalTrackingUrl; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getWebProxyBase() { |
| this.readLock.lock(); |
| try { |
| return ProxyUriUtils.getPath(applicationAttemptId.getApplicationId()); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| private void setTrackingUrlToRMAppPage(RMAppAttemptState stateToBeStored) { |
| originalTrackingUrl = pjoin( |
| WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), |
| "cluster", "app", getAppAttemptId().getApplicationId()); |
| switch (stateToBeStored) { |
| case KILLED: |
| case FAILED: |
| proxiedTrackingUrl = originalTrackingUrl; |
| break; |
| default: |
| break; |
| } |
| } |
| |
| private void setTrackingUrlToAHSPage(RMAppAttemptState stateToBeStored) { |
| originalTrackingUrl = pjoin( |
| WebAppUtils.getHttpSchemePrefix(conf) + |
| WebAppUtils.getAHSWebAppURLWithoutScheme(conf), |
| "applicationhistory", "app", getAppAttemptId().getApplicationId()); |
| switch (stateToBeStored) { |
| case KILLED: |
| case FAILED: |
| proxiedTrackingUrl = originalTrackingUrl; |
| break; |
| default: |
| break; |
| } |
| } |
| |
| private void invalidateAMHostAndPort() { |
| this.host = "N/A"; |
| this.rpcPort = -1; |
| } |
| |
| // This is only used for RMStateStore. Normal operation must invoke the secret |
| // manager to get the key and not use the local key directly. |
| @Override |
| public SecretKey getClientTokenMasterKey() { |
| return this.clientTokenMasterKey; |
| } |
| |
| @Override |
| public Token<AMRMTokenIdentifier> getAMRMToken() { |
| this.readLock.lock(); |
| try { |
| return this.amrmToken; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Private |
| public void setAMRMToken(Token<AMRMTokenIdentifier> lastToken) { |
| this.writeLock.lock(); |
| try { |
| this.amrmToken = lastToken; |
| this.amrmTokenKeyId = null; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Private |
| public int getAMRMTokenKeyId() { |
| Integer keyId = this.amrmTokenKeyId; |
| if (keyId == null) { |
| this.readLock.lock(); |
| try { |
| if (this.amrmToken == null) { |
| throw new YarnRuntimeException("Missing AMRM token for " |
| + this.applicationAttemptId); |
| } |
| keyId = this.amrmToken.decodeIdentifier().getKeyId(); |
| this.amrmTokenKeyId = keyId; |
| } catch (IOException e) { |
| throw new YarnRuntimeException("AMRM token decode error for " |
| + this.applicationAttemptId, e); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| return keyId; |
| } |
| |
| @Override |
| public Token<ClientToAMTokenIdentifier> createClientToken(String client) { |
| this.readLock.lock(); |
| |
| try { |
| Token<ClientToAMTokenIdentifier> token = null; |
| ClientToAMTokenSecretManagerInRM secretMgr = |
| this.rmContext.getClientToAMTokenSecretManager(); |
| if (client != null && |
| secretMgr.getMasterKey(this.applicationAttemptId) != null) { |
| token = new Token<ClientToAMTokenIdentifier>( |
| new ClientToAMTokenIdentifier(this.applicationAttemptId, client), |
| secretMgr); |
| } |
| return token; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getDiagnostics() { |
| this.readLock.lock(); |
| try { |
| if (diagnostics.length() == 0 && amLaunchDiagnostics != null) { |
| return amLaunchDiagnostics; |
| } |
| return this.diagnostics.toString(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @VisibleForTesting |
| void appendDiagnostics(final CharSequence message) { |
| this.diagnostics.append(message); |
| } |
| |
| public int getAMContainerExitStatus() { |
| this.readLock.lock(); |
| try { |
| return this.amContainerExitStatus; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public float getProgress() { |
| this.readLock.lock(); |
| |
| try { |
| return this.progress; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @VisibleForTesting |
| @Override |
| public List<ContainerStatus> getJustFinishedContainers() { |
| this.readLock.lock(); |
| try { |
| List<ContainerStatus> returnList = new ArrayList<>(); |
| for (Collection<ContainerStatus> containerStatusList : |
| justFinishedContainers.values()) { |
| returnList.addAll(containerStatusList); |
| } |
| return returnList; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ConcurrentMap<NodeId, List<ContainerStatus>> |
| getJustFinishedContainersReference |
| () { |
| this.readLock.lock(); |
| try { |
| return this.justFinishedContainers; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ConcurrentMap<NodeId, List<ContainerStatus>> |
| getFinishedContainersSentToAMReference() { |
| this.readLock.lock(); |
| try { |
| return this.finishedContainersSentToAM; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<ContainerStatus> pullJustFinishedContainers() { |
| this.writeLock.lock(); |
| |
| try { |
| List<ContainerStatus> returnList = new ArrayList<>(); |
| |
| // A new allocate means the AM received the previously sent |
| // finishedContainers. We can ack this to NM now |
| sendFinishedContainersToNM(finishedContainersSentToAM); |
| |
| // Mark every containerStatus as being sent to AM though we may return |
| // only the ones that belong to the current attempt |
| boolean keepContainersAcrossAppAttempts = this.submissionContext |
| .getKeepContainersAcrossApplicationAttempts(); |
| for (Map.Entry<NodeId, List<ContainerStatus>> entry : |
| justFinishedContainers.entrySet()) { |
| NodeId nodeId = entry.getKey(); |
| List<ContainerStatus> finishedContainers = entry.getValue(); |
| if (finishedContainers.isEmpty()) { |
| continue; |
| } |
| |
| if (keepContainersAcrossAppAttempts) { |
| returnList.addAll(finishedContainers); |
| } else { |
| // Filter out containers from previous attempt |
| for (ContainerStatus containerStatus: finishedContainers) { |
| if (containerStatus.getContainerId().getApplicationAttemptId() |
| .equals(this.getAppAttemptId())) { |
| returnList.add(containerStatus); |
| } |
| } |
| } |
| |
| finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList<>()); |
| finishedContainersSentToAM.get(nodeId).addAll(finishedContainers); |
| } |
| justFinishedContainers.clear(); |
| |
| return returnList; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Container getMasterContainer() { |
| return this.masterContainer; |
| } |
| |
| @InterfaceAudience.Private |
| @VisibleForTesting |
| public void setMasterContainer(Container container) { |
| masterContainer = container; |
| } |
| |
| @Override |
| public void handle(RMAppAttemptEvent event) { |
| |
| this.writeLock.lock(); |
| |
| try { |
| ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); |
| LOG.debug("Processing event for {} of type {}", |
| appAttemptID, event.getType()); |
| final RMAppAttemptState oldState = getAppAttemptState(); |
| try { |
| /* keep the master in sync with the state machine */ |
| this.stateMachine.doTransition(event.getType(), event); |
| } catch (InvalidStateTransitionException e) { |
| LOG.error("App attempt: " + appAttemptID |
| + " can't handle this event at current state", e); |
| onInvalidTranstion(event.getType(), oldState); |
| } |
| |
| // Log at INFO if we're not recovering or not in a terminal state. |
| // Log at DEBUG otherwise. |
| if ((oldState != getAppAttemptState()) && |
| ((recoveredFinalState == null) || |
| (event.getType() != RMAppAttemptEventType.RECOVER))) { |
| LOG.info(String.format(STATE_CHANGE_MESSAGE, appAttemptID, oldState, |
| getAppAttemptState(), event.getType())); |
| } else if ((oldState != getAppAttemptState()) && LOG.isDebugEnabled()) { |
| LOG.debug(String.format(STATE_CHANGE_MESSAGE, appAttemptID, oldState, |
| getAppAttemptState(), event.getType())); |
| } |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ApplicationResourceUsageReport getApplicationResourceUsageReport() { |
| this.readLock.lock(); |
| try { |
| ApplicationResourceUsageReport report = |
| scheduler.getAppResourceUsageReport(this.getAppAttemptId()); |
| if (report == null) { |
| report = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; |
| } |
| AggregateAppResourceUsage resUsage = |
| this.attemptMetrics.getAggregateAppResourceUsage(); |
| report.setResourceSecondsMap(resUsage.getResourceUsageSecondsMap()); |
| report.setPreemptedResourceSecondsMap( |
| this.attemptMetrics.getPreemptedResourceSecondsMap()); |
| return report; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void recover(RMState state) { |
| ApplicationStateData appState = |
| state.getApplicationState().get(getAppAttemptId().getApplicationId()); |
| ApplicationAttemptStateData attemptState = |
| appState.getAttempt(getAppAttemptId()); |
| assert attemptState != null; |
| |
| if (attemptState.getState() == null) { |
| LOG.info(String.format(RECOVERY_MESSAGE, getAppAttemptId(), "NONE")); |
| } else if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format(RECOVERY_MESSAGE, getAppAttemptId(), |
| attemptState.getState())); |
| } |
| |
| this.diagnostics.append("Attempt recovered after RM restart"); |
| this.diagnostics.append(attemptState.getDiagnostics()); |
| this.amContainerExitStatus = attemptState.getAMContainerExitStatus(); |
| if (amContainerExitStatus == ContainerExitStatus.PREEMPTED) { |
| this.attemptMetrics.setIsPreempted(); |
| } |
| |
| Credentials credentials = attemptState.getAppAttemptTokens(); |
| setMasterContainer(attemptState.getMasterContainer()); |
| recoverAppAttemptCredentials(credentials, attemptState.getState()); |
| this.recoveredFinalState = attemptState.getState(); |
| this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); |
| this.finalStatus = attemptState.getFinalApplicationStatus(); |
| this.startTime = attemptState.getStartTime(); |
| this.finishTime = attemptState.getFinishTime(); |
| this.attemptMetrics |
| .updateAggregateAppResourceUsage(attemptState.getResourceSecondsMap()); |
| this.attemptMetrics.updateAggregatePreemptedAppResourceUsage( |
| attemptState.getPreemptedResourceSecondsMap()); |
| this.attemptMetrics.setTotalAllocatedContainers( |
| attemptState.getTotalAllocatedContainers()); |
| } |
| |
| public void transferStateFromAttempt(RMAppAttempt attempt) { |
| this.justFinishedContainers = attempt.getJustFinishedContainersReference(); |
| this.finishedContainersSentToAM = |
| attempt.getFinishedContainersSentToAMReference(); |
| // container complete msg was moved from justFinishedContainers to |
| // finishedContainersSentToAM in ApplicationMasterService#allocate, |
| // if am crashed and not received this response, we should resend |
| // this msg again after am restart |
| if (!this.finishedContainersSentToAM.isEmpty()) { |
| for (Map.Entry<NodeId, List<ContainerStatus>> finishedContainer |
| : this.finishedContainersSentToAM.entrySet()) { |
| List<ContainerStatus> containerStatuses = |
| finishedContainer.getValue(); |
| NodeId nodeId = finishedContainer.getKey(); |
| this.justFinishedContainers.putIfAbsent(nodeId, new ArrayList<>()); |
| this.justFinishedContainers.get(nodeId).addAll(containerStatuses); |
| } |
| this.finishedContainersSentToAM.clear(); |
| } |
| } |
| |
| private void recoverAppAttemptCredentials(Credentials appAttemptTokens, |
| RMAppAttemptState state) { |
| if (appAttemptTokens == null || state == RMAppAttemptState.FAILED |
| || state == RMAppAttemptState.FINISHED |
| || state == RMAppAttemptState.KILLED) { |
| return; |
| } |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| byte[] clientTokenMasterKeyBytes = appAttemptTokens.getSecretKey( |
| RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME); |
| if (clientTokenMasterKeyBytes != null) { |
| clientTokenMasterKey = rmContext.getClientToAMTokenSecretManager() |
| .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); |
| } |
| } |
| |
| setAMRMToken(rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( |
| applicationAttemptId)); |
| } |
| |
| private static class BaseTransition implements |
| SingleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent> { |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| } |
| |
| } |
| |
| private static final class AttemptStartedTransition extends BaseTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| boolean transferStateFromPreviousAttempt = false; |
| if (event instanceof RMAppStartAttemptEvent) { |
| transferStateFromPreviousAttempt = |
| ((RMAppStartAttemptEvent) event) |
| .getTransferStateFromPreviousAttempt(); |
| } |
| appAttempt.startTime = System.currentTimeMillis(); |
| |
| // Register with the ApplicationMasterService |
| appAttempt.masterService |
| .registerAppAttempt(appAttempt.applicationAttemptId); |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| appAttempt.clientTokenMasterKey = |
| appAttempt.rmContext.getClientToAMTokenSecretManager() |
| .createMasterKey(appAttempt.applicationAttemptId); |
| } |
| |
| // Add the applicationAttempt to the scheduler and inform the scheduler |
| // whether to transfer the state from previous attempt. |
| appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( |
| appAttempt.applicationAttemptId, transferStateFromPreviousAttempt)); |
| } |
| } |
| |
| private static final List<ContainerId> EMPTY_CONTAINER_RELEASE_LIST = |
| new ArrayList<ContainerId>(); |
| |
| private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST = |
| new ArrayList<ResourceRequest>(); |
| |
| @VisibleForTesting |
| public static final class ScheduleTransition |
| implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| ApplicationSubmissionContext subCtx = appAttempt.submissionContext; |
| if (!subCtx.getUnmanagedAM()) { |
| // Need reset #containers before create new attempt, because this request |
| // will be passed to scheduler, and scheduler will deduct the number after |
| // AM container allocated |
| |
| // Currently, following fields are all hard coded, |
| // TODO: change these fields when we want to support |
| // priority or multiple containers AM container allocation. |
| for (ResourceRequest amReq : appAttempt.amReqs) { |
| amReq.setNumContainers(1); |
| amReq.setPriority(AM_CONTAINER_PRIORITY); |
| } |
| |
| int numNodes = |
| RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext, |
| appAttempt.conf, appAttempt.amReqs); |
| LOG.debug("Setting node count for blacklist to {}", numNodes); |
| appAttempt.getAMBlacklistManager().refreshNodeHostCount(numNodes); |
| |
| ResourceBlacklistRequest amBlacklist = |
| appAttempt.getAMBlacklistManager().getBlacklistUpdates(); |
| |
| LOG.debug("Using blacklist for AM: additions({}) and removals({})", |
| amBlacklist.getBlacklistAdditions(), |
| amBlacklist.getBlacklistRemovals()); |
| |
| QueueInfo queueInfo = null; |
| for (ResourceRequest amReq : appAttempt.amReqs) { |
| if (amReq.getNodeLabelExpression() == null && ResourceRequest.ANY |
| .equals(amReq.getResourceName())) { |
| String queue = appAttempt.rmApp.getQueue(); |
| |
| //Load queue only once since queue will be same across attempts |
| if (queueInfo == null) { |
| try { |
| queueInfo = appAttempt.scheduler.getQueueInfo(queue, false, |
| false); |
| } catch (IOException e) { |
| LOG.error("Could not find queue for application : ", e); |
| // Set application status to REJECTED since we cant find the |
| // queue |
| appAttempt.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppAttemptEvent(appAttempt.getAppAttemptId(), |
| RMAppAttemptEventType.FAIL, |
| "Could not find queue for application : " + |
| appAttempt.rmApp.getQueue())); |
| appAttempt.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppEvent(appAttempt.rmApp.getApplicationId(), RMAppEventType |
| .APP_REJECTED, |
| "Could not find queue for application : " + |
| appAttempt.rmApp.getQueue())); |
| return RMAppAttemptState.FAILED; |
| } |
| } |
| |
| String labelExp = RMNodeLabelsManager.NO_LABEL; |
| if (queueInfo != null) { |
| LOG.debug("Setting default node label expression : {}", |
| queueInfo.getDefaultNodeLabelExpression()); |
| labelExp = queueInfo.getDefaultNodeLabelExpression(); |
| } |
| |
| amReq.setNodeLabelExpression(labelExp); |
| } |
| } |
| |
| // AM resource has been checked when submission |
| Allocation amContainerAllocation = |
| appAttempt.scheduler.allocate( |
| appAttempt.applicationAttemptId, |
| appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST, |
| amBlacklist.getBlacklistAdditions(), |
| amBlacklist.getBlacklistRemovals(), |
| new ContainerUpdates()); |
| if (amContainerAllocation != null |
| && amContainerAllocation.getContainers() != null) { |
| assert (amContainerAllocation.getContainers().size() == 0); |
| } |
| appAttempt.scheduledTime = System.currentTimeMillis(); |
| return RMAppAttemptState.SCHEDULED; |
| } else { |
| // save state and then go to LAUNCHED state |
| appAttempt.storeAttempt(); |
| return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING; |
| } |
| } |
| } |
| |
| private static final class AMContainerAllocatedTransition |
| implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| // Acquire the AM container from the scheduler. |
| Allocation amContainerAllocation = |
| appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, |
| EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null, |
| null, new ContainerUpdates()); |
| // There must be at least one container allocated, because a |
| // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, |
| // and is put in SchedulerApplication#newlyAllocatedContainers. |
| |
| // Note that YarnScheduler#allocate is not guaranteed to be able to |
| // fetch it since container may not be fetchable for some reason like |
| // DNS unavailable causing container token not generated. As such, we |
| // return to the previous state and keep retry until am container is |
| // fetched. |
| if (amContainerAllocation.getContainers().size() == 0) { |
| appAttempt.retryFetchingAMContainer(appAttempt); |
| return RMAppAttemptState.SCHEDULED; |
| } |
| |
| // Set the masterContainer |
| Container amContainer = amContainerAllocation.getContainers().get(0); |
| RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler |
| .getRMContainer(amContainer.getId()); |
| //while one NM is removed, the scheduler will clean the container,the |
| //following CONTAINER_FINISHED event will handle the cleaned container. |
| //so just return RMAppAttemptState.SCHEDULED |
| if (rmMasterContainer == null) { |
| return RMAppAttemptState.SCHEDULED; |
| } |
| appAttempt.setMasterContainer(amContainer); |
| rmMasterContainer.setAMContainer(true); |
| // The node set in NMTokenSecrentManager is used for marking whether the |
| // NMToken has been issued for this node to the AM. |
| // When AM container was allocated to RM itself, the node which allocates |
| // this AM container was marked as the NMToken already sent. Thus, |
| // clear this node set so that the following allocate requests from AM are |
| // able to retrieve the corresponding NMToken. |
| appAttempt.rmContext.getNMTokenSecretManager() |
| .clearNodeSetForAttempt(appAttempt.applicationAttemptId); |
| appAttempt.getSubmissionContext().setResource( |
| appAttempt.getMasterContainer().getResource()); |
| appAttempt.containerAllocatedTime = System.currentTimeMillis(); |
| long allocationDelay = |
| appAttempt.containerAllocatedTime - appAttempt.scheduledTime; |
| ClusterMetrics.getMetrics().addAMContainerAllocationDelay( |
| allocationDelay); |
| appAttempt.storeAttempt(); |
| return RMAppAttemptState.ALLOCATED_SAVING; |
| } |
| } |
| |
| private void retryFetchingAMContainer(final RMAppAttemptImpl appAttempt) { |
| // start a new thread so that we are not blocking main dispatcher thread. |
| new Thread() { |
| @Override |
| public void run() { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while waiting to resend the" |
| + " ContainerAllocated Event."); |
| } |
| appAttempt.eventHandler.handle( |
| new RMAppAttemptEvent(appAttempt.applicationAttemptId, |
| RMAppAttemptEventType.CONTAINER_ALLOCATED)); |
| } |
| }.start(); |
| } |
| |
| private static final class AttemptStoredTransition extends BaseTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| appAttempt.registerClientToken(); |
| appAttempt.launchAttempt(); |
| } |
| } |
| |
| private static class AttemptRecoveredTransition |
| implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| RMApp rmApp = appAttempt.rmApp; |
| |
| /* |
| * If last attempt recovered final state is null .. it means attempt was |
| * started but AM container may or may not have started / finished. |
| * Therefore we should wait for it to finish. |
| */ |
| if (appAttempt.recoveredFinalState != null) { |
| appAttempt.progress = 1.0f; |
| |
| if (appAttempt.submissionContext |
| .getKeepContainersAcrossApplicationAttempts() |
| && rmApp.getCurrentAppAttempt() != appAttempt) { |
| appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt()); |
| } |
| // We will replay the final attempt only if last attempt is in final |
| // state but application is not in final state. |
| if (rmApp.getCurrentAppAttempt() == appAttempt |
| && !RMAppImpl.isAppInFinalState(rmApp)) { |
| // Add the previous finished attempt to scheduler synchronously so |
| // that scheduler knows the previous attempt. |
| appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( |
| appAttempt.getAppAttemptId(), false, true)); |
| (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition( |
| appAttempt, event); |
| } |
| return appAttempt.recoveredFinalState; |
| } else if (RMAppImpl.isAppInFinalState(rmApp)) { |
| // Somehow attempt final state was not saved but app final state was saved. |
| // Skip adding the attempt into scheduler |
| RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); |
| LOG.warn(rmApp.getApplicationId() + " final state (" + appState |
| + ") was recorded, but " + appAttempt.applicationAttemptId |
| + " final state (" + appAttempt.recoveredFinalState |
| + ") was not recorded."); |
| switch (appState) { |
| case FINISHED: |
| return RMAppAttemptState.FINISHED; |
| case FAILED: |
| return RMAppAttemptState.FAILED; |
| case KILLED: |
| return RMAppAttemptState.KILLED; |
| } |
| return RMAppAttemptState.FAILED; |
| } else{ |
| // Add the current attempt to the scheduler. |
| if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { |
| // Need to register an app attempt before AM can register |
| appAttempt.masterService |
| .registerAppAttempt(appAttempt.applicationAttemptId); |
| |
| // Add attempt to scheduler synchronously to guarantee scheduler |
| // knows attempts before AM or NM re-registers. |
| appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( |
| appAttempt.getAppAttemptId(), false, true)); |
| } |
| |
| /* |
| * Since the application attempt's final state is not saved that means |
| * for AM container (previous attempt) state must be one of these. |
| * 1) AM container may not have been launched (RM failed right before |
| * this). |
| * 2) AM container was successfully launched but may or may not have |
| * registered / unregistered. |
| * In whichever case we will wait (by moving attempt into LAUNCHED |
| * state) and mark this attempt failed (assuming non work preserving |
| * restart) only after |
| * 1) Node manager during re-registration heart beats back saying |
| * am container finished. |
| * 2) OR AMLivelinessMonitor expires this attempt (when am doesn't |
| * heart beat back). |
| */ |
| LAUNCHED_TRANSITION.transition(appAttempt, event); |
| return RMAppAttemptState.LAUNCHED; |
| } |
| } |
| } |
| |
| private void rememberTargetTransitions(RMAppAttemptEvent event, |
| Object transitionToDo, RMAppAttemptState targetFinalState) { |
| transitionTodo = transitionToDo; |
| targetedFinalState = targetFinalState; |
| eventCausingFinalSaving = event; |
| } |
| |
| private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event, |
| Object transitionToDo, RMAppAttemptState targetFinalState, |
| RMAppAttemptState stateToBeStored) { |
| |
| rememberTargetTransitions(event, transitionToDo, targetFinalState); |
| stateBeforeFinalSaving = getState(); |
| |
| // As of today, finalState, diagnostics, final-tracking-url and |
| // finalAppStatus are the only things that we store into the StateStore |
| // AFTER the initial saving on app-attempt-start |
| // These fields can be visible from outside only after they are saved in |
| // StateStore |
| BoundedAppender diags = new BoundedAppender(diagnostics.getLimit()); |
| |
| // don't leave the tracking URL pointing to a non-existent AM |
| if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, |
| YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { |
| setTrackingUrlToAHSPage(stateToBeStored); |
| } else { |
| setTrackingUrlToRMAppPage(stateToBeStored); |
| } |
| String finalTrackingUrl = getOriginalTrackingUrl(); |
| FinalApplicationStatus finalStatus = null; |
| int exitStatus = ContainerExitStatus.INVALID; |
| switch (event.getType()) { |
| case LAUNCH_FAILED: |
| diags.append(event.getDiagnosticMsg()); |
| break; |
| case REGISTERED: |
| diags.append(getUnexpectedAMRegisteredDiagnostics()); |
| break; |
| case UNREGISTERED: |
| RMAppAttemptUnregistrationEvent unregisterEvent = |
| (RMAppAttemptUnregistrationEvent) event; |
| diags.append(unregisterEvent.getDiagnosticMsg()); |
| // reset finalTrackingUrl to url sent by am |
| finalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); |
| finalStatus = unregisterEvent.getFinalApplicationStatus(); |
| break; |
| case CONTAINER_FINISHED: |
| RMAppAttemptContainerFinishedEvent finishEvent = |
| (RMAppAttemptContainerFinishedEvent) event; |
| diags.append(getAMContainerCrashedDiagnostics(finishEvent)); |
| exitStatus = finishEvent.getContainerStatus().getExitStatus(); |
| break; |
| case KILL: |
| break; |
| case FAIL: |
| diags.append(event.getDiagnosticMsg()); |
| break; |
| case EXPIRE: |
| diags.append(getAMExpiredDiagnostics(event)); |
| break; |
| default: |
| break; |
| } |
| AggregateAppResourceUsage resUsage = |
| this.attemptMetrics.getAggregateAppResourceUsage(); |
| RMStateStore rmStore = rmContext.getStateStore(); |
| setFinishTime(System.currentTimeMillis()); |
| |
| ApplicationAttemptStateData attemptState = ApplicationAttemptStateData |
| .newInstance(applicationAttemptId, getMasterContainer(), |
| rmStore.getCredentialsFromAppAttempt(this), startTime, |
| stateToBeStored, finalTrackingUrl, diags.toString(), finalStatus, exitStatus, |
| getFinishTime(), resUsage.getResourceUsageSecondsMap(), |
| this.attemptMetrics.getPreemptedResourceSecondsMap(), |
| this.attemptMetrics.getTotalAllocatedContainers()); |
| LOG.info("Updating application attempt " + applicationAttemptId |
| + " with final state: " + targetedFinalState + ", and exit status: " |
| + exitStatus); |
| rmStore.updateApplicationAttemptState(attemptState); |
| } |
| |
| private static class FinalSavingTransition extends BaseTransition { |
| |
| Object transitionToDo; |
| RMAppAttemptState targetedFinalState; |
| |
| public FinalSavingTransition(Object transitionToDo, |
| RMAppAttemptState targetedFinalState) { |
| this.transitionToDo = transitionToDo; |
| this.targetedFinalState = targetedFinalState; |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| // For cases Killed/Failed, targetedFinalState is the same as the state to |
| // be stored |
| appAttempt.rememberTargetTransitionsAndStoreState(event, transitionToDo, |
| targetedFinalState, targetedFinalState); |
| } |
| } |
| |
| private static class FinalStateSavedTransition implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; |
| |
| if (appAttempt.transitionTodo instanceof SingleArcTransition) { |
| ((SingleArcTransition) appAttempt.transitionTodo).transition( |
| appAttempt, causeEvent); |
| } else if (appAttempt.transitionTodo instanceof MultipleArcTransition) { |
| ((MultipleArcTransition) appAttempt.transitionTodo).transition( |
| appAttempt, causeEvent); |
| } |
| return appAttempt.targetedFinalState; |
| } |
| } |
| |
| private static class BaseFinalTransition extends BaseTransition { |
| |
| private final RMAppAttemptState finalAttemptState; |
| |
| public BaseFinalTransition(RMAppAttemptState finalAttemptState) { |
| this.finalAttemptState = finalAttemptState; |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId(); |
| |
| // Tell the AMS. Unregister from the ApplicationMasterService |
| appAttempt.masterService.unregisterAttempt(appAttemptId); |
| |
| // Tell the application and the scheduler |
| ApplicationId applicationId = appAttemptId.getApplicationId(); |
| RMAppEvent appEvent = null; |
| boolean keepContainersAcrossAppAttempts = false; |
| switch (finalAttemptState) { |
| case FINISHED: |
| { |
| appEvent = |
| new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHED, |
| appAttempt.getDiagnostics()); |
| } |
| break; |
| case KILLED: |
| { |
| appAttempt.invalidateAMHostAndPort(); |
| // Forward diagnostics received in attempt kill event. |
| appEvent = |
| new RMAppFailedAttemptEvent(applicationId, |
| RMAppEventType.ATTEMPT_KILLED, |
| event.getDiagnosticMsg(), false); |
| } |
| break; |
| case FAILED: |
| { |
| appAttempt.invalidateAMHostAndPort(); |
| |
| if (appAttempt.submissionContext |
| .getKeepContainersAcrossApplicationAttempts() |
| && !appAttempt.submissionContext.getUnmanagedAM()) { |
| int numberOfFailure = ((RMAppImpl)appAttempt.rmApp) |
| .getNumFailedAppAttempts(); |
| if (appAttempt.rmApp.getMaxAppAttempts() > 1 |
| && numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) { |
| keepContainersAcrossAppAttempts = true; |
| } |
| } |
| appEvent = |
| new RMAppFailedAttemptEvent(applicationId, |
| RMAppEventType.ATTEMPT_FAILED, appAttempt.getDiagnostics(), |
| keepContainersAcrossAppAttempts); |
| |
| } |
| break; |
| default: |
| { |
| LOG.error("Cannot get this state!! Error!!"); |
| } |
| break; |
| } |
| |
| appAttempt.eventHandler.handle(appEvent); |
| appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( |
| appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts)); |
| appAttempt.removeCredentials(appAttempt); |
| |
| appAttempt.rmContext.getRMApplicationHistoryWriter() |
| .applicationAttemptFinished(appAttempt, finalAttemptState); |
| appAttempt.rmContext.getSystemMetricsPublisher() |
| .appAttemptFinished(appAttempt, finalAttemptState, |
| appAttempt.rmApp, System.currentTimeMillis()); |
| } |
| } |
| |
| private static class AttemptFailedTransition extends BaseFinalTransition { |
| |
| public AttemptFailedTransition() { |
| super(RMAppAttemptState.FAILED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| if (event.getDiagnosticMsg() != null) { |
| appAttempt.diagnostics.append(event.getDiagnosticMsg()); |
| } |
| super.transition(appAttempt, event); |
| } |
| } |
| |
| private static class AMLaunchedTransition extends BaseTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| if (event.getType() == RMAppAttemptEventType.LAUNCHED |
| || event.getType() == RMAppAttemptEventType.REGISTERED) { |
| appAttempt.launchAMEndTime = System.currentTimeMillis(); |
| long delay = appAttempt.launchAMEndTime - |
| appAttempt.launchAMStartTime; |
| ClusterMetrics.getMetrics().addAMLaunchDelay(delay); |
| } |
| appAttempt.eventHandler.handle( |
| new RMAppEvent(appAttempt.getAppAttemptId().getApplicationId(), |
| RMAppEventType.ATTEMPT_LAUNCHED, event.getTimestamp())); |
| appAttempt |
| .updateAMLaunchDiagnostics(AMState.LAUNCHED.getDiagnosticMessage()); |
| // Register with AMLivelinessMonitor |
| appAttempt.attemptLaunched(); |
| |
| } |
| } |
| |
| @Override |
| public boolean shouldCountTowardsMaxAttemptRetry() { |
| long attemptFailuresValidityInterval = this.submissionContext |
| .getAttemptFailuresValidityInterval(); |
| long end = System.currentTimeMillis(); |
| if (attemptFailuresValidityInterval > 0 |
| && this.getFinishTime() > 0 |
| && this.getFinishTime() < (end - attemptFailuresValidityInterval)) { |
| return false; |
| } |
| this.readLock.lock(); |
| try { |
| int exitStatus = getAMContainerExitStatus(); |
| return !(exitStatus == ContainerExitStatus.PREEMPTED |
| || exitStatus == ContainerExitStatus.ABORTED |
| || exitStatus == ContainerExitStatus.DISKS_FAILED |
| || exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| private static final class UnmanagedAMAttemptSavedTransition |
| extends AMLaunchedTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| // create AMRMToken |
| appAttempt.amrmToken = |
| appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( |
| appAttempt.applicationAttemptId); |
| appAttempt.registerClientToken(); |
| super.transition(appAttempt, event); |
| } |
| } |
| |
| private void registerClientToken() { |
| // register the ClientTokenMasterKey after it is saved in the store, |
| // otherwise client may hold an invalid ClientToken after RM restarts. |
| if (UserGroupInformation.isSecurityEnabled()) { |
| rmContext.getClientToAMTokenSecretManager() |
| .registerApplication(getAppAttemptId(), getClientTokenMasterKey()); |
| } |
| } |
| |
| private static final class LaunchFailedTransition extends BaseFinalTransition { |
| |
| public LaunchFailedTransition() { |
| super(RMAppAttemptState.FAILED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| // Use diagnostic from launcher |
| appAttempt.diagnostics.append(event.getDiagnosticMsg()); |
| |
| // Tell the app, scheduler |
| super.transition(appAttempt, event); |
| |
| } |
| } |
| |
| private static final class KillAllocatedAMTransition extends |
| BaseFinalTransition { |
| public KillAllocatedAMTransition() { |
| super(RMAppAttemptState.KILLED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| // Tell the application and scheduler |
| super.transition(appAttempt, event); |
| |
| // Tell the launcher to cleanup. |
| appAttempt.eventHandler.handle(new AMLauncherEvent( |
| AMLauncherEventType.CLEANUP, appAttempt)); |
| |
| } |
| } |
| |
| private static final class AMRegisteredTransition extends BaseTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| if (!RMAppAttemptState.LAUNCHED.equals(appAttempt.getState())) { |
| // registered received before launch |
| LAUNCHED_TRANSITION.transition(appAttempt, event); |
| } |
| long delay = System.currentTimeMillis() - appAttempt.launchAMEndTime; |
| ClusterMetrics.getMetrics().addAMRegisterDelay(delay); |
| RMAppAttemptRegistrationEvent registrationEvent |
| = (RMAppAttemptRegistrationEvent) event; |
| appAttempt.host = StringInterner.weakIntern(registrationEvent.getHost()); |
| appAttempt.rpcPort = registrationEvent.getRpcport(); |
| appAttempt.originalTrackingUrl = |
| sanitizeTrackingUrl(registrationEvent.getTrackingurl()); |
| |
| // reset AMLaunchDiagnostics once AM Registers with RM |
| appAttempt.updateAMLaunchDiagnostics(null); |
| |
| // Let the app know |
| appAttempt.eventHandler.handle(new RMAppEvent(appAttempt |
| .getAppAttemptId().getApplicationId(), |
| RMAppEventType.ATTEMPT_REGISTERED)); |
| |
| // TODO:FIXME: Note for future. Unfortunately we only do a state-store |
| // write at AM launch time, so we don't save the AM's tracking URL anywhere |
| // as that would mean an extra state-store write. For now, we hope that in |
| // work-preserving restart, AMs are forced to reregister. |
| |
| appAttempt.rmContext.getRMApplicationHistoryWriter() |
| .applicationAttemptStarted(appAttempt); |
| appAttempt.rmContext.getSystemMetricsPublisher() |
| .appAttemptRegistered(appAttempt, System.currentTimeMillis()); |
| } |
| } |
| |
| private static final class AMContainerCrashedBeforeRunningTransition extends |
| BaseFinalTransition { |
| |
| public AMContainerCrashedBeforeRunningTransition() { |
| super(RMAppAttemptState.FAILED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| RMAppAttemptContainerFinishedEvent finishEvent = |
| ((RMAppAttemptContainerFinishedEvent)event); |
| |
| // UnRegister from AMLivelinessMonitor |
| appAttempt.rmContext.getAMLivelinessMonitor().unregister( |
| appAttempt.getAppAttemptId()); |
| |
| // Setup diagnostic message and exit status |
| appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); |
| |
| // Tell the app, scheduler |
| super.transition(appAttempt, finishEvent); |
| } |
| } |
| |
| private void setAMContainerCrashedDiagnosticsAndExitStatus( |
| RMAppAttemptContainerFinishedEvent finishEvent) { |
| ContainerStatus status = finishEvent.getContainerStatus(); |
| this.diagnostics.append(getAMContainerCrashedDiagnostics(finishEvent)); |
| this.amContainerExitStatus = status.getExitStatus(); |
| } |
| |
| private String getAMContainerCrashedDiagnostics( |
| RMAppAttemptContainerFinishedEvent finishEvent) { |
| ContainerStatus status = finishEvent.getContainerStatus(); |
| StringBuilder diagnosticsBuilder = new StringBuilder(); |
| diagnosticsBuilder.append("AM Container for ").append( |
| finishEvent.getApplicationAttemptId()).append( |
| " exited with ").append(" exitCode: ").append(status.getExitStatus()). |
| append("\n"); |
| diagnosticsBuilder.append("Failing this attempt.").append("Diagnostics: ") |
| .append(status.getDiagnostics()); |
| if (this.getTrackingUrl() != null) { |
| diagnosticsBuilder.append("For more detailed output,").append( |
| " check the application tracking page: ").append( |
| this.getTrackingUrl()).append( |
| " Then click on links to logs of each attempt.\n"); |
| } |
| return diagnosticsBuilder.toString(); |
| } |
| |
| private static class FinalTransition extends BaseFinalTransition { |
| |
| public FinalTransition(RMAppAttemptState finalAttemptState) { |
| super(finalAttemptState); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| appAttempt.progress = 1.0f; |
| |
| // Tell the app and the scheduler |
| super.transition(appAttempt, event); |
| |
| // UnRegister from AMLivelinessMonitor. Perhaps for |
| // FAILING/KILLED/UnManaged AMs |
| appAttempt.rmContext.getAMLivelinessMonitor().unregister( |
| appAttempt.getAppAttemptId()); |
| appAttempt.rmContext.getAMFinishingMonitor().unregister( |
| appAttempt.getAppAttemptId()); |
| |
| if(!appAttempt.submissionContext.getUnmanagedAM()) { |
| // Tell the launcher to cleanup. |
| appAttempt.eventHandler.handle(new AMLauncherEvent( |
| AMLauncherEventType.CLEANUP, appAttempt)); |
| } |
| } |
| } |
| |
| private static class ExpiredTransition extends FinalTransition { |
| |
| public ExpiredTransition() { |
| super(RMAppAttemptState.FAILED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| appAttempt.diagnostics.append(getAMExpiredDiagnostics(event)); |
| super.transition(appAttempt, event); |
| } |
| } |
| |
| private static String getAMExpiredDiagnostics(RMAppAttemptEvent event) { |
| String diag = |
| "ApplicationMaster for attempt " + event.getApplicationAttemptId() |
| + " timed out"; |
| return diag; |
| } |
| |
| private static class UnexpectedAMRegisteredTransition extends |
| BaseFinalTransition { |
| |
| public UnexpectedAMRegisteredTransition() { |
| super(RMAppAttemptState.FAILED); |
| } |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| assert appAttempt.submissionContext.getUnmanagedAM(); |
| appAttempt.diagnostics.append(getUnexpectedAMRegisteredDiagnostics()); |
| super.transition(appAttempt, event); |
| } |
| |
| } |
| |
| private static String getUnexpectedAMRegisteredDiagnostics() { |
| return "Unmanaged AM must register after AM attempt reaches LAUNCHED state."; |
| } |
| |
| private static final class StatusUpdateTransition extends |
| BaseTransition { |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| RMAppAttemptStatusupdateEvent statusUpdateEvent |
| = (RMAppAttemptStatusupdateEvent) event; |
| |
| // Update progress |
| appAttempt.progress = statusUpdateEvent.getProgress(); |
| |
| // Update tracking url if changed and save it to state store |
| String newTrackingUrl = statusUpdateEvent.getTrackingUrl(); |
| if (newTrackingUrl != null && |
| !newTrackingUrl.equals(appAttempt.originalTrackingUrl)) { |
| appAttempt.originalTrackingUrl = newTrackingUrl; |
| ApplicationAttemptStateData attemptState = ApplicationAttemptStateData |
| .newInstance(appAttempt.applicationAttemptId, |
| appAttempt.getMasterContainer(), |
| appAttempt.rmContext.getStateStore() |
| .getCredentialsFromAppAttempt(appAttempt), |
| appAttempt.startTime, appAttempt.recoveredFinalState, |
| newTrackingUrl, appAttempt.getDiagnostics(), null, |
| ContainerExitStatus.INVALID, appAttempt.getFinishTime(), |
| appAttempt.attemptMetrics.getAggregateAppResourceUsage() |
| .getResourceUsageSecondsMap(), |
| appAttempt.attemptMetrics.getPreemptedResourceSecondsMap(), |
| appAttempt.attemptMetrics.getTotalAllocatedContainers()); |
| appAttempt.rmContext.getStateStore() |
| .updateApplicationAttemptState(attemptState); |
| } |
| |
| // Ping to AMLivelinessMonitor |
| appAttempt.rmContext.getAMLivelinessMonitor().receivedPing( |
| statusUpdateEvent.getApplicationAttemptId()); |
| } |
| } |
| |
| private static final class AMUnregisteredTransition extends BaseTransition { |
| |
| @Override |
| public void transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| // Tell the app |
| if (appAttempt.getSubmissionContext().getUnmanagedAM()) { |
| // YARN-1815: Saving the attempt final state so that we do not recover |
| // the finished Unmanaged AM post RM failover |
| // Unmanaged AMs have no container to wait for, so they skip |
| // the FINISHING state and go straight to FINISHED. |
| appAttempt.rememberTargetTransitionsAndStoreState(event, |
| new AMFinishedAfterFinalSavingTransition(event), |
| RMAppAttemptState.FINISHED, RMAppAttemptState.FINISHED); |
| } else { |
| // Saving the attempt final state |
| appAttempt.rememberTargetTransitionsAndStoreState(event, |
| new FinalStateSavedAfterAMUnregisterTransition(), |
| RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED); |
| } |
| ApplicationId applicationId = |
| appAttempt.getAppAttemptId().getApplicationId(); |
| |
| // Tell the app immediately that AM is unregistering so that app itself |
| // can save its state as soon as possible. Whether we do it like this, or |
| // we wait till AppAttempt is saved, it doesn't make any difference on the |
| // app side w.r.t failure conditions. The only event going out of |
| // AppAttempt to App after this point of time is AM/AppAttempt Finished. |
| appAttempt.eventHandler.handle(new RMAppEvent(applicationId, |
| RMAppEventType.ATTEMPT_UNREGISTERED)); |
| return; |
| } |
| } |
| |
| private static class FinalStateSavedAfterAMUnregisterTransition extends |
| BaseTransition { |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| // Unregister from the AMlivenessMonitor and register with AMFinishingMonitor |
| appAttempt.rmContext.getAMLivelinessMonitor().unregister( |
| appAttempt.applicationAttemptId); |
| appAttempt.rmContext.getAMFinishingMonitor().register( |
| appAttempt.applicationAttemptId); |
| |
| // Do not make any more changes to this transition code. Make all changes |
| // to the following method. Unless you are absolutely sure that you have |
| // stuff to do that shouldn't be used by the callers of the following |
| // method. |
| appAttempt.updateInfoOnAMUnregister(event); |
| } |
| } |
| |
| private void updateInfoOnAMUnregister(RMAppAttemptEvent event) { |
| progress = 1.0f; |
| RMAppAttemptUnregistrationEvent unregisterEvent = |
| (RMAppAttemptUnregistrationEvent) event; |
| this.diagnostics.append(unregisterEvent.getDiagnosticMsg()); |
| originalTrackingUrl = sanitizeTrackingUrl(unregisterEvent.getFinalTrackingUrl()); |
| finalStatus = unregisterEvent.getFinalApplicationStatus(); |
| } |
| |
| private static final class ContainerFinishedTransition |
| implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| |
| // The transition To Do after attempt final state is saved. |
| private BaseTransition transitionToDo; |
| private RMAppAttemptState currentState; |
| |
| public ContainerFinishedTransition(BaseTransition transitionToDo, |
| RMAppAttemptState currentState) { |
| this.transitionToDo = transitionToDo; |
| this.currentState = currentState; |
| } |
| |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent = |
| (RMAppAttemptContainerFinishedEvent) event; |
| ContainerStatus containerStatus = |
| containerFinishedEvent.getContainerStatus(); |
| |
| // Is this container the AmContainer? If the finished container is same as |
| // the AMContainer, AppAttempt fails |
| if (appAttempt.masterContainer != null |
| && appAttempt.masterContainer.getId().equals( |
| containerStatus.getContainerId())) { |
| appAttempt.amContainerFinished(appAttempt, containerFinishedEvent); |
| |
| // Remember the follow up transition and save the final attempt state. |
| appAttempt.rememberTargetTransitionsAndStoreState(event, |
| transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); |
| return RMAppAttemptState.FINAL_SAVING; |
| } |
| |
| // Add all finished containers so that they can be acked to NM |
| addJustFinishedContainer(appAttempt, containerFinishedEvent); |
| return this.currentState; |
| } |
| } |
| |
| // Ack NM to remove finished AM container, not waiting for |
| // new appattempt to pull am container complete msg, new appattempt |
| // may launch fail and leaves too many completed container in NM |
| private void sendFinishedAMContainerToNM(NodeId nodeId, |
| ContainerId containerId) { |
| List<ContainerId> containerIdList = new ArrayList<ContainerId>(); |
| containerIdList.add(containerId); |
| eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent( |
| nodeId, containerIdList)); |
| } |
| |
| // Ack NM to remove finished containers from context. |
| private void sendFinishedContainersToNM( |
| Map<NodeId, List<ContainerStatus>> finishedContainers) { |
| for (NodeId nodeId : finishedContainers.keySet()) { |
| |
| // Clear and get current values |
| List<ContainerStatus> currentSentContainers = |
| finishedContainers.put(nodeId, new ArrayList<>()); |
| List<ContainerId> containerIdList = |
| new ArrayList<>(currentSentContainers.size()); |
| for (ContainerStatus containerStatus : currentSentContainers) { |
| containerIdList.add(containerStatus.getContainerId()); |
| } |
| eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, |
| containerIdList)); |
| } |
| finishedContainers.clear(); |
| } |
| |
| // Add am container to the list so that am container instance will be |
| // removed from NMContext. |
| private static void amContainerFinished(RMAppAttemptImpl appAttempt, |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent) { |
| |
| NodeId nodeId = containerFinishedEvent.getNodeId(); |
| |
| ContainerStatus containerStatus = |
| containerFinishedEvent.getContainerStatus(); |
| if (containerStatus != null) { |
| int exitStatus = containerStatus.getExitStatus(); |
| if (Apps.shouldCountTowardsNodeBlacklisting(exitStatus)) { |
| appAttempt.addAMNodeToBlackList(nodeId); |
| } |
| } else { |
| LOG.warn("No ContainerStatus in containerFinishedEvent"); |
| } |
| |
| if (!appAttempt.getSubmissionContext() |
| .getKeepContainersAcrossApplicationAttempts()) { |
| appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, |
| new ArrayList<>()); |
| appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); |
| appAttempt.sendFinishedContainersToNM( |
| appAttempt.finishedContainersSentToAM); |
| // there might be some completed containers that have not been pulled |
| // by the AM heartbeat, explicitly add them for cleanup. |
| appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); |
| |
| // mark the fact that AM container has finished so that future finished |
| // containers will be cleaned up without the engagement of AM containers |
| // (through heartbeat) |
| appAttempt.nonWorkPreservingAMContainerFinished = true; |
| } else { |
| appAttempt.sendFinishedAMContainerToNM(nodeId, |
| containerStatus.getContainerId()); |
| } |
| } |
| |
| private void addAMNodeToBlackList(NodeId nodeId) { |
| SchedulerNode schedulerNode = scheduler.getSchedulerNode(nodeId); |
| if (schedulerNode != null) { |
| blacklistedNodesForAM.addNode(schedulerNode.getNodeName()); |
| } else { |
| LOG.info(nodeId + " is not added to AM blacklist for " |
| + applicationAttemptId + ", because it has been removed"); |
| } |
| } |
| |
| @Override |
| public BlacklistManager getAMBlacklistManager() { |
| return blacklistedNodesForAM; |
| } |
| |
| private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt, |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent) { |
| appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent |
| .getNodeId(), new ArrayList<>()); |
| appAttempt.justFinishedContainers.get(containerFinishedEvent |
| .getNodeId()).add(containerFinishedEvent.getContainerStatus()); |
| |
| if (appAttempt.nonWorkPreservingAMContainerFinished) { |
| // AM container has finished, so no more AM heartbeats to do the cleanup. |
| appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers); |
| } |
| } |
| |
| private static final class ContainerFinishedAtFinalStateTransition |
| extends BaseTransition { |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent = |
| (RMAppAttemptContainerFinishedEvent) event; |
| |
| // Normal container. Add it in completed containers list |
| addJustFinishedContainer(appAttempt, containerFinishedEvent); |
| } |
| } |
| |
| private static class AMContainerCrashedAtRunningTransition extends |
| BaseTransition { |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| RMAppAttemptContainerFinishedEvent finishEvent = |
| (RMAppAttemptContainerFinishedEvent) event; |
| // container associated with AM. must not be unmanaged |
| assert appAttempt.submissionContext.getUnmanagedAM() == false; |
| // Setup diagnostic message and exit status |
| appAttempt.setAMContainerCrashedDiagnosticsAndExitStatus(finishEvent); |
| new FinalTransition(RMAppAttemptState.FAILED).transition(appAttempt, |
| event); |
| } |
| } |
| |
| private static final class AMFinishingContainerFinishedTransition |
| implements |
| MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { |
| |
| @Override |
| public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, |
| RMAppAttemptEvent event) { |
| |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent |
| = (RMAppAttemptContainerFinishedEvent) event; |
| ContainerStatus containerStatus = |
| containerFinishedEvent.getContainerStatus(); |
| |
| // Is this container the ApplicationMaster container? |
| if (appAttempt.masterContainer.getId().equals( |
| containerStatus.getContainerId())) { |
| new FinalTransition(RMAppAttemptState.FINISHED).transition( |
| appAttempt, containerFinishedEvent); |
| appAttempt.amContainerFinished(appAttempt, containerFinishedEvent); |
| return RMAppAttemptState.FINISHED; |
| } |
| // Add all finished containers so that they can be acked to NM. |
| addJustFinishedContainer(appAttempt, containerFinishedEvent); |
| |
| return RMAppAttemptState.FINISHING; |
| } |
| } |
| |
| private static class ContainerFinishedAtFinalSavingTransition extends |
| BaseTransition { |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| RMAppAttemptContainerFinishedEvent containerFinishedEvent = |
| (RMAppAttemptContainerFinishedEvent) event; |
| ContainerStatus containerStatus = |
| containerFinishedEvent.getContainerStatus(); |
| |
| // If this is the AM container, it means the AM container is finished, |
| // but we are not yet acknowledged that the final state has been saved. |
| // Thus, we still return FINAL_SAVING state here. |
| if (appAttempt.masterContainer.getId().equals( |
| containerStatus.getContainerId())) { |
| |
| appAttempt.amContainerFinished(appAttempt, containerFinishedEvent); |
| |
| if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED) |
| || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) { |
| // ignore Container_Finished Event if we were supposed to reach |
| // FAILED/KILLED state. |
| return; |
| } |
| |
| // pass in the earlier AMUnregistered Event also, as this is needed for |
| // AMFinishedAfterFinalSavingTransition later on |
| appAttempt.rememberTargetTransitions(event, |
| new AMFinishedAfterFinalSavingTransition( |
| appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); |
| return; |
| } |
| |
| // Add all finished containers so that they can be acked to NM. |
| addJustFinishedContainer(appAttempt, containerFinishedEvent); |
| } |
| } |
| |
| private static class AMFinishedAfterFinalSavingTransition extends |
| BaseTransition { |
| RMAppAttemptEvent amUnregisteredEvent; |
| public AMFinishedAfterFinalSavingTransition( |
| RMAppAttemptEvent amUnregisteredEvent) { |
| this.amUnregisteredEvent = amUnregisteredEvent; |
| } |
| |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent); |
| new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt, |
| event); |
| } |
| } |
| |
| private static class AMExpiredAtFinalSavingTransition extends |
| BaseTransition { |
| @Override |
| public void |
| transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { |
| if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED) |
| || appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) { |
| // ignore Container_Finished Event if we were supposed to reach |
| // FAILED/KILLED state. |
| return; |
| } |
| |
| // pass in the earlier AMUnregistered Event also, as this is needed for |
| // AMFinishedAfterFinalSavingTransition later on |
| appAttempt.rememberTargetTransitions(event, |
| new AMFinishedAfterFinalSavingTransition( |
| appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); |
| } |
| } |
| |
| @Override |
| public long getStartTime() { |
| this.readLock.lock(); |
| try { |
| return this.startTime; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public RMAppAttemptState getState() { |
| this.readLock.lock(); |
| |
| try { |
| return this.stateMachine.getCurrentState(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public YarnApplicationAttemptState createApplicationAttemptState() { |
| RMAppAttemptState state = getState(); |
| // If AppAttempt is in FINAL_SAVING state, return its previous state. |
| if (state.equals(RMAppAttemptState.FINAL_SAVING)) { |
| state = stateBeforeFinalSaving; |
| } |
| return RMServerUtils.createApplicationAttemptState(state); |
| } |
| |
| private void launchAttempt(){ |
| launchAMStartTime = System.currentTimeMillis(); |
| // Send event to launch the AM Container |
| eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); |
| } |
| |
| private void attemptLaunched() { |
| // Register with AMLivelinessMonitor |
| rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); |
| } |
| |
| private void storeAttempt() { |
| // store attempt data in a non-blocking manner to prevent dispatcher |
| // thread starvation and wait for state to be saved |
| LOG.info("Storing attempt: AppId: " + |
| getAppAttemptId().getApplicationId() |
| + " AttemptId: " + |
| getAppAttemptId() |
| + " MasterContainer: " + masterContainer); |
| rmContext.getStateStore().storeNewApplicationAttempt(this); |
| } |
| |
| private void removeCredentials(RMAppAttemptImpl appAttempt) { |
| // Unregister from the ClientToAMTokenSecretManager |
| if (UserGroupInformation.isSecurityEnabled()) { |
| appAttempt.rmContext.getClientToAMTokenSecretManager() |
| .unRegisterApplication(appAttempt.getAppAttemptId()); |
| } |
| |
| // Remove the AppAttempt from the AMRMTokenSecretManager |
| appAttempt.rmContext.getAMRMTokenSecretManager() |
| .applicationMasterFinished(appAttempt.getAppAttemptId()); |
| } |
| |
| private static String sanitizeTrackingUrl(String url) { |
| return (url == null || url.trim().isEmpty()) ? "N/A" : url; |
| } |
| |
| @Override |
| public ApplicationAttemptReport createApplicationAttemptReport() { |
| this.readLock.lock(); |
| ApplicationAttemptReport attemptReport = null; |
| try { |
| // AM container maybe not yet allocated. and also unmangedAM doesn't have |
| // am container. |
| ContainerId amId = |
| masterContainer == null ? null : masterContainer.getId(); |
| attemptReport = ApplicationAttemptReport.newInstance( |
| this.getAppAttemptId(), this.getHost(), this.getRpcPort(), |
| this.getTrackingUrl(), this.getOriginalTrackingUrl(), |
| this.getDiagnostics(), createApplicationAttemptState(), amId, |
| this.startTime, this.finishTime); |
| } finally { |
| this.readLock.unlock(); |
| } |
| return attemptReport; |
| } |
| |
| @Override |
| public RMAppAttemptMetrics getRMAppAttemptMetrics() { |
| // didn't use read/write lock here because RMAppAttemptMetrics has its own |
| // lock |
| return attemptMetrics; |
| } |
| |
| @Override |
| public long getFinishTime() { |
| this.readLock.lock(); |
| try { |
| return this.finishTime; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| private void setFinishTime(long finishTime) { |
| this.writeLock.lock(); |
| try { |
| this.finishTime = finishTime; |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void updateAMLaunchDiagnostics(String amLaunchDiagnostics) { |
| this.amLaunchDiagnostics = amLaunchDiagnostics; |
| } |
| |
| public RMAppAttemptState getRecoveredFinalState() { |
| return recoveredFinalState; |
| } |
| |
| public void setRecoveredFinalState(RMAppAttemptState finalState) { |
| this.recoveredFinalState = finalState; |
| } |
| |
| @Override |
| public Set<String> getBlacklistedNodes() { |
| if (scheduler instanceof AbstractYarnScheduler) { |
| AbstractYarnScheduler ayScheduler = |
| (AbstractYarnScheduler) scheduler; |
| SchedulerApplicationAttempt attempt = |
| ayScheduler.getApplicationAttempt(applicationAttemptId); |
| if (attempt != null) { |
| return attempt.getBlacklistedNodes(); |
| } |
| } |
| return Collections.emptySet(); |
| } |
| |
| protected void onInvalidTranstion(RMAppAttemptEventType rmAppAttemptEventType, |
| RMAppAttemptState state){ |
| /* TODO fail the application on the failed transition */ |
| } |
| } |