blob: c8cdf463d5baba0e6a8ec7fcbf15264c0c99b5ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class RMAppImpl implements RMApp {
private static final Log LOG = LogFactory.getLog(RMAppImpl.class);
// Immutable fields
private final ApplicationId applicationId;
private final RMContext rmContext;
private final Configuration conf;
private final String user;
private final String queue;
private final String name;
private final ApplicationSubmissionContext submissionContext;
private final String clientTokenStr;
private final ApplicationStore appStore;
private final Dispatcher dispatcher;
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxRetries;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
= new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
private final long submitTime;
// Mutable fields
private long startTime;
private long finishTime;
private RMAppAttempt currentAttempt;
@SuppressWarnings("rawtypes")
private EventHandler handler;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
RMAppEventType.START, new StartAppAttemptTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
// Transitions from SUBMITTED state
.addTransition(RMAppState.SUBMITTED, RMAppState.FAILED,
RMAppEventType.APP_REJECTED, new AppRejectedTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
.addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
.addTransition(RMAppState.RUNNING,
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FAILED),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from FINISHED state
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
RMAppEventType.KILL)
// Transitions from FAILED state
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.KILL)
// Transitions from KILLED state
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED))
.installTopology();
private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
stateMachine;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService,
long submitTime) {
this.applicationId = applicationId;
this.name = name;
this.rmContext = rmContext;
this.dispatcher = rmContext.getDispatcher();
this.handler = dispatcher.getEventHandler();
this.conf = config;
this.user = user;
this.queue = queue;
this.submissionContext = submissionContext;
this.clientTokenStr = clientTokenStr;
this.appStore = appStore;
this.scheduler = scheduler;
this.masterService = masterService;
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
}
@Override
public ApplicationId getApplicationId() {
return this.applicationId;
}
@Override
public FinalApplicationStatus getFinalApplicationStatus() {
this.readLock.lock();
try {
// finish state is obtained based on the state machine's current state
// as a fall-back in case the application has not been unregistered
// ( or if the app never unregistered itself )
// when the report is requested
if (currentAttempt != null
&& currentAttempt.getFinalApplicationStatus() != null) {
return currentAttempt.getFinalApplicationStatus();
}
return
createFinalApplicationStatus(this.stateMachine.getCurrentState());
} finally {
this.readLock.unlock();
}
}
@Override
public RMAppState getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@Override
public String getUser() {
return this.user;
}
@Override
public float getProgress() {
this.readLock.lock();
try {
if (this.currentAttempt != null) {
return this.currentAttempt.getProgress();
}
return 0;
} finally {
this.readLock.unlock();
}
}
@Override
public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
this.readLock.lock();
try {
return this.attempts.get(appAttemptId);
} finally {
this.readLock.unlock();
}
}
@Override
public String getQueue() {
return this.queue;
}
@Override
public String getName() {
return this.name;
}
@Override
public RMAppAttempt getCurrentAppAttempt() {
this.readLock.lock();
try {
return this.currentAttempt;
} finally {
this.readLock.unlock();
}
}
@Override
public ApplicationStore getApplicationStore() {
return this.appStore;
}
private YarnApplicationState createApplicationState(RMAppState rmAppState) {
switch(rmAppState) {
case NEW:
return YarnApplicationState.NEW;
case SUBMITTED:
case ACCEPTED:
return YarnApplicationState.SUBMITTED;
case RUNNING:
return YarnApplicationState.RUNNING;
case FINISHED:
return YarnApplicationState.FINISHED;
case KILLED:
return YarnApplicationState.KILLED;
case FAILED:
return YarnApplicationState.FAILED;
}
throw new YarnException("Unknown state passed!");
}
private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
switch(state) {
case NEW:
case SUBMITTED:
case ACCEPTED:
case RUNNING:
return FinalApplicationStatus.UNDEFINED;
// finished without a proper final state is the same as failed
case FINISHED:
case FAILED:
return FinalApplicationStatus.FAILED;
case KILLED:
return FinalApplicationStatus.KILLED;
}
throw new YarnException("Unknown state passed!");
}
@Override
public ApplicationReport createAndGetApplicationReport() {
this.readLock.lock();
try {
String clientToken = "N/A";
String trackingUrl = "N/A";
String host = "N/A";
String origTrackingUrl = "N/A";
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport = null;
FinalApplicationStatus finishState = getFinalApplicationStatus();
if (this.currentAttempt != null) {
trackingUrl = this.currentAttempt.getTrackingUrl();
origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
clientToken = this.currentAttempt.getClientToken();
host = this.currentAttempt.getHost();
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
}
return BuilderUtils.newApplicationReport(this.applicationId, this.user,
this.queue, this.name, host, rpcPort, clientToken,
createApplicationState(this.stateMachine.getCurrentState()),
this.diagnostics.toString(), trackingUrl,
this.startTime, this.finishTime, finishState, appUsageReport,
origTrackingUrl);
} finally {
this.readLock.unlock();
}
}
@Override
public long getFinishTime() {
this.readLock.lock();
try {
return this.finishTime;
} finally {
this.readLock.unlock();
}
}
@Override
public long getStartTime() {
this.readLock.lock();
try {
return this.startTime;
} finally {
this.readLock.unlock();
}
}
@Override
public long getSubmitTime() {
return this.submitTime;
}
@Override
public String getTrackingUrl() {
this.readLock.lock();
try {
if (this.currentAttempt != null) {
return this.currentAttempt.getTrackingUrl();
}
return null;
} finally {
this.readLock.unlock();
}
}
@Override
public StringBuilder getDiagnostics() {
this.readLock.lock();
try {
return this.diagnostics;
} finally {
this.readLock.unlock();
}
}
@Override
public void handle(RMAppEvent event) {
this.writeLock.lock();
try {
ApplicationId appID = event.getApplicationId();
LOG.debug("Processing event for " + appID + " of type "
+ event.getType());
final RMAppState oldState = getState();
try {
/* keep the master in sync with the state machine */
this.stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
/* TODO fail the application on the failed transition */
}
if (oldState != getState()) {
LOG.info(appID + " State change from " + oldState + " to "
+ getState());
}
} finally {
this.writeLock.unlock();
}
}
@SuppressWarnings("unchecked")
private void createNewAttempt() {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
appAttemptId.setApplicationId(applicationId);
appAttemptId.setAttemptId(attempts.size() + 1);
RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId,
clientTokenStr, rmContext, scheduler, masterService,
submissionContext, YarnConfiguration.getProxyHostAndPort(conf));
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
}
private static class RMAppTransition implements
SingleArcTransition<RMAppImpl, RMAppEvent> {
public void transition(RMAppImpl app, RMAppEvent event) {
};
}
private static final class StartAppAttemptTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
app.createNewAttempt();
};
}
private static class AppKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append("Application killed by user.");
super.transition(app, event);
};
}
private static class KillAppAndAttemptTransition extends AppKilledTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
super.transition(app, event);
}
}
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event;
app.diagnostics.append(rejectedEvent.getMessage());
super.transition(app, event);
};
}
private static class FinalTransition extends RMAppTransition {
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
Set<NodeId> nodes = new HashSet<NodeId>();
for (RMAppAttempt attempt : app.attempts.values()) {
nodes.addAll(attempt.getRanNodes());
}
return nodes;
}
@SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
app.finishTime = System.currentTimeMillis();
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
};
}
private static final class AttemptFailedTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
private final RMAppState initialState;
public AttemptFailedTransition(RMAppState initialState) {
this.initialState = initialState;
}
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event);
if (app.attempts.size() == app.maxRetries) {
String msg = "Application " + app.getApplicationId()
+ " failed " + app.maxRetries
+ " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
LOG.info(msg);
app.diagnostics.append(msg);
// Inform the node for app-finish
FINAL_TRANSITION.transition(app, event);
return RMAppState.FAILED;
}
app.createNewAttempt();
return initialState;
}
}
}