blob: 6d84fb2907680106fbef3d3d32b32ff521ca0aee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* The state machine for the representation of an Application
* within the NodeManager.
*/
public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
// flow context is set only if the timeline service v.2 is enabled
private FlowContext flowContext;
final ApplicationId appId;
final Credentials credentials;
Map<ApplicationAccessType, String> applicationACLs;
final ApplicationACLsManager aclsManager;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Context context;
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationImpl.class);
private LogAggregationContext logAggregationContext;
Map<ContainerId, Container> containers =
new ConcurrentHashMap<>();
/**
* The timestamp when the log aggregation has started for this application.
* Used to determine the age of application log files during log aggregation.
* When logAggregationRentention policy is enabled, log files older than
* the retention policy will not be uploaded but scheduled for deletion.
*/
private long applicationLogInitedTimestamp = -1;
private final NMStateStoreService appStateStore;
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context) {
this(dispatcher, user, null, appId, credentials, context, -1L);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.user = user;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
Configuration conf = context.getConf();
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
if (flowContext == null) {
throw new IllegalArgumentException("flow context cannot be null");
}
this.flowContext = flowContext;
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
context.getNMTimelinePublisher().createTimelineClient(appId);
}
}
this.context = context;
this.appStateStore = context.getNMStateStore();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
stateMachine = stateMachineFactory.make(this);
setAppLogInitedTimestamp(recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId,
Credentials credentials, Context context) {
this(dispatcher, user, flowContext, appId, credentials,
context, -1);
}
/**
* Data object that encapsulates the flow context for the application purpose.
*/
public static class FlowContext {
private final String flowName;
private final String flowVersion;
private final long flowRunId;
public FlowContext(String flowName, String flowVersion, long flowRunId) {
this.flowName = flowName;
this.flowVersion = flowVersion;
this.flowRunId = flowRunId;
}
public String getFlowName() {
return flowName;
}
public String getFlowVersion() {
return flowVersion;
}
public long getFlowRunId() {
return flowRunId;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("{");
sb.append("Flow Name=").append(getFlowName());
sb.append(" Flow Versioin=").append(getFlowVersion());
sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
return sb.toString();
}
}
@Override
public String getUser() {
return user.toString();
}
@Override
public ApplicationId getAppId() {
return appId;
}
@Override
public ApplicationState getApplicationState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@Override
public Map<ContainerId, Container> getContainers() {
this.readLock.lock();
try {
return this.containers;
} finally {
this.readLock.unlock();
}
}
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
private static final InitContainerTransition INIT_CONTAINER_TRANSITION =
new InitContainerTransition();
private static StateMachineFactory<ApplicationImpl, ApplicationState,
ApplicationEventType, ApplicationEvent> stateMachineFactory =
new StateMachineFactory<ApplicationImpl, ApplicationState,
ApplicationEventType, ApplicationEvent>(ApplicationState.NEW)
// Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING,
ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
.addTransition(ApplicationState.NEW, ApplicationState.NEW,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
// Transitions from INITING state
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.INITING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.FINISH_APPLICATION,
new AppFinishTriggeredTransition())
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
CONTAINER_DONE_TRANSITION)
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
new AppLogInitDoneTransition())
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
new AppLogInitFailTransition())
.addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_INITED,
new AppInitDoneTransition())
// Transitions from RUNNING state
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
CONTAINER_DONE_TRANSITION)
.addTransition(
ApplicationState.RUNNING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.FINISH_APPLICATION,
new AppFinishTriggeredTransition())
// Transitions from FINISHING_CONTAINERS_WAIT state.
.addTransition(
ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
new AppFinishTransition())
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.FINISHING_CONTAINERS_WAIT,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from APPLICATION_RESOURCES_CLEANINGUP state
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED)
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_INITED,
ApplicationEventType.FINISH_APPLICATION))
// Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
new AppLogsAggregatedTransition())
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.INIT_CONTAINER,
INIT_CONTAINER_TRANSITION)
.addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
EnumSet.of(
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
ApplicationEventType.FINISH_APPLICATION))
// create the topology tables
.installTopology();
private final StateMachine<ApplicationState, ApplicationEventType, ApplicationEvent> stateMachine;
/**
* Notify services of new application.
*
* In particular, this initializes the {@link LogAggregationService}
*/
@SuppressWarnings("unchecked")
static class AppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
// Inform the logAggregator
app.logAggregationContext = initEvent.getLogAggregationContext();
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, app.applicationACLs,
app.logAggregationContext, app.applicationLogInitedTimestamp));
}
}
/**
* Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after
* {@link LogAggregationService} has created the directories for the app
* and started the aggregation thread for the app.
*
* In particular, this requests that the {@link ResourceLocalizationService}
* localize the application-scoped resources.
*/
@SuppressWarnings("unchecked")
static class AppLogInitDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
app.setAppLogInitedTimestamp(event.getTimestamp());
try {
app.appStateStore.storeApplication(app.appId, buildAppProto(app));
} catch (Exception ex) {
LOG.warn("failed to update application state in state store", ex);
}
}
}
@VisibleForTesting
void setAppLogInitedTimestamp(long appLogInitedTimestamp) {
this.applicationLogInitedTimestamp = appLogInitedTimestamp;
}
static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app)
throws IOException {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) app.appId).getProto());
builder.setUser(app.getUser());
if (app.logAggregationContext != null) {
builder.setLogAggregationContext((
(LogAggregationContextPBImpl)app.logAggregationContext).getProto());
}
builder.clearCredentials();
if (app.credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
app.credentials.writeTokenStorageToStream(dob);
builder.setCredentials(ByteString.copyFrom(dob.getData()));
}
builder.clearAcls();
if (app.applicationACLs != null) {
for (Map.Entry<ApplicationAccessType, String> acl : app
.applicationACLs.entrySet()) {
YarnProtos.ApplicationACLMapProto p = YarnProtos
.ApplicationACLMapProto.newBuilder()
.setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
.setAcl(acl.getValue())
.build();
builder.addAcls(p);
}
}
builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
builder.clearFlowContext();
if (app.flowContext != null && app.flowContext.getFlowName() != null
&& app.flowContext.getFlowVersion() != null) {
FlowContextProto fcp = FlowContextProto.newBuilder()
.setFlowName(app.flowContext.getFlowName())
.setFlowVersion(app.flowContext.getFlowVersion())
.setFlowRunId(app.flowContext.getFlowRunId()).build();
builder.setFlowContext(fcp);
}
return builder.build();
}
/**
* Handles the APPLICATION_LOG_HANDLING_FAILED event that occurs after
* {@link LogAggregationService} has failed to initialize the log
* aggregation service
*
* In particular, this requests that the {@link ResourceLocalizationService}
* localize the application-scoped resources.
*/
@SuppressWarnings("unchecked")
static class AppLogInitFailTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
LOG.warn("Log Aggregation service failed to initialize, there will " +
"be no logs for this application");
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
}
}
/**
* Handles INIT_CONTAINER events which request that we launch a new
* container. When we're still in the INITTING state, we simply
* queue these up. When we're in the RUNNING state, we pass along
* an ContainerInitEvent to the appropriate ContainerImpl.
*/
@SuppressWarnings("unchecked")
static class InitContainerTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationContainerInitEvent initEvent =
(ApplicationContainerInitEvent) event;
Container container = initEvent.getContainer();
app.containers.put(container.getContainerId(), container);
LOG.info("Adding " + container.getContainerId()
+ " to application " + app.toString());
ApplicationState appState = app.getApplicationState();
switch (appState) {
case RUNNING:
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerId()));
break;
case INITING:
case NEW:
// these get queued up and sent out in AppInitDoneTransition
break;
default:
LOG.warn("Killing {} because {} is in state {}",
container.getContainerId(), app, appState);
app.dispatcher.getEventHandler().handle(new ContainerKillEvent(
container.getContainerId(),
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
"Application no longer running.\n"));
break;
}
}
}
@SuppressWarnings("unchecked")
static class AppInitDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
// Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) {
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
container.getContainerId()));
}
}
}
static final class ContainerDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationContainerFinishedEvent containerEvent =
(ApplicationContainerFinishedEvent) event;
if (null == app.containers.remove(containerEvent.getContainerID())) {
LOG.warn("Removing unknown " + containerEvent.getContainerID() +
" from application " + app.toString());
} else {
LOG.info("Removing " + containerEvent.getContainerID() +
" from application " + app.toString());
}
}
}
@SuppressWarnings("unchecked")
void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources
this.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
// tell any auxiliary services that the app is done
this.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
// TODO: Trigger the LogsManager
}
@SuppressWarnings("unchecked")
static class AppFinishTriggeredTransition
implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@Override
public ApplicationState transition(ApplicationImpl app,
ApplicationEvent event) {
ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event;
if (app.containers.isEmpty()) {
// No container to cleanup. Cleanup app level resources.
app.handleAppFinishWithContainersCleanedup();
return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
}
// Send event to ContainersLauncher to finish all the containers of this
// application.
for (ContainerId containerID : app.containers.keySet()) {
app.dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
"Container killed on application-finish event: " + appEvent.getDiagnostic()));
}
return ApplicationState.FINISHING_CONTAINERS_WAIT;
}
}
static class AppFinishTransition implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@Override
public ApplicationState transition(ApplicationImpl app,
ApplicationEvent event) {
ApplicationContainerFinishedEvent containerFinishEvent =
(ApplicationContainerFinishedEvent) event;
LOG.info("Removing " + containerFinishEvent.getContainerID()
+ " from application " + app.toString());
app.containers.remove(containerFinishEvent.getContainerID());
if (app.containers.isEmpty()) {
// All containers are cleanedup.
app.handleAppFinishWithContainersCleanedup();
return ApplicationState.APPLICATION_RESOURCES_CLEANINGUP;
}
return ApplicationState.FINISHING_CONTAINERS_WAIT;
}
}
@SuppressWarnings("unchecked")
static class AppCompletelyDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
private void updateCollectorStatus(ApplicationImpl app) {
// Remove collectors info for finished apps.
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
Map<ApplicationId, AppCollectorData> registeringCollectors
= app.context.getRegisteringCollectors();
if (registeringCollectors != null) {
registeringCollectors.remove(app.getAppId());
}
Map<ApplicationId, AppCollectorData> knownCollectors =
app.context.getKnownCollectors();
if (knownCollectors != null) {
knownCollectors.remove(app.getAppId());
}
// stop timelineClient when application get finished.
NMTimelinePublisher nmTimelinePublisher =
app.context.getNMTimelinePublisher();
if (nmTimelinePublisher != null) {
nmTimelinePublisher.stopTimelineClient(app.getAppId());
}
}
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
// Inform the logService
app.dispatcher.getEventHandler().handle(
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
updateCollectorStatus(app);
}
}
static class AppLogsAggregatedTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationId appId = event.getApplicationID();
app.context.getApplications().remove(appId);
app.aclsManager.removeApplication(appId);
try {
app.context.getNMStateStore().removeApplication(appId);
} catch (IOException e) {
LOG.error("Unable to remove application from state store", e);
}
}
}
@Override
public void handle(ApplicationEvent event) {
this.writeLock.lock();
try {
ApplicationId applicationID = event.getApplicationID();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Processing " + applicationID + " of type " + event.getType());
}
ApplicationState oldState = stateMachine.getCurrentState();
ApplicationState newState = null;
try {
// queue event requesting init of the same app
newState = stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.warn("Can't handle this event at current state", e);
}
if (newState != null && oldState != newState) {
LOG.info("Application " + applicationID + " transitioned from "
+ oldState + " to " + newState);
}
} finally {
this.writeLock.unlock();
}
}
@Override
public String toString() {
return appId.toString();
}
@VisibleForTesting
public LogAggregationContext getLogAggregationContext() {
try {
this.readLock.lock();
return this.logAggregationContext;
} finally {
this.readLock.unlock();
}
}
@Override
public String getFlowName() {
return flowContext == null ? null : flowContext.getFlowName();
}
@Override
public String getFlowVersion() {
return flowContext == null ? null : flowContext.getFlowVersion();
}
@Override
public long getFlowRunId() {
return flowContext == null ? 0L : flowContext.getFlowRunId();
}
}