blob: e48e2fae55ae6b8d6af4114f248f59287fc7aba3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
/**
 * The state machine for the representation of an Application
 * within the NodeManager.
 *
 * <p>Every state change is driven through {@link #handle(ApplicationEvent)},
 * which runs the transition while holding this object's write lock. The
 * accessors {@link #getApplicationState()} and {@link #getContainers()} take
 * the read lock.
 */
public class ApplicationImpl implements Application {

  final Dispatcher dispatcher;
  final String user;
  final ApplicationId appId;
  final Credentials credentials;
  // Populated by AppInitTransition from the ApplicationInitEvent.
  Map<ApplicationAccessType, String> applicationACLs;
  final ApplicationACLsManager aclsManager;
  private final ReadLock readLock;
  private final WriteLock writeLock;
  private final Context context;

  private static final Log LOG = LogFactory.getLog(Application.class);

  // Containers currently associated with this application. Entries are added
  // on INIT_CONTAINER and removed on APPLICATION_CONTAINER_FINISHED.
  Map<ContainerId, Container> containers =
      new HashMap<ContainerId, Container>();

  public ApplicationImpl(Dispatcher dispatcher,
      ApplicationACLsManager aclsManager, String user, ApplicationId appId,
      Credentials credentials, Context context) {
    this.dispatcher = dispatcher;
    this.user = user.toString();
    this.appId = appId;
    this.credentials = credentials;
    this.aclsManager = aclsManager;
    this.context = context;
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    readLock = lock.readLock();
    writeLock = lock.writeLock();
    stateMachine = stateMachineFactory.make(this);
  }

  @Override
  public String getUser() {
    // user is a final String, so no defensive toString() is needed.
    return user;
  }

  @Override
  public ApplicationId getAppId() {
    return appId;
  }

