| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.rmapp; |
| |
| import java.net.InetAddress; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.ipc.CallerContext; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.StringInterner; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ApplicationTimeout; |
| import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; |
| import org.apache.hadoop.yarn.api.records.CollectorInfo; |
| import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
| import org.apache.hadoop.yarn.api.records.LogAggregationStatus; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.NodeLabel; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.NodeUpdateType; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.client.api.AppAdminClient; |
| import org.apache.hadoop.yarn.conf.HAUtil; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; |
| import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; |
| import org.apache.hadoop.yarn.server.api.records.AppCollectorData; |
| import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; |
| import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.placement |
| .ApplicationPlacementContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; |
| import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitionException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.SystemClock; |
| import org.apache.hadoop.yarn.util.Times; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| public class RMAppImpl implements RMApp, Recoverable { |
| |
| // Class-wide logger. |
| private static final Logger LOG = |
| LoggerFactory.getLogger(RMAppImpl.class); |
| // Placeholder text used in application reports for values that are not |
| // available (tracking URL, host, diagnostics). |
| private static final String UNAVAILABLE = "N/A"; |
| // Expiry text and remaining-time value used to seed the LIFETIME |
| // ApplicationTimeout in the application report. |
| private static final String UNLIMITED = "UNLIMITED"; |
| private static final long UNKNOWN = -1L; |
| // States this class groups together as "completed". Note that the set also |
| // contains the in-flight states FINISHING, FINAL_SAVING and KILLING. |
| private static final EnumSet<RMAppState> COMPLETED_APP_STATES = |
| EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, |
| RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); |
| // String.format-style template taking four %s arguments. |
| private static final String STATE_CHANGE_MESSAGE = |
| "%s State change from %s to %s on event = %s"; |
| // String.format-style template taking %s, %d and %s arguments. |
| private static final String RECOVERY_MESSAGE = |
| "Recovering app: %s with %d attempts and final state = %s"; |
| |
| // Fields assigned once, in the constructor or at declaration. Most are |
| // final; applicationSchedulingEnvs, amBlacklistingEnabled, |
| // blacklistDisableThreshold, systemClock and isNumAttemptsBeyondThreshold |
| // are declared in this group but are not final. |
| private final ApplicationId applicationId; |
| private final RMContext rmContext; |
| private final Configuration conf; |
| // Short user name taken from userUgi in the constructor. |
| private final String user; |
| private final UserGroupInformation userUgi; |
| private final String name; |
| private final ApplicationSubmissionContext submissionContext; |
| // Obtained from rmContext.getDispatcher() in the constructor. |
| private final Dispatcher dispatcher; |
| private final YarnScheduler scheduler; |
| private final ApplicationMasterService masterService; |
| // The reference is final; the builder's content is mutable. |
| private final StringBuilder diagnostics = new StringBuilder(); |
| // Resolved in the constructor from the submission context and the |
| // RM-level / global max-attempt settings. |
| private final int maxAppAttempts; |
| // Both locks come from one ReentrantReadWriteLock created in the |
| // constructor. |
| private final ReadLock readLock; |
| private final WriteLock writeLock; |
| // Insertion-ordered map of attempts; the accessors in this class read it |
| // under readLock. |
| private final Map<ApplicationAttemptId, RMAppAttempt> attempts |
| = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>(); |
| private final long submitTime; |
| // Pending node updates; drained under writeLock by pullRMNodeUpdates. |
| private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>(); |
| private final String applicationType; |
| // Stored as passed to the constructor (no defensive copy is made). |
| private final Set<String> applicationTags; |
| // Filled in the constructor: AM container environment first (when a |
| // container spec is present), then the application scheduling properties. |
| private Map<String, String> applicationSchedulingEnvs = new HashMap<>(); |
| |
| private final long attemptFailuresValidityInterval; |
| // Read from configuration in the constructor; the threshold is only read |
| // when blacklisting is enabled. |
| private boolean amBlacklistingEnabled = false; |
| private float blacklistDisableThreshold; |
| |
| // Set to SystemClock.getInstance() in the constructor. |
| private Clock systemClock; |
| |
| private boolean isNumAttemptsBeyondThreshold = false; |
| |
| |
| |
| // Mutable fields |
| // Taken from the constructor argument when positive, otherwise from the |
| // system clock at construction time. |
| private long startTime; |
| private long launchTime = 0; |
| private long finishTime = 0; |
| private long storedFinishTime = 0; |
| private int firstAttemptIdInStateStore = 1; |
| private int nextAttemptId = 1; |
| // collectorInfo is rebuilt from collectorData in setCollectorData; |
| // removeCollectorData clears only collectorData. |
| private AppCollectorData collectorData; |
| private CollectorInfo collectorInfo; |
| // This field is not protected by the read lock; it is volatile and read |
| // directly by several accessors. |
| private volatile RMAppAttempt currentAttempt; |
| private String queue; |
| // Obtained from dispatcher.getEventHandler() in the constructor. |
| private EventHandler handler; |
| // Single shared instance referenced by the state machine table below. |
| private static final AppFinishedTransition FINISHED_TRANSITION = |
| new AppFinishedTransition(); |
| private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>(); |
| |
| // Constructed with the same read/write lock pair as this application. |
| private final RMAppLogAggregation logAggregation; |
| private Map<ApplicationTimeoutType, Long> applicationTimeouts = |
| new HashMap<ApplicationTimeoutType, Long>(); |
| |
| // These stored states are only valid while the app is in KILLING or |
| // FINAL_SAVING. |
| private RMAppState stateBeforeKilling; |
| private RMAppState stateBeforeFinalSaving; |
| private RMAppEvent eventCausingFinalSaving; |
| private RMAppState targetedFinalState; |
| private RMAppState recoveredFinalState; |
| private List<ResourceRequest> amReqs; |
| |
| // Captured with CallerContext.getCurrent() in the constructor. |
| private CallerContext callerContext; |
| |
| private ApplicationPlacementContext placementContext; |
| |
| // NOTE(review): package-private and untyped; its use is not visible in this |
| // part of the file -- confirm the intent before relying on it. |
| Object transitionTodo; |
| |
| // Copied from the submission context's priority in the constructor, or |
| // priority 0 when the submission context carries none. |
| private Priority applicationPriority; |
| |
| /** |
| * Transition table shared by every RMAppImpl instance. The factory is built |
| * once with RMAppState.NEW as the initial state; each application obtains |
| * its own state machine from it via stateMachineFactory.make(this) in the |
| * constructor. |
| * |
| * Entries are grouped by source state. An entry whose target is an EnumSet |
| * of states lets the supplied transition object pick the resulting state. |
| * An entry that lists event types without a transition object registers a |
| * same-state transition with no hook (the groups labelled "ignorable"). |
| */ |
| private static final StateMachineFactory<RMAppImpl, |
| RMAppState, |
| RMAppEventType, |
| RMAppEvent> stateMachineFactory |
| = new StateMachineFactory<RMAppImpl, |
| RMAppState, |
| RMAppEventType, |
| RMAppEvent>(RMAppState.NEW) |
| |
| |
| // Transitions from NEW state |
| .addTransition(RMAppState.NEW, RMAppState.NEW, |
| RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) |
| .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, |
| RMAppEventType.START, new RMAppNewlySavingTransition()) |
| .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, |
| RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED, |
| RMAppState.KILLED, RMAppState.FINAL_SAVING), |
| RMAppEventType.RECOVER, new RMAppRecoveredTransition()) |
| .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, |
| new AppKilledTransition()) |
| .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, |
| RMAppEventType.APP_REJECTED, |
| new FinalSavingTransition(new AppRejectedTransition(), |
| RMAppState.FAILED)) |
| |
| // Transitions from NEW_SAVING state |
| .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, |
| RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) |
| .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, |
| RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) |
| .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, |
| RMAppEventType.KILL, |
| new FinalSavingTransition( |
| new AppKilledTransition(), RMAppState.KILLED)) |
| .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, |
| RMAppEventType.APP_REJECTED, |
| new FinalSavingTransition(new AppRejectedTransition(), |
| RMAppState.FAILED)) |
| .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED, |
| RMAppEventType.APP_SAVE_FAILED, new AppRejectedTransition()) |
| |
| // Transitions from SUBMITTED state |
| .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, |
| RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) |
| .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, |
| RMAppEventType.APP_REJECTED, |
| new FinalSavingTransition( |
| new AppRejectedTransition(), RMAppState.FAILED)) |
| .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, |
| RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition()) |
| .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, |
| RMAppEventType.KILL, |
| new FinalSavingTransition( |
| new AppKilledTransition(), RMAppState.KILLED)) |
| |
| // Transitions from ACCEPTED state |
| .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, |
| RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) |
| .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, |
| RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( |
| YarnApplicationState.RUNNING)) |
| .addTransition(RMAppState.ACCEPTED, |
| EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), |
| // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED |
| // event because RMAppRecoveredTransition is returning ACCEPTED state |
| // directly and waiting for the previous AM to exit. |
| RMAppEventType.ATTEMPT_FAILED, |
| new AttemptFailedTransition(RMAppState.ACCEPTED)) |
| .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_FINISHED, |
| new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) |
| .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, |
| RMAppEventType.KILL, new KillAttemptTransition()) |
| .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_KILLED, |
| new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED)) |
| .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| // Handle AppAttemptLaunch to update the launchTime and publish to ATS |
| .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, |
| RMAppEventType.ATTEMPT_LAUNCHED, |
| new AttemptLaunchedTransition()) |
| |
| // Transitions from RUNNING state |
| .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, |
| RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) |
| .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_UNREGISTERED, |
| new FinalSavingTransition( |
| new AttemptUnregisteredTransition(), |
| RMAppState.FINISHING, RMAppState.FINISHED)) |
| .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, |
| // UnManagedAM directly jumps to finished |
| RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) |
| .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| .addTransition(RMAppState.RUNNING, |
| EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), |
| RMAppEventType.ATTEMPT_FAILED, |
| new AttemptFailedTransition(RMAppState.ACCEPTED)) |
| .addTransition(RMAppState.RUNNING, RMAppState.KILLING, |
| RMAppEventType.KILL, new KillAttemptTransition()) |
| |
| // Transitions from FINAL_SAVING state |
| .addTransition(RMAppState.FINAL_SAVING, |
| EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED, |
| RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED, |
| new FinalStateSavedTransition()) |
| .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_FINISHED, |
| new AttemptFinishedAtFinalSavingTransition()) |
| .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| // ignorable transitions |
| .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, |
| EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, |
| RMAppEventType.APP_NEW_SAVED)) |
| |
| // Transitions from FINISHING state |
| .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, |
| RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) |
| .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| // ignorable transitions |
| .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, |
| EnumSet.of(RMAppEventType.NODE_UPDATE, |
| // ignore Kill/Move as we have already saved the final Finished state |
| // in state store. |
| RMAppEventType.KILL)) |
| |
| // Transitions from KILLING state |
| .addTransition(RMAppState.KILLING, RMAppState.KILLING, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_KILLED, |
| new FinalSavingTransition( |
| new AppKilledTransition(), RMAppState.KILLED)) |
| .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, |
| RMAppEventType.ATTEMPT_UNREGISTERED, |
| new FinalSavingTransition( |
| new AttemptUnregisteredTransition(), |
| RMAppState.FINISHING, RMAppState.FINISHED)) |
| .addTransition(RMAppState.KILLING, RMAppState.FINISHED, |
| // UnManagedAM directly jumps to finished |
| RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) |
| .addTransition(RMAppState.KILLING, |
| EnumSet.of(RMAppState.FINAL_SAVING), |
| RMAppEventType.ATTEMPT_FAILED, |
| new AttemptFailedTransition(RMAppState.KILLING)) |
| |
| // Same-state entries registered without a transition object. |
| .addTransition(RMAppState.KILLING, RMAppState.KILLING, |
| EnumSet.of( |
| RMAppEventType.NODE_UPDATE, |
| RMAppEventType.ATTEMPT_REGISTERED, |
| RMAppEventType.APP_UPDATE_SAVED, |
| RMAppEventType.KILL)) |
| |
| // Transitions from FINISHED state |
| // ignorable transitions |
| .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, |
| EnumSet.of( |
| RMAppEventType.NODE_UPDATE, |
| RMAppEventType.ATTEMPT_UNREGISTERED, |
| RMAppEventType.ATTEMPT_FINISHED, |
| RMAppEventType.KILL)) |
| |
| // Transitions from FAILED state |
| // ignorable transitions |
| .addTransition(RMAppState.FAILED, RMAppState.FAILED, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| .addTransition(RMAppState.FAILED, RMAppState.FAILED, |
| EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) |
| |
| // Transitions from KILLED state |
| // ignorable transitions |
| .addTransition(RMAppState.KILLED, RMAppState.KILLED, |
| RMAppEventType.APP_RUNNING_ON_NODE, |
| new AppRunningOnNodeTransition()) |
| .addTransition( |
| RMAppState.KILLED, |
| RMAppState.KILLED, |
| EnumSet.of(RMAppEventType.APP_ACCEPTED, |
| RMAppEventType.APP_REJECTED, RMAppEventType.KILL, |
| RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, |
| RMAppEventType.NODE_UPDATE, RMAppEventType.START)) |
| |
| .installTopology(); |
| |
| // Per-application state machine, created from stateMachineFactory in the |
| // constructor. |
| private final StateMachine<RMAppState, RMAppEventType, RMAppEvent> |
| stateMachine; |
| |
| // Attempt number used to build a placeholder attempt id for application |
| // reports when there is no current attempt id to report. |
| private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1; |
| // Inclusive bounds the constructor checks the configured AM blacklist |
| // disable threshold against. |
| private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f; |
| private static final float MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 1.0f; |
| |
| /** |
| * Convenience constructor that takes the user as a plain name. It |
| * delegates to the overload that also accepts a placement context and a |
| * start time, passing a null placement context and a start time of -1. |
| */ |
| public RMAppImpl(ApplicationId applicationId, RMContext rmContext, |
| Configuration config, String name, String user, String queue, |
| ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, |
| ApplicationMasterService masterService, long submitTime, |
| String applicationType, Set<String> applicationTags, |
| List<ResourceRequest> amReqs) { |
| this(applicationId, |
| rmContext, |
| config, |
| name, |
| user, |
| queue, |
| submissionContext, |
| scheduler, |
| masterService, |
| submitTime, |
| applicationType, |
| applicationTags, |
| amReqs, |
| null, |
| -1L); |
| } |
| |
| /** |
| * Constructor that takes the user as a plain name plus an explicit |
| * placement context and start time. A non-null name is turned into a |
| * remote-user UGI; a null name is forwarded as a null UGI. All other |
| * arguments are passed through unchanged to the UGI-based constructor. |
| */ |
| public RMAppImpl(ApplicationId applicationId, RMContext rmContext, |
| Configuration config, String name, String user, String queue, |
| ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, |
| ApplicationMasterService masterService, long submitTime, |
| String applicationType, Set<String> applicationTags, |
| List<ResourceRequest> amReqs, ApplicationPlacementContext |
| placementContext, long startTime) { |
| this(applicationId, rmContext, config, name, |
| (user == null) ? null : UserGroupInformation.createRemoteUser(user), |
| queue, submissionContext, scheduler, masterService, |
| submitTime, applicationType, applicationTags, |
| amReqs, placementContext, startTime); |
| } |
| |
| public RMAppImpl(ApplicationId applicationId, RMContext rmContext, |
| Configuration config, String name, UserGroupInformation userUgi, |
| String queue, ApplicationSubmissionContext submissionContext, |
| YarnScheduler scheduler, ApplicationMasterService masterService, |
| long submitTime, String applicationType, Set<String> applicationTags, |
| List<ResourceRequest> amReqs, ApplicationPlacementContext |
| placementContext, long startTime) { |
| this.systemClock = SystemClock.getInstance(); |
| |
| this.applicationId = applicationId; |
| this.name = StringInterner.weakIntern(name); |
| this.rmContext = rmContext; |
| this.dispatcher = rmContext.getDispatcher(); |
| this.handler = dispatcher.getEventHandler(); |
| this.conf = config; |
| this.user = StringInterner.weakIntern( |
| (userUgi != null) ? userUgi.getShortUserName() : null); |
| this.userUgi = userUgi; |
| this.queue = queue; |
| this.submissionContext = submissionContext; |
| this.scheduler = scheduler; |
| this.masterService = masterService; |
| this.submitTime = submitTime; |
| if (startTime <= 0) { |
| this.startTime = this.systemClock.getTime(); |
| } else { |
| this.startTime = startTime; |
| } |
| this.applicationType = StringInterner.weakIntern(applicationType); |
| this.applicationTags = applicationTags; |
| this.amReqs = amReqs; |
| if (submissionContext.getPriority() != null) { |
| this.applicationPriority = Priority |
| .newInstance(submissionContext.getPriority().getPriority()); |
| } else { |
| // If incoming app does not have priority configured in submission |
| // context, system could be assume that its a 0 priority app and could be |
| // considered as normal. |
| this.applicationPriority = Priority.newInstance(0); |
| } |
| |
| int globalMaxAppAttempts = conf.getInt( |
| YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, |
| conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); |
| int rmMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| int individualMaxAppAttempts = submissionContext.getMaxAppAttempts(); |
| if (individualMaxAppAttempts <= 0) { |
| this.maxAppAttempts = rmMaxAppAttempts; |
| LOG.warn("The specific max attempts: " + individualMaxAppAttempts |
| + " for application: " + applicationId.getId() |
| + " is invalid, because it is less than or equal to zero." |
| + " Use the rm max attempts instead."); |
| } else if (individualMaxAppAttempts > globalMaxAppAttempts) { |
| this.maxAppAttempts = globalMaxAppAttempts; |
| LOG.warn("The specific max attempts: " + individualMaxAppAttempts |
| + " for application: " + applicationId.getId() |
| + " is invalid, because it is out of the range [1, " |
| + globalMaxAppAttempts + "]. Use the global max attempts instead."); |
| } else { |
| this.maxAppAttempts = individualMaxAppAttempts; |
| } |
| |
| this.attemptFailuresValidityInterval = |
| submissionContext.getAttemptFailuresValidityInterval(); |
| if (this.attemptFailuresValidityInterval > 0) { |
| LOG.info("The attemptFailuresValidityInterval for the application: " |
| + this.applicationId + " is " + this.attemptFailuresValidityInterval |
| + "."); |
| } |
| |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| this.readLock = lock.readLock(); |
| this.writeLock = lock.writeLock(); |
| |
| this.stateMachine = stateMachineFactory.make(this); |
| |
| this.callerContext = CallerContext.getCurrent(); |
| |
| this.placementContext = placementContext; |
| |
| // If applications are not explicitly specifying envs, try to pull from |
| // AM container environment lists. |
| if(submissionContext.getAMContainerSpec() != null) { |
| applicationSchedulingEnvs |
| .putAll(submissionContext.getAMContainerSpec().getEnvironment()); |
| } |
| applicationSchedulingEnvs |
| .putAll(submissionContext.getApplicationSchedulingPropertiesMap()); |
| |
| this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock); |
| |
| // amBlacklistingEnabled can be configured globally |
| // Just use the global values |
| amBlacklistingEnabled = |
| conf.getBoolean( |
| YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, |
| YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED); |
| if (amBlacklistingEnabled) { |
| blacklistDisableThreshold = conf.getFloat( |
| YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, |
| YarnConfiguration. |
| DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD); |
| // Verify whether blacklistDisableThreshold is valid. And for invalid |
| // threshold, reset to global level blacklistDisableThreshold |
| // configured. |
| if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE || |
| blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) { |
| blacklistDisableThreshold = YarnConfiguration. |
| DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; |
| } |
| } |
| } |
| |
| /** |
| * Creates an application level timeline collector for this application and |
| * user, and registers it with the RM timeline collector manager through |
| * putIfAbsent. Intended for use only when timeline service v.2 is enabled. |
| */ |
| public void startTimelineCollector() { |
| final AppLevelTimelineCollector appCollector = |
| new AppLevelTimelineCollector(this.applicationId, this.user); |
| this.rmContext.getRMTimelineCollectorManager() |
| .putIfAbsent(this.applicationId, appCollector); |
| } |
| |
| /** |
| * Removes this application's entry from the RM timeline collector manager. |
| * Intended for use only when timeline service v.2 is enabled. |
| */ |
| public void stopTimelineCollector() { |
| this.rmContext.getRMTimelineCollectorManager() |
| .remove(this.applicationId); |
| } |
| |
| /** |
| * Returns the application id supplied at construction time. |
| */ |
| @Override |
| public ApplicationId getApplicationId() { |
| return applicationId; |
| } |
| |
| /** |
| * Returns the submission context supplied at construction time. |
| */ |
| @Override |
| public ApplicationSubmissionContext getApplicationSubmissionContext() { |
| return submissionContext; |
| } |
| |
| /** |
| * Returns the final status reported by the current attempt when there is |
| * one; otherwise derives a status from the state machine's current state. |
| * The fall-back covers reports requested before the application has |
| * unregistered, or when it never unregistered itself. |
| * |
| * @return the current attempt's final status if it is non-null, else the |
| * status mapped from the current application state |
| */ |
| @Override |
| public FinalApplicationStatus getFinalApplicationStatus() { |
| // currentAttempt is volatile and is not guarded by the read lock, so take |
| // a single snapshot of it (as getProgress does) and query the attempt's |
| // status once. Re-reading either value between the null check and the |
| // return could otherwise yield null or another attempt's status. |
| final RMAppAttempt attempt = this.currentAttempt; |
| if (attempt != null) { |
| final FinalApplicationStatus attemptStatus = |
| attempt.getFinalApplicationStatus(); |
| if (attemptStatus != null) { |
| return attemptStatus; |
| } |
| } |
| return createFinalApplicationStatus(this.stateMachine.getCurrentState()); |
| } |
| |
| /** |
| * Returns the state machine's current state, read while holding the read |
| * lock. |
| */ |
| @Override |
| public RMAppState getState() { |
| readLock.lock(); |
| try { |
| final RMAppState current = stateMachine.getCurrentState(); |
| return current; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the user name stored at construction time. |
| */ |
| @Override |
| public String getUser() { |
| return user; |
| } |
| |
| /** |
| * Returns the progress reported by the current attempt, or zero when no |
| * attempt has been set yet. The volatile attempt reference is read once. |
| */ |
| @Override |
| public float getProgress() { |
| final RMAppAttempt snapshot = currentAttempt; |
| return (snapshot == null) ? 0.0f : snapshot.getProgress(); |
| } |
| |
| /** |
| * Looks up an attempt by id in the attempts map while holding the read |
| * lock. |
| * |
| * @param appAttemptId id of the attempt to look up |
| * @return the mapped attempt, or null when the map has no entry for the id |
| */ |
| @Override |
| public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) { |
| readLock.lock(); |
| try { |
| final RMAppAttempt match = attempts.get(appAttemptId); |
| return match; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| /** |
| * Returns the queue currently recorded for this application. |
| */ |
| @Override |
| public String getQueue() { |
| return queue; |
| } |
| |
| /** |
| * Replaces the queue recorded for this application. The field is written |
| * directly, without taking the write lock. |
| * |
| * @param queue the new queue value |
| */ |
| @Override |
| public void setQueue(String queue) { |
| this.queue = queue; |
| } |
| |
| /** |
| * Returns the collector data last stored by setCollectorData, or null |
| * when none is set or it has been removed. |
| */ |
| @Override |
| public AppCollectorData getCollectorData() { |
| return collectorData; |
| } |
| |
| /** |
| * Stores the given collector data and rebuilds the collector info from its |
| * address and token. |
| * |
| * @param incomingData the new collector data; it is dereferenced, so it |
| * must not be null |
| */ |
| public void setCollectorData(AppCollectorData incomingData) { |
| collectorData = incomingData; |
| final String collectorAddress = incomingData.getCollectorAddr(); |
| collectorInfo = CollectorInfo.newInstance( |
| collectorAddress, incomingData.getCollectorToken()); |
| } |
| |
| /** |
| * Returns the collector info built by the most recent setCollectorData |
| * call, or null when that method has not been called. |
| */ |
| public CollectorInfo getCollectorInfo() { |
| return collectorInfo; |
| } |
| |
| /** |
| * Clears the stored collector data. Only that field is reset here; the |
| * collector info field is left as it is. |
| */ |
| public void removeCollectorData() { |
| collectorData = null; |
| } |
| |
| /** |
| * Returns the application name stored at construction time. |
| */ |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| /** |
| * Returns the current attempt. The field is volatile and is read without |
| * taking the read lock. |
| */ |
| @Override |
| public RMAppAttempt getCurrentAppAttempt() { |
| return currentAttempt; |
| } |
| |
| /** |
| * Returns an unmodifiable wrapper around the attempts map. The read lock |
| * is held only while the wrapper is created. |
| */ |
| @Override |
| public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() { |
| readLock.lock(); |
| try { |
| final Map<ApplicationAttemptId, RMAppAttempt> readOnlyView = |
| Collections.unmodifiableMap(attempts); |
| return readOnlyView; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { |
| switch(state) { |
| case NEW: |
| case NEW_SAVING: |
| case SUBMITTED: |
| case ACCEPTED: |
| case RUNNING: |
| case FINAL_SAVING: |
| case KILLING: |
| return FinalApplicationStatus.UNDEFINED; |
| // finished without a proper final state is the same as failed |
| case FINISHING: |
| case FINISHED: |
| case FAILED: |
| return FinalApplicationStatus.FAILED; |
| case KILLED: |
| return FinalApplicationStatus.KILLED; |
| } |
| throw new YarnRuntimeException("Unknown state passed!"); |
| } |
| |
| /** |
| * Copies every pending node update into the supplied map and then empties |
| * the internal buffer, all while holding the write lock. |
| * |
| * @param upNodes map that receives the pending updates |
| * @return the number of pending updates that were buffered before the copy |
| */ |
| @Override |
| public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) { |
| writeLock.lock(); |
| try { |
| final int pendingCount = updatedNodes.size(); |
| upNodes.putAll(updatedNodes); |
| updatedNodes.clear(); |
| return pendingCount; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ApplicationReport createAndGetApplicationReport(String clientUserName, |
| boolean allowAccess) { |
| this.readLock.lock(); |
| |
| try { |
| ApplicationAttemptId currentApplicationAttemptId = null; |
| org.apache.hadoop.yarn.api.records.Token clientToAMToken = null; |
| String trackingUrl = UNAVAILABLE; |
| String host = UNAVAILABLE; |
| String origTrackingUrl = UNAVAILABLE; |
| LogAggregationStatus logAggregationStatus = null; |
| int rpcPort = -1; |
| ApplicationResourceUsageReport appUsageReport = |
| RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; |
| FinalApplicationStatus finishState = getFinalApplicationStatus(); |
| String diags = UNAVAILABLE; |
| float progress = 0.0f; |
| org.apache.hadoop.yarn.api.records.Token amrmToken = null; |
| if (allowAccess) { |
| trackingUrl = rmContext.getAppProxyUrl(conf, applicationId); |
| if (this.currentAttempt != null) { |
| currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); |
| trackingUrl = this.currentAttempt.getTrackingUrl(); |
| origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| // get a token so the client can communicate with the app attempt |
| // NOTE: token may be unavailable if the attempt is not running |
| Token<ClientToAMTokenIdentifier> attemptClientToAMToken = |
| this.currentAttempt.createClientToken(clientUserName); |
| if (attemptClientToAMToken != null) { |
| clientToAMToken = BuilderUtils.newClientToAMToken( |
| attemptClientToAMToken.getIdentifier(), |
| attemptClientToAMToken.getKind().toString(), |
| attemptClientToAMToken.getPassword(), |
| attemptClientToAMToken.getService().toString()); |
| } |
| } |
| host = this.currentAttempt.getHost(); |
| rpcPort = this.currentAttempt.getRpcPort(); |
| appUsageReport = currentAttempt.getApplicationResourceUsageReport(); |
| progress = currentAttempt.getProgress(); |
| logAggregationStatus = this.getLogAggregationStatusForAppReport(); |
| } |
| //if the diagnostics is not already set get it from attempt |
| diags = getDiagnostics().toString(); |
| |
| if (currentAttempt != null && |
| currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { |
| if (getApplicationSubmissionContext().getUnmanagedAM() && |
| clientUserName != null && getUser().equals(clientUserName)) { |
| Token<AMRMTokenIdentifier> token = currentAttempt.getAMRMToken(); |
| if (token != null) { |
| amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(), |
| token.getKind().toString(), token.getPassword(), |
| token.getService().toString()); |
| } |
| } |
| } |
| |
| RMAppMetrics rmAppMetrics = getRMAppMetrics(); |
| appUsageReport |
| .setResourceSecondsMap(rmAppMetrics.getResourceSecondsMap()); |
| appUsageReport.setPreemptedResourceSecondsMap( |
| rmAppMetrics.getPreemptedResourceSecondsMap()); |
| } |
| |
| if (currentApplicationAttemptId == null) { |
| currentApplicationAttemptId = |
| BuilderUtils.newApplicationAttemptId(this.applicationId, |
| DUMMY_APPLICATION_ATTEMPT_NUMBER); |
| } |
| |
| ApplicationReport report = BuilderUtils.newApplicationReport( |
| this.applicationId, currentApplicationAttemptId, this.user, |
| this.queue, this.name, host, rpcPort, clientToAMToken, |
| createApplicationState(), diags, trackingUrl, this.startTime, |
| this.launchTime, this.finishTime, finishState, appUsageReport, |
| origTrackingUrl, progress, this.applicationType, amrmToken, |
| applicationTags, this.getApplicationPriority()); |
| report.setLogAggregationStatus(logAggregationStatus); |
| report.setUnmanagedApp(submissionContext.getUnmanagedAM()); |
| report.setAppNodeLabelExpression(getAppNodeLabelExpression()); |
| report.setAmNodeLabelExpression(getAmNodeLabelExpression()); |
| if (HAUtil.isFederationEnabled(conf)) { |
| report.setRMClusterId(YarnConfiguration.getClusterId(conf)); |
| } |
| |
| ApplicationTimeout timeout = ApplicationTimeout |
| .newInstance(ApplicationTimeoutType.LIFETIME, UNLIMITED, UNKNOWN); |
| // Currently timeout type supported is LIFETIME. When more timeout types |
| // are supported in YARN-5692, the below logic need to be changed. |
| if (!this.applicationTimeouts.isEmpty()) { |
| long timeoutInMillis = applicationTimeouts |
| .get(ApplicationTimeoutType.LIFETIME).longValue(); |
| timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis)); |
| if (isAppInCompletedStates()) { |
| // if application configured with timeout and finished before timeout |
| // happens then remaining time should not be calculated. |
| timeout.setRemainingTime(0); |
| } else { |
| timeout.setRemainingTime( |
| Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0)); |
| } |
| } |
| report.setApplicationTimeouts( |
| Collections.singletonMap(timeout.getTimeoutType(), timeout)); |
| return report; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getFinishTime() { |
| this.readLock.lock(); |
| |
| try { |
| return this.finishTime; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getStartTime() { |
| this.readLock.lock(); |
| |
| try { |
| return this.startTime; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getLaunchTime() { |
| this.readLock.lock(); |
| |
| try { |
| return this.launchTime; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public long getSubmitTime() { |
| return this.submitTime; |
| } |
| |
| @Override |
| public String getTrackingUrl() { |
| RMAppAttempt attempt = this.currentAttempt; |
| if (attempt != null) { |
| return attempt.getTrackingUrl(); |
| } |
| return null; |
| } |
| |
| @Override |
| public String getOriginalTrackingUrl() { |
| RMAppAttempt attempt = this.currentAttempt; |
| if (attempt != null) { |
| return attempt.getOriginalTrackingUrl(); |
| } |
| return null; |
| } |
| |
| @Override |
| public StringBuilder getDiagnostics() { |
| this.readLock.lock(); |
| try { |
| if (diagnostics.length() == 0 && getCurrentAppAttempt() != null) { |
| String appAttemptDiagnostics = getCurrentAppAttempt().getDiagnostics(); |
| if (appAttemptDiagnostics != null) { |
| return new StringBuilder(appAttemptDiagnostics); |
| } |
| } |
| return this.diagnostics; |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getMaxAppAttempts() { |
| return this.maxAppAttempts; |
| } |
| |
| @Override |
| public void handle(RMAppEvent event) { |
| this.writeLock.lock(); |
| |
| try { |
| ApplicationId appID = event.getApplicationId(); |
| LOG.debug("Processing event for {} of type {}", |
| appID, event.getType()); |
| |
| final RMAppState oldState = getState(); |
| try { |
| /* keep the master in sync with the state machine */ |
| this.stateMachine.doTransition(event.getType(), event); |
| } catch (InvalidStateTransitionException e) { |
| LOG.error("App: " + appID |
| + " can't handle this event at current state", e); |
| onInvalidStateTransition(event.getType(), oldState); |
| } |
| |
| // Log at INFO if we're not recovering or not in a terminal state. |
| // Log at DEBUG otherwise. |
| if ((oldState != getState()) && |
| (((recoveredFinalState == null)) || |
| (event.getType() != RMAppEventType.RECOVER))) { |
| LOG.info(String.format(STATE_CHANGE_MESSAGE, appID, oldState, |
| getState(), event.getType())); |
| } else if ((oldState != getState()) && LOG.isDebugEnabled()) { |
| LOG.debug(String.format(STATE_CHANGE_MESSAGE, appID, oldState, |
| getState(), event.getType())); |
| } |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void recover(RMState state) { |
| ApplicationStateData appState = |
| state.getApplicationState().get(getApplicationId()); |
| this.recoveredFinalState = appState.getState(); |
| |
| if (recoveredFinalState == null) { |
| LOG.info(String.format(RECOVERY_MESSAGE, getApplicationId(), |
| appState.getAttemptCount(), "NONE")); |
| } else if (LOG.isDebugEnabled()) { |
| LOG.debug(String.format(RECOVERY_MESSAGE, getApplicationId(), |
| appState.getAttemptCount(), recoveredFinalState)); |
| } |
| |
| this.diagnostics.append(null == appState.getDiagnostics() ? "" : appState |
| .getDiagnostics()); |
| this.storedFinishTime = appState.getFinishTime(); |
| this.startTime = appState.getStartTime(); |
| this.launchTime = appState.getLaunchTime(); |
| this.callerContext = appState.getCallerContext(); |
| this.applicationTimeouts = appState.getApplicationTimeouts(); |
| // If interval > 0, some attempts might have been deleted. |
| if (this.attemptFailuresValidityInterval > 0) { |
| this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); |
| this.nextAttemptId = firstAttemptIdInStateStore; |
| } |
| //TODO recover collector address. |
| //this.collectorAddr = appState.getCollectorAddr(); |
| |
| // send the ATS create Event during RM recovery. |
| // NOTE: it could be duplicated with events sent before RM get restarted. |
| sendATSCreateEvent(); |
| RMAppAttemptImpl preAttempt = null; |
| for (ApplicationAttemptId attemptId : |
| new TreeSet<>(appState.attempts.keySet())) { |
| // create attempt |
| createNewAttempt(attemptId); |
| ((RMAppAttemptImpl)this.currentAttempt).recover(state); |
| // If previous attempt is not in final state, it means we failed to store |
| // its final state. We set it to FAILED now because we could not make sure |
| // about its final state. |
| if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) { |
| preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED); |
| } |
| preAttempt = (RMAppAttemptImpl)currentAttempt; |
| } |
| if (currentAttempt != null) { |
| nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1; |
| } |
| } |
| |
| private void createNewAttempt() { |
| ApplicationAttemptId appAttemptId = |
| ApplicationAttemptId.newInstance(applicationId, nextAttemptId++); |
| createNewAttempt(appAttemptId); |
| } |
| |
| private void createNewAttempt(ApplicationAttemptId appAttemptId) { |
| BlacklistManager currentAMBlacklistManager; |
| if (currentAttempt != null) { |
| // Transfer over the blacklist from the previous app-attempt. |
| currentAMBlacklistManager = currentAttempt.getAMBlacklistManager(); |
| } else { |
| if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) { |
| currentAMBlacklistManager = new SimpleBlacklistManager( |
| RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, |
| getAMResourceRequests()), |
| blacklistDisableThreshold); |
| } else { |
| currentAMBlacklistManager = new DisabledBlacklistManager(); |
| } |
| } |
| RMAppAttempt attempt = |
| new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, |
| submissionContext, conf, amReqs, this, currentAMBlacklistManager); |
| attempts.put(appAttemptId, attempt); |
| currentAttempt = attempt; |
| } |
| |
| private void |
| createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { |
| createNewAttempt(); |
| handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), |
| transferStateFromPreviousAttempt)); |
| } |
| |
| private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { |
| NodeState nodeState = node.getState(); |
| updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type)); |
| LOG.debug("Received node update event:{} for node:{} with state:", |
| type, node, nodeState); |
| } |
| |
| private static class RMAppTransition implements |
| SingleArcTransition<RMAppImpl, RMAppEvent> { |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| }; |
| } |
| |
| private static final class RMAppNodeUpdateTransition extends RMAppTransition { |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event; |
| app.processNodeUpdate(nodeUpdateEvent.getUpdateType(), |
| nodeUpdateEvent.getNode()); |
| }; |
| } |
| |
| private static final class RMAppStateUpdateTransition |
| extends RMAppTransition { |
| private YarnApplicationState stateToATS; |
| |
| public RMAppStateUpdateTransition(YarnApplicationState state) { |
| stateToATS = state; |
| } |
| |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.rmContext.getSystemMetricsPublisher().appStateUpdated( |
| app, stateToATS, app.systemClock.getTime()); |
| }; |
| } |
| |
| private static final class AttemptLaunchedTransition |
| extends RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| |
| if(app.launchTime == 0) { |
| LOG.info("update the launch time for applicationId: "+ |
| app.getApplicationId()+", attemptId: "+ |
| app.getCurrentAppAttempt().getAppAttemptId()+ |
| "launchTime: "+event.getTimestamp()); |
| ApplicationStateData appState = ApplicationStateData.newInstance( |
| app.submitTime, app.startTime, app.submissionContext, app.user, |
| app.callerContext); |
| appState.setApplicationTimeouts(app.getApplicationTimeouts()); |
| appState.setLaunchTime(event.getTimestamp()); |
| app.rmContext.getStateStore().updateApplicationState(appState, false); |
| app.launchTime = event.getTimestamp(); |
| app.rmContext.getSystemMetricsPublisher().appLaunched( |
| app, app.launchTime); |
| } |
| } |
| } |
| |
| private static final class AppRunningOnNodeTransition extends RMAppTransition { |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; |
| |
| // if final state already stored, notify RMNode |
| if (isAppInFinalState(app)) { |
| app.handler.handle( |
| new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent |
| .getApplicationId())); |
| return; |
| } |
| |
| // otherwise, add it to ranNodes for further process |
| app.ranNodes.add(nodeAddedEvent.getNodeId()); |
| |
| app.logAggregation.addReportIfNecessary( |
| nodeAddedEvent.getNodeId(), app.getApplicationId()); |
| } |
| } |
| |
| // synchronously recover attempt to ensure any incoming external events |
| // to be processed after the attempt processes the recover event. |
| private void recoverAppAttempts() { |
| for (RMAppAttempt attempt : getAppAttempts().values()) { |
| attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), |
| RMAppAttemptEventType.RECOVER)); |
| } |
| } |
| |
| private static final class RMAppRecoveredTransition implements |
| MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { |
| |
| @Override |
| public RMAppState transition(RMAppImpl app, RMAppEvent event) { |
| |
| RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event; |
| app.recover(recoverEvent.getRMState()); |
| // The app has completed. |
| if (app.recoveredFinalState != null) { |
| app.recoverAppAttempts(); |
| new FinalTransition(app.recoveredFinalState).transition(app, event); |
| return app.recoveredFinalState; |
| } |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| // asynchronously renew delegation token on recovery. |
| try { |
| app.rmContext.getDelegationTokenRenewer() |
| .addApplicationAsyncDuringRecovery(app.getApplicationId(), |
| BuilderUtils.parseCredentials(app.submissionContext), |
| app.submissionContext.getCancelTokensWhenComplete(), |
| app.getUser(), |
| BuilderUtils.parseTokensConf(app.submissionContext)); |
| } catch (Exception e) { |
| String msg = "Failed to fetch user credentials from application:" + e |
| .getMessage(); |
| app.diagnostics.append(msg); |
| LOG.error(msg, e); |
| } |
| } |
| |
| for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts |
| .entrySet()) { |
| app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, |
| timeout.getKey(), timeout.getValue()); |
| if (LOG.isDebugEnabled()) { |
| long remainingTime = timeout.getValue() - app.systemClock.getTime(); |
| LOG.debug("Application " + app.applicationId |
| + " is registered for timeout monitor, type=" + timeout.getKey() |
| + " remaining timeout=" + (remainingTime > 0 ? |
| remainingTime / 1000 : |
| 0) + " seconds"); |
| } |
| } |
| |
| // No existent attempts means the attempt associated with this app was not |
| // started or started but not yet saved. |
| if (app.attempts.isEmpty()) { |
| app.scheduler.handle( |
| new AppAddedSchedulerEvent(app.user, app.submissionContext, false, |
| app.applicationPriority, app.placementContext)); |
| return RMAppState.SUBMITTED; |
| } |
| |
| // Add application to scheduler synchronously to guarantee scheduler |
| // knows applications before AM or NM re-registers. |
| app.scheduler.handle( |
| new AppAddedSchedulerEvent(app.user, app.submissionContext, true, |
| app.applicationPriority, app.placementContext)); |
| |
| // recover attempts |
| app.recoverAppAttempts(); |
| |
| // YARN-1507 is saving the application state after the application is |
| // accepted. So after YARN-1507, an app is saved meaning it is accepted. |
| // Thus we return ACCECPTED state on recovery. |
| return RMAppState.ACCEPTED; |
| } |
| } |
| |
| private static final class AddApplicationToSchedulerTransition extends |
| RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.handler.handle( |
| new AppAddedSchedulerEvent(app.user, app.submissionContext, false, |
| app.applicationPriority, app.placementContext)); |
| // send the ATS create Event |
| app.sendATSCreateEvent(); |
| } |
| } |
| |
| private static final class StartAppAttemptTransition extends RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.createAndStartNewAttempt(false); |
| }; |
| } |
| |
| private static final class FinalStateSavedTransition implements |
| MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { |
| |
| @Override |
| public RMAppState transition(RMAppImpl app, RMAppEvent event) { |
| Map<ApplicationTimeoutType, Long> timeouts = |
| app.submissionContext.getApplicationTimeouts(); |
| if (timeouts != null && timeouts.size() > 0) { |
| app.rmContext.getRMAppLifetimeMonitor() |
| .unregisterApp(app.getApplicationId(), timeouts.keySet()); |
| } |
| |
| if (app.transitionTodo instanceof SingleArcTransition) { |
| ((SingleArcTransition) app.transitionTodo).transition(app, |
| app.eventCausingFinalSaving); |
| } else if (app.transitionTodo instanceof MultipleArcTransition) { |
| ((MultipleArcTransition) app.transitionTodo).transition(app, |
| app.eventCausingFinalSaving); |
| } |
| return app.targetedFinalState; |
| } |
| } |
| |
| private static class AttemptFailedFinalStateSavedTransition extends |
| RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| String msg = null; |
| if (event instanceof RMAppFailedAttemptEvent) { |
| msg = app.getAppAttemptFailedDiagnostics(event); |
| } |
| LOG.info(msg); |
| app.diagnostics.append(msg); |
| // Inform the node for app-finish |
| new FinalTransition(RMAppState.FAILED).transition(app, event); |
| } |
| } |
| |
| private String getAppAttemptFailedDiagnostics(RMAppEvent event) { |
| String msg = null; |
| RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; |
| if (this.submissionContext.getUnmanagedAM()) { |
| // RM does not manage the AM. Do not retry |
| msg = "Unmanaged application " + this.getApplicationId() |
| + " failed due to " + failedEvent.getDiagnosticMsg() |
| + ". Failing the application."; |
| } else if (this.isNumAttemptsBeyondThreshold) { |
| int globalLimit = conf.getInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, |
| conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); |
| msg = String.format( |
| "Application %s failed %d times%s%s due to %s. Failing the application.", |
| getApplicationId(), |
| maxAppAttempts, |
| (attemptFailuresValidityInterval <= 0 ? "" |
| : (" in previous " + attemptFailuresValidityInterval |
| + " milliseconds")), |
| (globalLimit == maxAppAttempts) ? "" |
| : (" (global limit =" + globalLimit |
| + "; local limit is =" + maxAppAttempts + ")"), |
| failedEvent.getDiagnosticMsg()); |
| } |
| return msg; |
| } |
| |
| private static final class RMAppNewlySavingTransition extends RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| |
| long applicationLifetime = |
| app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); |
| applicationLifetime = app.scheduler |
| .checkAndGetApplicationLifetime(app.queue, applicationLifetime); |
| if (applicationLifetime > 0) { |
| // calculate next timeout value |
| Long newTimeout = |
| Long.valueOf(app.submitTime + (applicationLifetime * 1000)); |
| app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, |
| ApplicationTimeoutType.LIFETIME, newTimeout); |
| |
| // update applicationTimeouts with new absolute value. |
| app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, |
| newTimeout); |
| |
| LOG.info("Application " + app.applicationId |
| + " is registered for timeout monitor, type=" |
| + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime |
| + " seconds"); |
| } |
| |
| // If recovery is enabled then store the application information in a |
| // non-blocking call so make sure that RM has stored the information |
| // needed to restart the AM after RM restart without further client |
| // communication |
| LOG.info("Storing application with id " + app.applicationId); |
| app.rmContext.getStateStore().storeNewApplication(app); |
| } |
| } |
| |
| private void rememberTargetTransitions(RMAppEvent event, |
| Object transitionToDo, RMAppState targetFinalState) { |
| transitionTodo = transitionToDo; |
| targetedFinalState = targetFinalState; |
| eventCausingFinalSaving = event; |
| } |
| |
| private void rememberTargetTransitionsAndStoreState(RMAppEvent event, |
| Object transitionToDo, RMAppState targetFinalState, |
| RMAppState stateToBeStored) { |
| rememberTargetTransitions(event, transitionToDo, targetFinalState); |
| this.stateBeforeFinalSaving = getState(); |
| this.storedFinishTime = this.systemClock.getTime(); |
| |
| LOG.info("Updating application " + this.applicationId |
| + " with final state: " + this.targetedFinalState); |
| // we lost attempt_finished diagnostics in app, because attempt_finished |
| // diagnostics is sent after app final state is saved. Later on, we will |
| // create GetApplicationAttemptReport specifically for getting per attempt |
| // info. |
| String diags = null; |
| switch (event.getType()) { |
| case APP_REJECTED: |
| case ATTEMPT_FINISHED: |
| case ATTEMPT_KILLED: |
| diags = event.getDiagnosticMsg(); |
| break; |
| case ATTEMPT_FAILED: |
| RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; |
| diags = getAppAttemptFailedDiagnostics(failedEvent); |
| break; |
| default: |
| break; |
| } |
| |
| ApplicationStateData appState = |
| ApplicationStateData.newInstance(this.submitTime, this.startTime, |
| this.getUser(), this.getRealUser(), this.submissionContext, |
| stateToBeStored, diags, this.launchTime, this.storedFinishTime, |
| this.callerContext); |
| appState.setApplicationTimeouts(this.applicationTimeouts); |
| this.rmContext.getStateStore().updateApplicationState(appState); |
| } |
| |
| private static final class FinalSavingTransition extends RMAppTransition { |
| Object transitionToDo; |
| RMAppState targetedFinalState; |
| RMAppState stateToBeStored; |
| |
| public FinalSavingTransition(Object transitionToDo, |
| RMAppState targetedFinalState) { |
| this(transitionToDo, targetedFinalState, targetedFinalState); |
| } |
| |
| public FinalSavingTransition(Object transitionToDo, |
| RMAppState targetedFinalState, RMAppState stateToBeStored) { |
| this.transitionToDo = transitionToDo; |
| this.targetedFinalState = targetedFinalState; |
| this.stateToBeStored = stateToBeStored; |
| } |
| |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.rememberTargetTransitionsAndStoreState(event, transitionToDo, |
| targetedFinalState, stateToBeStored); |
| } |
| } |
| |
| private static class AttemptUnregisteredTransition extends RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.finishTime = app.storedFinishTime; |
| } |
| } |
| |
| private static class AppFinishedTransition extends FinalTransition { |
| public AppFinishedTransition() { |
| super(RMAppState.FINISHED); |
| } |
| |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.diagnostics.append(event.getDiagnosticMsg()); |
| super.transition(app, event); |
| }; |
| } |
| |
| private static class AttemptFinishedAtFinalSavingTransition extends |
| RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| if (app.targetedFinalState.equals(RMAppState.FAILED) |
| || app.targetedFinalState.equals(RMAppState.KILLED)) { |
| // Ignore Attempt_Finished event if we were supposed to reach FAILED |
| // FINISHED state |
| return; |
| } |
| |
| // pass in the earlier attempt_unregistered event, as it is needed in |
| // AppFinishedFinalStateSavedTransition later on |
| app.rememberTargetTransitions(event, |
| new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving), |
| RMAppState.FINISHED); |
| }; |
| } |
| |
| private static class AppFinishedFinalStateSavedTransition extends |
| RMAppTransition { |
| RMAppEvent attemptUnregistered; |
| |
| public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) { |
| this.attemptUnregistered = attemptUnregistered; |
| } |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| new AttemptUnregisteredTransition().transition(app, attemptUnregistered); |
| FINISHED_TRANSITION.transition(app, event); |
| }; |
| } |
| |
| /** |
| * Log the audit event for kill by client. |
| * |
| * @param event |
| * The {@link RMAppEvent} to be logged |
| */ |
| static void auditLogKillEvent(RMAppEvent event) { |
| if (event instanceof RMAppKillByClientEvent) { |
| RMAppKillByClientEvent killEvent = (RMAppKillByClientEvent) event; |
| UserGroupInformation callerUGI = killEvent.getCallerUGI(); |
| String userName = null; |
| if (callerUGI != null) { |
| userName = callerUGI.getShortUserName(); |
| } |
| InetAddress remoteIP = killEvent.getIp(); |
| RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST, |
| "RMAppImpl", event.getApplicationId(), remoteIP); |
| } |
| } |
| |
| private static class AppKilledTransition extends FinalTransition { |
| public AppKilledTransition() { |
| super(RMAppState.KILLED); |
| } |
| |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.diagnostics.append(event.getDiagnosticMsg()); |
| super.transition(app, event); |
| RMAppImpl.auditLogKillEvent(event); |
| }; |
| } |
| |
| private static class KillAttemptTransition extends RMAppTransition { |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.stateBeforeKilling = app.getState(); |
| // Forward app kill diagnostics in the event to kill app attempt. |
| // These diagnostics will be returned back in ATTEMPT_KILLED event sent by |
| // RMAppAttemptImpl. |
| app.handler.handle( |
| new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), |
| RMAppAttemptEventType.KILL, event.getDiagnosticMsg())); |
| RMAppImpl.auditLogKillEvent(event); |
| } |
| } |
| |
| private static final class AppRejectedTransition extends FinalTransition { |
| public AppRejectedTransition() { |
| super(RMAppState.FAILED); |
| } |
| |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| app.diagnostics.append(event.getDiagnosticMsg()); |
| super.transition(app, event); |
| }; |
| } |
| |
| /** |
| * Attempt to perform a type-specific cleanup after application has completed. |
| * |
| * @param app application to clean up |
| */ |
| static void appAdminClientCleanUp(RMAppImpl app) { |
| try { |
| AppAdminClient client = AppAdminClient.createAppAdminClient(app |
| .applicationType, app.conf); |
| int result = client.actionCleanUp(app.name, app.user); |
| if (result == 0) { |
| LOG.info("Type-specific cleanup of application " + app.applicationId |
| + " of type " + app.applicationType + " succeeded"); |
| } else { |
| LOG.warn("Type-specific cleanup of application " + app.applicationId |
| + " of type " + app.applicationType + " did not succeed with exit" |
| + " code " + result); |
| } |
| } catch (IllegalArgumentException e) { |
| // no AppAdminClient class has been specified for the application type, |
| // so this does not need to be logged |
| } catch (Exception e) { |
| LOG.warn("Could not run type-specific cleanup on application " + |
| app.applicationId + " of type " + app.applicationType, e); |
| } |
| } |
| |
| private static class FinalTransition extends RMAppTransition { |
| |
| private final RMAppState finalState; |
| |
| FinalTransition(RMAppState finalState) { |
| this.finalState = finalState; |
| } |
| |
| @Override |
| public void transition(RMAppImpl app, RMAppEvent event) { |
| completeAndCleanupApp(app); |
| handleAppFinished(app); |
| app.clearUnusedFields(); |
| appAdminClientCleanUp(app); |
| } |
| |
| private void completeAndCleanupApp(RMAppImpl app) { |
| //cleanup app in RM Nodes |
| for (NodeId nodeId : app.getRanNodes()) { |
| app.handler.handle( |
| new RMNodeCleanAppEvent(nodeId, app.applicationId)); |
| } |
| app.ranNodes.clear(); |
| // Recovered apps that are completed were not added to scheduler, so no |
| // need to remove them from scheduler. |
| if (app.recoveredFinalState == null) { |
| app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, |
| finalState)); |
| } |
| |
| app.handler.handle(new RMAppManagerEvent(app.applicationId, |
| RMAppManagerEventType.APP_COMPLETED)); |
| } |
| |
| private void handleAppFinished(RMAppImpl app) { |
| app.logAggregation |
| .recordLogAggregationStartTime(app.systemClock.getTime()); |
| // record finish time |
| app.finishTime = app.storedFinishTime; |
| if (app.finishTime == 0) { |
| app.finishTime = app.systemClock.getTime(); |
| } |
| |
| //record finish in history and metrics |
| app.rmContext.getRMApplicationHistoryWriter() |
| .applicationFinished(app, finalState); |
| app.rmContext.getSystemMetricsPublisher() |
| .appFinished(app, finalState, app.finishTime); |
| } |
| } |
| |
| public int getNumFailedAppAttempts() { |
| int completedAttempts = 0; |
| // Do not count AM preemption, hardware failures or NM resync |
| // as attempt failure. |
| for (RMAppAttempt attempt : attempts.values()) { |
| if (attempt.shouldCountTowardsMaxAttemptRetry()) { |
| completedAttempts++; |
| } |
| } |
| return completedAttempts; |
| } |
| |
| private static final class AttemptFailedTransition implements |
| MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { |
| |
| private final RMAppState initialState; |
| |
| public AttemptFailedTransition(RMAppState initialState) { |
| this.initialState = initialState; |
| } |
| |
| @Override |
| public RMAppState transition(RMAppImpl app, RMAppEvent event) { |
| int numberOfFailure = app.getNumFailedAppAttempts(); |
| if (app.maxAppAttempts == 1) { |
| // If the user explicitly set the attempts to 1 then there are likely |
| // correctness issues if the AM restarts for any reason. |
| LOG.info("Max app attempts is 1 for " + app.applicationId |
| + ", preventing further attempts."); |
| numberOfFailure = app.maxAppAttempts; |
| } else { |
| LOG.info("The number of failed attempts" |
| + (app.attemptFailuresValidityInterval > 0 ? " in previous " |
| + app.attemptFailuresValidityInterval + " milliseconds " : " ") |
| + "is " + numberOfFailure + ". The max attempts is " |
| + app.maxAppAttempts); |
| |
| if (app.attemptFailuresValidityInterval > 0) { |
| removeExcessAttempts(app); |
| } |
| } |
| |
| if (!app.submissionContext.getUnmanagedAM() |
| && numberOfFailure < app.maxAppAttempts) { |
| if (initialState.equals(RMAppState.KILLING)) { |
| // If this is not last attempt, app should be killed instead of |
| // launching a new attempt |
| app.rememberTargetTransitionsAndStoreState(event, |
| new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED); |
| return RMAppState.FINAL_SAVING; |
| } |
| |
| boolean transferStateFromPreviousAttempt; |
| RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; |
| transferStateFromPreviousAttempt = |
| failedEvent.getTransferStateFromPreviousAttempt(); |
| |
| RMAppAttempt oldAttempt = app.currentAttempt; |
| app.createAndStartNewAttempt(transferStateFromPreviousAttempt); |
| // Transfer the state from the previous attempt to the current attempt. |
| // Note that the previous failed attempt may still be collecting the |
| // container events from the scheduler and update its data structures |
| // before the new attempt is created. We always transferState for |
| // finished containers so that they can be acked to NM, |
| // but when pulling finished container we will check this flag again. |
| ((RMAppAttemptImpl) app.currentAttempt) |
| .transferStateFromAttempt(oldAttempt); |
| return initialState; |
| } else { |
| if (numberOfFailure >= app.maxAppAttempts) { |
| app.isNumAttemptsBeyondThreshold = true; |
| } |
| app.rememberTargetTransitionsAndStoreState(event, |
| new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, |
| RMAppState.FAILED); |
| return RMAppState.FINAL_SAVING; |
| } |
| } |
| |
| private void removeExcessAttempts(RMAppImpl app) { |
| while (app.nextAttemptId |
| - app.firstAttemptIdInStateStore > app.maxAppAttempts) { |
| // attempts' first element is oldest attempt because it is a |
| // LinkedHashMap |
| ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( |
| app.getApplicationId(), app.firstAttemptIdInStateStore); |
| RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId); |
| long endTime = app.systemClock.getTime(); |
| if (rmAppAttempt.getFinishTime() < (endTime |
| - app.attemptFailuresValidityInterval)) { |
| app.firstAttemptIdInStateStore++; |
| LOG.info("Remove attempt from state store : " + attemptId); |
| app.rmContext.getStateStore().removeApplicationAttempt(attemptId); |
| } else { |
| break; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public String getApplicationType() { |
| return this.applicationType; |
| } |
| |
| @Override |
| public Set<String> getApplicationTags() { |
| return this.applicationTags; |
| } |
| |
| @Override |
| public boolean isAppFinalStateStored() { |
| RMAppState state = getState(); |
| return state.equals(RMAppState.FINISHING) |
| || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED) |
| || state.equals(RMAppState.KILLED); |
| } |
| |
| @Override |
| public YarnApplicationState createApplicationState() { |
| RMAppState rmAppState = getState(); |
| // If App is in FINAL_SAVING state, return its previous state. |
| if (rmAppState.equals(RMAppState.FINAL_SAVING)) { |
| rmAppState = stateBeforeFinalSaving; |
| } |
| if (rmAppState.equals(RMAppState.KILLING)) { |
| rmAppState = stateBeforeKilling; |
| } |
| return RMServerUtils.createApplicationState(rmAppState); |
| } |
| |
| public static boolean isAppInFinalState(RMApp rmApp) { |
| RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); |
| if (appState == null) { |
| appState = rmApp.getState(); |
| } |
| return appState == RMAppState.FAILED || appState == RMAppState.FINISHED |
| || appState == RMAppState.KILLED; |
| } |
| |
| @Override |
| public boolean isAppInCompletedStates() { |
| RMAppState appState = getState(); |
| return appState == RMAppState.FINISHED || appState == RMAppState.FINISHING |
| || appState == RMAppState.FAILED || appState == RMAppState.KILLED |
| || appState == RMAppState.FINAL_SAVING |
| || appState == RMAppState.KILLING; |
| } |
| |
| @Override |
| public ApplicationPlacementContext getApplicationPlacementContext() { |
| return placementContext; |
| } |
| |
| public RMAppState getRecoveredFinalState() { |
| return this.recoveredFinalState; |
| } |
| |
| @Override |
| public Set<NodeId> getRanNodes() { |
| return ranNodes; |
| } |
| |
| @Override |
| public RMAppMetrics getRMAppMetrics() { |
| Resource resourcePreempted = Resource.newInstance(0, 0); |
| int numAMContainerPreempted = 0; |
| int numNonAMContainerPreempted = 0; |
| Map<String, Long> resourceSecondsMap = new HashMap<>(); |
| Map<String, Long> preemptedSecondsMap = new HashMap<>(); |
| int totalAllocatedContainers = 0; |
| this.readLock.lock(); |
| try { |
| for (RMAppAttempt attempt : attempts.values()) { |
| if (null != attempt) { |
| RMAppAttemptMetrics attemptMetrics = |
| attempt.getRMAppAttemptMetrics(); |
| Resources.addTo(resourcePreempted, |
| attemptMetrics.getResourcePreempted()); |
| numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0; |
| numNonAMContainerPreempted += |
| attemptMetrics.getNumNonAMContainersPreempted(); |
| // getAggregateAppResourceUsage() will calculate resource usage stats |
| // for both running and finished containers. |
| AggregateAppResourceUsage resUsage = |
| attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage(); |
| for (Map.Entry<String, Long> entry : resUsage |
| .getResourceUsageSecondsMap().entrySet()) { |
| long value = RMServerUtils |
| .getOrDefault(resourceSecondsMap, entry.getKey(), 0L); |
| value += entry.getValue(); |
| resourceSecondsMap.put(entry.getKey(), value); |
| } |
| for (Map.Entry<String, Long> entry : attemptMetrics |
| .getPreemptedResourceSecondsMap().entrySet()) { |
| long value = RMServerUtils |
| .getOrDefault(preemptedSecondsMap, entry.getKey(), 0L); |
| value += entry.getValue(); |
| preemptedSecondsMap.put(entry.getKey(), value); |
| } |
| totalAllocatedContainers += |
| attemptMetrics.getTotalAllocatedContainers(); |
| } |
| } |
| } finally { |
| this.readLock.unlock(); |
| } |
| |
| return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted, |
| numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap, |
| totalAllocatedContainers); |
| } |
| |
| @Private |
| @VisibleForTesting |
| public void setSystemClock(Clock clock) { |
| this.systemClock = clock; |
| } |
| |
| @Override |
| public ReservationId getReservationId() { |
| return submissionContext.getReservationID(); |
| } |
| |
| @Override |
| public List<ResourceRequest> getAMResourceRequests() { |
| return this.amReqs; |
| } |
| |
| @Override |
| public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() { |
| return logAggregation.getLogAggregationReportsForApp(this); |
| } |
| |
| public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { |
| logAggregation.aggregateLogReport(nodeId, report, this); |
| } |
| |
| public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { |
| return logAggregation.getLogAggregationFailureMessagesForNM(nodeId); |
| } |
| |
| @Override |
| public LogAggregationStatus getLogAggregationStatusForAppReport() { |
| return logAggregation |
| .getLogAggregationStatusForAppReport(this); |
| } |
| |
| @Override |
| public String getAppNodeLabelExpression() { |
| String appNodeLabelExpression = |
| getApplicationSubmissionContext().getNodeLabelExpression(); |
| appNodeLabelExpression = (appNodeLabelExpression == null) |
| ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : appNodeLabelExpression; |
| appNodeLabelExpression = (appNodeLabelExpression.trim().isEmpty()) |
| ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appNodeLabelExpression; |
| return appNodeLabelExpression; |
| } |
| |
| @Override |
| public String getAmNodeLabelExpression() { |
| String amNodeLabelExpression = null; |
| if (!getApplicationSubmissionContext().getUnmanagedAM()) { |
| amNodeLabelExpression = |
| getAMResourceRequests() != null && !getAMResourceRequests().isEmpty() |
| ? getAMResourceRequests().get(0).getNodeLabelExpression() : null; |
| amNodeLabelExpression = (amNodeLabelExpression == null) |
| ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression; |
| amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty()) |
| ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : amNodeLabelExpression; |
| } |
| return amNodeLabelExpression; |
| } |
| |
| @Override |
| public CallerContext getCallerContext() { |
| return callerContext; |
| } |
| |
| private void sendATSCreateEvent() { |
| rmContext.getRMApplicationHistoryWriter().applicationStarted(this); |
| rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime); |
| String appViewACLs = submissionContext.getAMContainerSpec() |
| .getApplicationACLs().get(ApplicationAccessType.VIEW_APP); |
| rmContext.getSystemMetricsPublisher().appACLsUpdated( |
| this, appViewACLs, systemClock.getTime()); |
| } |
| |
| @Private |
| @VisibleForTesting |
| public int getNextAttemptId() { |
| return nextAttemptId; |
| } |
| |
| private long getApplicationLifetime(ApplicationTimeoutType type) { |
| Map<ApplicationTimeoutType, Long> timeouts = |
| this.submissionContext.getApplicationTimeouts(); |
| long applicationLifetime = -1; |
| if (timeouts != null && timeouts.containsKey(type)) { |
| applicationLifetime = timeouts.get(type); |
| } |
| return applicationLifetime; |
| } |
| |
| @Override |
| public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() { |
| this.readLock.lock(); |
| try { |
| return new HashMap(this.applicationTimeouts); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| public void updateApplicationTimeout( |
| Map<ApplicationTimeoutType, Long> updateTimeout) { |
| this.writeLock.lock(); |
| try { |
| if (COMPLETED_APP_STATES.contains(getState())) { |
| return; |
| } |
| // update monitoring service |
| this.rmContext.getRMAppLifetimeMonitor() |
| .updateApplicationTimeouts(getApplicationId(), updateTimeout); |
| this.applicationTimeouts.putAll(updateTimeout); |
| |
| } finally { |
| this.writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Priority getApplicationPriority() { |
| return applicationPriority; |
| } |
| |
| public void setApplicationPriority(Priority applicationPriority) { |
| this.applicationPriority = applicationPriority; |
| } |
| |
| /** |
| * Clear Unused fields to free memory. |
| */ |
| private void clearUnusedFields() { |
| this.submissionContext.setAMContainerSpec(null); |
| this.submissionContext.setLogAggregationContext(null); |
| } |
| |
| @Override |
| public Map<String, String> getApplicationSchedulingEnvs() { |
| return this.applicationSchedulingEnvs; |
| } |
| |
| /** |
| * catch the InvalidStateTransition. |
| * |
| * @param state RMAppState. |
| * @param rmAppEventType RMAppEventType. |
| */ |
| protected void onInvalidStateTransition(RMAppEventType rmAppEventType, |
| RMAppState state){ |
| /* TODO fail the application on the failed transition */ |
| } |
| |
| @VisibleForTesting |
| long getLogAggregationStartTime() { |
| return logAggregation.getLogAggregationStartTime(); |
| } |
| |
| Clock getSystemClock() { |
| return systemClock; |
| } |
| |
| @Override |
| public String getRealUser() { |
| UserGroupInformation realUserUgi = this.userUgi.getRealUser(); |
| return (realUserUgi != null) ? realUserUgi.getShortUserName() : null; |
| } |
| } |