  @Override
  public ApplicationState getApplicationState() {
    this.readLock.lock();
    try {
      return this.stateMachine.getCurrentState();
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Returns the containers of this application.
   *
   * <p>NOTE(review): this returns the internal, mutable map itself. The read
   * lock is released before the caller uses it, so iteration by the caller is
   * not protected against concurrent transitions. Confirm whether callers
   * need a live view before switching to a snapshot copy.
   */
  @Override
  public Map<ContainerId, Container> getContainers() {
    this.readLock.lock();
    try {
      return this.containers;
    } finally {
      this.readLock.unlock();
    }
  }

  // Must be declared before stateMachineFactory, which uses it in its
  // static initializer.
  private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
      new ContainerDoneTransition();

  private static StateMachineFactory<ApplicationImpl, ApplicationState,
          ApplicationEventType, ApplicationEvent> stateMachineFactory =
      new StateMachineFactory<ApplicationImpl, ApplicationState,
          ApplicationEventType, ApplicationEvent>(ApplicationState.NEW)

           // Transitions from NEW state
           .addTransition(ApplicationState.NEW, ApplicationState.INITING,
               ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
           .addTransition(ApplicationState.NEW, ApplicationState.NEW,
               ApplicationEventType.INIT_CONTAINER,
               new InitContainerTransition())

           // Transitions from INITING state
           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
               ApplicationEventType.INIT_CONTAINER,
               new InitContainerTransition())
           .addTransition(ApplicationState.INITING,
               EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
                   ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
               ApplicationEventType.FINISH_APPLICATION,
               new AppFinishTriggeredTransition())
           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
               ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
               CONTAINER_DONE_TRANSITION)
           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
               ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
               new AppLogInitDoneTransition())
           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
               ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
               new AppLogInitFailTransition())
           .addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
               ApplicationEventType.APPLICATION_INITED,
               new AppInitDoneTransition())

           // Transitions from RUNNING state
           .addTransition(ApplicationState.RUNNING,
               ApplicationState.RUNNING,
               ApplicationEventType.INIT_CONTAINER,
               new InitContainerTransition())
           .addTransition(ApplicationState.RUNNING,
               ApplicationState.RUNNING,
               ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
               CONTAINER_DONE_TRANSITION)
           .addTransition(
               ApplicationState.RUNNING,
               EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
                   ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
               ApplicationEventType.FINISH_APPLICATION,
               new AppFinishTriggeredTransition())

           // Transitions from FINISHING_CONTAINERS_WAIT state.
           .addTransition(
               ApplicationState.FINISHING_CONTAINERS_WAIT,
               EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
                   ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
               ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
               new AppFinishTransition())

           // Transitions from APPLICATION_RESOURCES_CLEANINGUP state
           .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
               ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
               ApplicationEventType.APPLICATION_CONTAINER_FINISHED)
           .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
               ApplicationState.FINISHED,
               ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
               new AppCompletelyDoneTransition())

           // Transitions from FINISHED state
           .addTransition(ApplicationState.FINISHED,
               ApplicationState.FINISHED,
               ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
               new AppLogsAggregatedTransition())

           // create the topology tables
           .installTopology();

  private final StateMachine<ApplicationState, ApplicationEventType, ApplicationEvent> stateMachine;

  /**
   * Notify services of new application.
   *
   * <p>Records the application ACLs carried by the {@link ApplicationInitEvent},
   * registers them with the {@link ApplicationACLsManager}, and sends a
   * {@link LogHandlerAppStartedEvent} so that the log handler (e.g. the
   * {@link LogAggregationService}) can initialize for this application.
   */
  @SuppressWarnings("unchecked")
  static class AppInitTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
      app.applicationACLs = initEvent.getApplicationACLs();
      app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
      // Inform the logAggregator
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppStartedEvent(app.appId, app.user,
              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
              app.applicationACLs));
    }
  }

  /**
   * Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after
   * {@link LogAggregationService} has created the directories for the app
   * and started the aggregation thread for the app.
   *
   * <p>In particular, this requests that the {@link ResourceLocalizationService}
   * localize the application-scoped resources.
   */
  @SuppressWarnings("unchecked")
  static class AppLogInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      app.dispatcher.getEventHandler().handle(
          new ApplicationLocalizationEvent(
              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    }
  }

  /**
   * Handles the APPLICATION_LOG_HANDLING_FAILED event that occurs after
   * {@link LogAggregationService} has failed to initialize the log
   * aggregation service.
   *
   * <p>A warning is logged and, just as on the success path, the
   * {@link ResourceLocalizationService} is asked to localize the
   * application-scoped resources, so the application proceeds without logs.
   */
  @SuppressWarnings("unchecked")
  static class AppLogInitFailTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      LOG.warn("Log Aggregation service failed to initialize, there will " +
          "be no logs for this application");
      app.dispatcher.getEventHandler().handle(
          new ApplicationLocalizationEvent(
              LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    }
  }

  /**
   * Handles INIT_CONTAINER events which request that we launch a new
   * container. The container is always recorded in the application's
   * container map. While we are still in the NEW or INITING state, we simply
   * queue it up (it is started later by {@link AppInitDoneTransition}). When
   * we are in the RUNNING state, we pass a {@link ContainerInitEvent} along
   * to the container immediately.
   */
  @SuppressWarnings("unchecked")
  static class InitContainerTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationContainerInitEvent initEvent =
          (ApplicationContainerInitEvent) event;
      Container container = initEvent.getContainer();
      app.containers.put(container.getContainerID(), container);
      LOG.info("Adding " + container.getContainerID()
          + " to application " + app.toString());

      // Every INIT_CONTAINER arc in the table above is a self-loop, so this is
      // the state both before and after the transition. Read it once: each
      // call takes the read lock.
      ApplicationState currentState = app.getApplicationState();
      switch (currentState) {
      case RUNNING:
        app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
            container.getContainerID()));
        break;
      case INITING:
      case NEW:
        // these get queued up and sent out in AppInitDoneTransition
        break;
      default:
        assert false : "Invalid state for InitContainerTransition: " +
            currentState;
      }
    }
  }

  /**
   * Handles APPLICATION_INITED: starts every container that was queued while
   * the application was in the NEW or INITING state.
   */
  @SuppressWarnings("unchecked")
  static class AppInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      // Start all the containers waiting for ApplicationInit
      for (Container container : app.containers.values()) {
        app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
            container.getContainerID()));
      }
    }
  }

  /**
   * Handles APPLICATION_CONTAINER_FINISHED while INITING or RUNNING: removes
   * the container from the application, warning if it was not known.
   */
  static final class ContainerDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationContainerFinishedEvent containerEvent =
          (ApplicationContainerFinishedEvent) event;
      if (null == app.containers.remove(containerEvent.getContainerID())) {
        LOG.warn("Removing unknown " + containerEvent.getContainerID() +
            " from application " + app.toString());
      } else {
        LOG.info("Removing " + containerEvent.getContainerID() +
            " from application " + app.toString());
      }
    }
  }

  /**
   * Called once no containers remain: asks for the application-level
   * resources to be destroyed and tells auxiliary services that the
   * application has stopped.
   */
  @SuppressWarnings("unchecked")
  void handleAppFinishWithContainersCleanedup() {
    // Delete Application level resources
    this.dispatcher.getEventHandler().handle(
        new ApplicationLocalizationEvent(
            LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));

    // tell any auxiliary services that the app is done
    this.dispatcher.getEventHandler().handle(
        new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));

    // TODO: Trigger the LogsManager
  }

  /**
   * Handles FINISH_APPLICATION. If the application has no containers, the
   * application-level cleanup starts right away. Otherwise a kill is sent to
   * every container and the application waits for them to finish.
   */
  @SuppressWarnings("unchecked")
  static class AppFinishTriggeredTransition
      implements
      MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
    @Override
    public ApplicationState transition(ApplicationImpl app,
        ApplicationEvent event) {

      if (app.containers.isEmpty()) {
        // No container to cleanup. Cleanup app level resources.
        app.handleAppFinishWithContainersCleanedup();
        return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
      }

      // Send event to ContainersLauncher to finish all the containers of this
      // application.
      for (ContainerId containerID : app.containers.keySet()) {
        app.dispatcher.getEventHandler().handle(
            new ContainerKillEvent(containerID,
                "Container killed on application-finish event from RM."));
      }
      return ApplicationState.FINISHING_CONTAINERS_WAIT;
    }
  }

  /**
   * Handles APPLICATION_CONTAINER_FINISHED while waiting for containers to
   * finish. Once the last container is gone, application-level cleanup
   * starts; otherwise the application keeps waiting.
   */
  static class AppFinishTransition implements
      MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {

    @Override
    public ApplicationState transition(ApplicationImpl app,
        ApplicationEvent event) {

      ApplicationContainerFinishedEvent containerFinishEvent =
          (ApplicationContainerFinishedEvent) event;
      ContainerId containerId = containerFinishEvent.getContainerID();
      // Report unknown containers the same way ContainerDoneTransition does.
      if (null == app.containers.remove(containerId)) {
        LOG.warn("Removing unknown " + containerId +
            " from application " + app.toString());
      } else {
        LOG.info("Removing " + containerId
            + " from application " + app.toString());
      }

      if (app.containers.isEmpty()) {
        // All containers are cleanedup.
        app.handleAppFinishWithContainersCleanedup();
        return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
      }

      return ApplicationState.FINISHING_CONTAINERS_WAIT;
    }
  }

  /**
   * Handles APPLICATION_RESOURCES_CLEANEDUP. When security is enabled, it
   * notifies the container token secret manager that the application has
   * finished. It then notifies the log handler.
   */
  @SuppressWarnings("unchecked")
  static class AppCompletelyDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {

      // Inform the ContainerTokenSecretManager
      if (UserGroupInformation.isSecurityEnabled()) {
        app.context.getContainerTokenSecretManager().appFinished(app.appId);
      }

      // Inform the logService
      app.dispatcher.getEventHandler().handle(
          new LogHandlerAppFinishedEvent(app.appId));

    }
  }

  /**
   * Handles APPLICATION_LOG_HANDLING_FINISHED: drops the application from the
   * NodeManager context's application map and from the ACL manager.
   */
  static class AppLogsAggregatedTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      ApplicationId appId = event.getApplicationID();
      app.context.getApplications().remove(appId);
      app.aclsManager.removeApplication(appId);
    }
  }

  /**
   * Applies {@code event} to the state machine while holding the write lock.
   * Events that are invalid in the current state are logged and ignored.
   */
  @Override
  public void handle(ApplicationEvent event) {

    this.writeLock.lock();

    try {
      ApplicationId applicationID = event.getApplicationID();
      // Guard to avoid building the message when debug logging is off.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Processing " + applicationID + " of type " + event.getType());
      }

      ApplicationState oldState = stateMachine.getCurrentState();
      ApplicationState newState = null;
      try {
        newState = stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.warn("Can't handle this event at current state", e);
      }
      // newState is still null when the transition was rejected above; do not
      // report a bogus "transitioned ... to null" in that case.
      if (newState != null && oldState != newState) {
        LOG.info("Application " + applicationID + " transitioned from "
            + oldState + " to " + newState);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  @Override
  public String toString() {
    return appId.toString();
  }
}