blob: 00bd56bd0f7126eb6456fda63ff0a85aaee4bcec [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerImpl implements Container {
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
private final NMStateStoreService stateStore;
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext;
private final ContainerTokenIdentifier containerTokenIdentifier;
private final ContainerId containerId;
private volatile Resource resource;
private final String user;
private int exitCode = ContainerExitStatus.INVALID;
private final StringBuilder diagnostics;
private final int diagnosticsMaxSize;
private boolean wasLaunched;
private long containerLocalizationStartTime;
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
private final ContainerRetryContext containerRetryContext;
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
private String workDir;
private String logDir;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
private static final Log LOG = LogFactory.getLog(ContainerImpl.class);
private final Map<LocalResourceRequest,List<String>> pendingResources =
new HashMap<LocalResourceRequest,List<String>>();
private final Map<Path,List<String>> localizedResources =
new HashMap<Path,List<String>>();
private final List<LocalResourceRequest> publicRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> privateRsrcs =
new ArrayList<LocalResourceRequest>();
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
private final Map<LocalResourceRequest, Path> resourcesToBeUploaded =
new ConcurrentHashMap<LocalResourceRequest, Path>();
private final Map<LocalResourceRequest, Boolean> resourcesUploadPolicies =
new ConcurrentHashMap<LocalResourceRequest, Boolean>();
// whether container has been recovered after a restart
private RecoveredContainerStatus recoveredStatus =
RecoveredContainerStatus.REQUESTED;
// whether container was marked as killed after recovery
private boolean recoveredAsKilled = false;
private Context context;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore();
this.launchContext = launchContext;
if (launchContext != null
&& launchContext.getContainerRetryContext() != null) {
this.containerRetryContext = launchContext.getContainerRetryContext();
} else {
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
}
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
int minimumRestartInterval = conf.getInt(
YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
if (containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY
&& containerRetryContext.getRetryInterval() < minimumRestartInterval) {
LOG.info("Set restart interval to minimum value " + minimumRestartInterval
+ "ms for container " + containerTokenIdentifier.getContainerID());
this.containerRetryContext.setRetryInterval(minimumRestartInterval);
}
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
this.resource = containerTokenIdentifier.getResource();
this.diagnostics = new StringBuilder();
this.credentials = creds;
this.metrics = metrics;
user = containerTokenIdentifier.getApplicationSubmitter();
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.context = context;
boolean containerMetricsEnabled =
conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
if (containerMetricsEnabled) {
long flushPeriod =
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
long unregisterDelay = conf.getLong(
YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
containerMetrics = ContainerMetrics
.forContainer(containerId, flushPeriod, unregisterDelay);
containerMetrics.recordStartTime(clock.getTime());
}
stateMachine = stateMachineFactory.make(this);
this.context = context;
}
// constructor for a recovered container
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, Context context,
RecoveredContainerState rcs) {
this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context);
this.recoveredStatus = rcs.getStatus();
this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
Resource recoveredCapability = rcs.getCapability();
if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
recoveredCapability.getVirtualCores());
}
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
new ContainerDiagnosticsUpdateTransition();
// State Machine for each container.
private static StateMachineFactory
<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
stateMachineFactory =
new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
// From NEW State
.addTransition(ContainerState.NEW,
EnumSet.of(ContainerState.LOCALIZING,
ContainerState.LOCALIZED,
ContainerState.LOCALIZATION_FAILED,
ContainerState.DONE),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
.addTransition(ContainerState.LOCALIZING,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED,
new ResourceFailedTransition())
.addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillDuringLocalizationTransition())
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
new LocalizationFailedToDoneTransition())
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
// container not launched so kill is a no-op
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.KILL_CONTAINER)
// container cleanup triggers a release of all resources
// regardless of whether they were localized or not
// LocalizedResource handles release event in all states
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_LOCALIZED)
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED)
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
// From RUNNING State
.addTransition(ContainerState.RUNNING,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.RELAUNCHING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
new ExitedWithSuccessToDoneTransition())
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.KILL_CONTAINER)
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
new ExitedWithFailureToDoneTransition())
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.KILL_CONTAINER)
// From KILLING State.
.addTransition(ContainerState.KILLING,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new ContainerKilledTransition())
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.RESOURCE_LOCALIZED,
new LocalizedResourceDuringKillTransition())
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.RESOURCE_FAILED)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(false))
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(false))
.addTransition(ContainerState.KILLING,
ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
new KillingToDoneTransition())
// Handle a launched container during killing stage is a no-op
// as cleanup container is always handled after launch container event
// in the container launcher
.addTransition(ContainerState.KILLING,
ContainerState.KILLING,
ContainerEventType.CONTAINER_LAUNCHED)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
new ContainerCleanedupAfterKillToDoneTransition())
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.INIT_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
// This transition may result when
// we notify container of failed localization if localizer thread (for
// that container) fails for some reason
.addTransition(ContainerState.DONE, ContainerState.DONE,
EnumSet.of(ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// create the topology tables
.installTopology();
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine;
public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
case LOCALIZATION_FAILED:
case LOCALIZED:
case RUNNING:
case RELAUNCHING:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case KILLING:
case CONTAINER_CLEANEDUP_AFTER_KILL:
case CONTAINER_RESOURCES_CLEANINGUP:
return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
case DONE:
default:
return org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
}
}
public NMTimelinePublisher getNMTimelinePublisher() {
return context.getNMTimelinePublisher();
}
@Override
public String getUser() {
this.readLock.lock();
try {
return this.user;
} finally {
this.readLock.unlock();
}
}
@Override
public Map<Path,List<String>> getLocalizedResources() {
this.readLock.lock();
try {
if (ContainerState.LOCALIZED == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
return localizedResources;
} else {
return null;
}
} finally {
this.readLock.unlock();
}
}
@Override
public Credentials getCredentials() {
this.readLock.lock();
try {
return credentials;
} finally {
this.readLock.unlock();
}
}
@Override
public ContainerState getContainerState() {
this.readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@Override
public ContainerLaunchContext getLaunchContext() {
this.readLock.lock();
try {
return launchContext;
} finally {
this.readLock.unlock();
}
}
@Override
public ContainerStatus cloneAndGetContainerStatus() {
this.readLock.lock();
try {
return BuilderUtils.newContainerStatus(this.containerId,
getCurrentState(), diagnostics.toString(), exitCode, getResource(),
this.containerTokenIdentifier.getExecutionType());
} finally {
this.readLock.unlock();
}
}
@Override
public NMContainerStatus getNMContainerStatus() {
this.readLock.lock();
try {
return NMContainerStatus.newInstance(this.containerId, getCurrentState(),
getResource(), diagnostics.toString(), exitCode,
containerTokenIdentifier.getPriority(),
containerTokenIdentifier.getCreationTime(),
containerTokenIdentifier.getNodeLabelExpression());
} finally {
this.readLock.unlock();
}
}
@Override
public ContainerId getContainerId() {
return this.containerId;
}
@Override
public Resource getResource() {
return Resources.clone(this.resource);
}
@Override
public void setResource(Resource targetResource) {
Resource currentResource = getResource();
this.resource = Resources.clone(targetResource);
this.metrics.changeContainer(currentResource, targetResource);
}
@Override
public ContainerTokenIdentifier getContainerTokenIdentifier() {
this.readLock.lock();
try {
return this.containerTokenIdentifier;
} finally {
this.readLock.unlock();
}
}
@Override
public String getWorkDir() {
return workDir;
}
@Override
public void setWorkDir(String workDir) {
this.workDir = workDir;
}
@Override
public String getLogDir() {
return logDir;
}
@Override
public void setLogDir(String logDir) {
this.logDir = logDir;
}
@SuppressWarnings("unchecked")
private void sendFinishedEvents() {
// Inform the application
@SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
ContainerStatus containerStatus = cloneAndGetContainerStatus();
eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
// Tell the logService too
eventHandler.handle(new LogHandlerContainerFinishedEvent(
containerId, exitCode));
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendLaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.LAUNCH_CONTAINER;
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
// try to recover a container that was previously launched
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
}
containerLaunchStartTime = clock.getTime();
dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent));
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.RELAUNCH_CONTAINER;
dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent));
}
// Inform the ContainersMonitor to start monitoring the container's
// resource usage.
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendContainerMonitorStartEvent() {
long launchDuration = clock.getTime() - containerLaunchStartTime;
metrics.addContainerLaunchDuration(launchDuration);
long pmemBytes = getResource().getMemorySize() * 1024 * 1024L;
float pmemRatio = daemonConf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
long vmemBytes = (long) (pmemRatio * pmemBytes);
int cpuVcores = getResource().getVirtualCores();
long localizationDuration = containerLaunchStartTime -
containerLocalizationStartTime;
dispatcher.getEventHandler().handle(
new ContainerStartMonitoringEvent(containerId,
vmemBytes, pmemBytes, cpuVcores, launchDuration,
localizationDuration));
}
private void addDiagnostics(String... diags) {
for (String s : diags) {
this.diagnostics.append(s);
}
if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
}
try {
stateStore.storeContainerDiagnostics(containerId, diagnostics);
} catch (IOException e) {
LOG.warn("Unable to update diagnostics in state store for "
+ containerId, e);
}
}
@SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!publicRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
}
if (!privateRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
}
if (!appRsrcs.isEmpty()) {
rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
}
dispatcher.getEventHandler().handle(
new ContainerLocalizationCleanupEvent(this, rsrc));
}
static class ContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Just drain the event and change the state.
}
}
/**
* State transition when a NEW container receives the INIT_CONTAINER
* message.
*
* If there are resources to localize, sends a
* ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
* to the ResourceLocalizationManager and enters LOCALIZING state.
*
* If there are no resources to localize, sends LAUNCH_CONTAINER event
* and enters LOCALIZED state directly.
*
* If there are any invalid resources specified, enters LOCALIZATION_FAILED
* directly.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class RequestResourcesTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
container.sendFinishedEvents();
return ContainerState.DONE;
} else if (container.recoveredAsKilled &&
container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
// container was killed but never launched
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
container.metrics.releaseContainer(container.resource);
container.sendFinishedEvents();
return ContainerState.DONE;
}
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_INIT, container));
// Inform the AuxServices about the opaque serviceData
Map<String,ByteBuffer> csd = ctxt.getServiceData();
if (csd != null) {
// This can happen more than once per Application as each container may
// have distinct service data
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
container.user, container.containerId
.getApplicationAttemptId().getApplicationId(),
service.getKey().toString(), service.getValue()));
}
}
container.containerLocalizationStartTime = clock.getTime();
// Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
if (!cntrRsrc.isEmpty()) {
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue());
List<String> links = container.pendingResources.get(req);
if (links == null) {
links = new ArrayList<String>();
container.pendingResources.put(req, links);
}
links.add(rsrc.getKey());
storeSharedCacheUploadPolicy(container, req, rsrc.getValue()
.getShouldBeUploadedToSharedCache());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
container.publicRsrcs.add(req);
break;
case PRIVATE:
container.privateRsrcs.add(req);
break;
case APPLICATION:
container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
LOG.info("Got exception parsing " + rsrc.getKey()
+ " and value " + rsrc.getValue());
throw e;
}
}
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
container.cleanup();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
new LinkedHashMap<LocalResourceVisibility,
Collection<LocalResourceRequest>>();
if (!container.publicRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
}
if (!container.privateRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
}
if (!container.appRsrcs.isEmpty()) {
req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
}
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.sendLaunchEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
}
}
/**
* Store the resource's shared cache upload policies
* Given LocalResourceRequest can be shared across containers in
* LocalResourcesTrackerImpl, we preserve the upload policies here.
* In addition, it is possible for the application to create several
* "identical" LocalResources as part of
* ContainerLaunchContext.setLocalResources with different symlinks.
* There is a corner case where these "identical" local resources have
* different upload policies. For that scenario, upload policy will be set to
* true as long as there is at least one LocalResource entry with
* upload policy set to true.
*/
private static void storeSharedCacheUploadPolicy(ContainerImpl container,
LocalResourceRequest resourceRequest, Boolean uploadPolicy) {
Boolean storedUploadPolicy =
container.resourcesUploadPolicies.get(resourceRequest);
if (storedUploadPolicy == null || (!storedUploadPolicy && uploadPolicy)) {
container.resourcesUploadPolicies.put(resourceRequest, uploadPolicy);
}
}
/**
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@SuppressWarnings("unchecked")
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
LocalResourceRequest resourceRequest = rsrcEvent.getResource();
Path location = rsrcEvent.getLocation();
List<String> syms = container.pendingResources.remove(resourceRequest);
if (null == syms) {
LOG.warn("Localized unknown resource " + resourceRequest +
" for container " + container.containerId);
assert false;
// fail container?
return ContainerState.LOCALIZING;
}
container.localizedResources.put(location, syms);
// check to see if this resource should be uploaded to the shared cache
// as well
if (shouldBeUploadedToSharedCache(container, resourceRequest)) {
container.resourcesToBeUploaded.put(resourceRequest, location);
}
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(LocalizationEventType.
CONTAINER_RESOURCES_LOCALIZED, container));
container.sendLaunchEvent();
container.metrics.endInitingContainer();
// If this is a recovered container that has already launched, skip
// uploading resources to the shared cache. We do this to avoid uploading
// the same resources multiple times. The tradeoff is that in the case of
// a recovered container, there is a chance that resources don't get
// uploaded into the shared cache. This is OK because resources are not
// acknowledged by the SCM until they have been uploaded by the node
// manager.
if (container.recoveredStatus != RecoveredContainerStatus.LAUNCHED
&& container.recoveredStatus != RecoveredContainerStatus.COMPLETED) {
// kick off uploads to the shared cache
container.dispatcher.getEventHandler().handle(
new SharedCacheUploadEvent(container.resourcesToBeUploaded, container
.getLaunchContext(), container.getUser(),
SharedCacheUploadEventType.UPLOAD));
}
return ContainerState.LOCALIZED;
}
}
/**
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
*/
static class LaunchTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.sendContainerMonitorStartEvent();
container.metrics.runningContainer();
container.wasLaunched = true;
if (container.recoveredAsKilled) {
LOG.info("Killing " + container.containerId
+ " due to recovered as killed");
container.addDiagnostics("Container recovered as killed.\n");
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
}
}
/**
* Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
* upon EXITED_WITH_SUCCESS message.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
boolean clCleanupRequired;
public ExitedWithSuccessTransition(boolean clCleanupRequired) {
this.clCleanupRequired = clCleanupRequired;
}
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Set exit code to 0 on success
container.exitCode = 0;
// TODO: Add containerWorkDir to the deletion service.
if (clCleanupRequired) {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
container.cleanup();
}
}
/**
* Transition to EXITED_WITH_FAILURE state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
boolean clCleanupRequired;
public ExitedWithFailureTransition(boolean clCleanupRequired) {
this.clCleanupRequired = clCleanupRequired;
}
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// TODO: Add containerWorkDir to the deletion service.
// TODO: Add containerOuputDir to the deletion service.
if (clCleanupRequired) {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
container.cleanup();
}
}
/**
* Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed
static class RetryFailureTransition implements
MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
@Override
public ContainerState transition(final ContainerImpl container,
ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
if (container.containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY) {
int n = container.containerRetryContext.getMaxRetries()
- container.remainingRetryAttempts;
container.addDiagnostics("Diagnostic message from attempt "
+ n + " : ", "\n");
}
container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
if (container.shouldRetry(container.exitCode)) {
if (container.remainingRetryAttempts > 0) {
container.remainingRetryAttempts--;
try {
container.stateStore.storeContainerRemainingRetryAttempts(
container.getContainerId(), container.remainingRetryAttempts);
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
+ container.getContainerId(), e);
}
}
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
+ container.remainingRetryAttempts
+ ". Interval between retries is "
+ container.containerRetryContext.getRetryInterval() + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (container.containerRetryContext.getRetryInterval() == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(
container.containerRetryContext.getRetryInterval());
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
return ContainerState.RELAUNCHING;
} else {
new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE;
}
}
}
@Override
public boolean isRetryContextSet() {
return containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY;
}
@Override
public boolean shouldRetry(int errorCode) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRetryContext.getErrorCodes() != null
&& containerRetryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
}
return false;
}
/**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
*/
static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() {
super(true);
}
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
super.transition(container, event);
container.addDiagnostics("Killed by external signal\n");
}
}
/**
* Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
* RESOURCE_FAILED event.
*/
static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceFailedEvent rsrcFailedEvent =
(ContainerResourceFailedEvent) event;
container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.cleanup();
container.metrics.endInitingContainer();
}
}
/**
* Transition from LOCALIZING to KILLING upon receiving
* KILL_CONTAINER event.
*/
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.cleanup();
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.exitCode = killEvent.getContainerExitStatus();
container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.addDiagnostics("Container is killed before being launched.\n");
}
}
/**
* Remain in KILLING state when receiving a RESOURCE_LOCALIZED request
* while in the process of killing.
*/
static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
List<String> syms =
container.pendingResources.remove(rsrcEvent.getResource());
if (null == syms) {
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
" for container " + container.containerId);
assert false;
// fail container?
return;
}
container.localizedResources.put(rsrcEvent.getLocation(), syms);
}
}
/**
* Transitions upon receiving KILL_CONTAINER:
* - LOCALIZED -> KILLING
* - RUNNING -> KILLING
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Kill the process/process-grp
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.exitCode = killEvent.getContainerExitStatus();
}
}
/**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST.
*/
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
if (container.hasDefaultExitCode()) {
container.exitCode = exitEvent.getExitCode();
}
if (exitEvent.getDiagnosticInfo() != null) {
container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// The process/process-grp is killed. Decrement reference counts and
// cleanup resources
container.cleanup();
}
}
/**
* Handle the following transitions:
* - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
* KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
* -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.releaseContainer(container.resource);
if (container.containerMetrics != null) {
container.containerMetrics
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
container.containerMetrics.finished();
}
container.sendFinishedEvents();
// if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState()
!= org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
container.dispatcher.getEventHandler().handle(new AuxServicesEvent
(AuxServicesEventType.CONTAINER_STOP, container));
}
container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
}
}
/**
* Handle the following transition:
* - NEW -> DONE upon KILL_CONTAINER
*/
static class KillOnNewTransition extends ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.exitCode = killEvent.getContainerExitStatus();
container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.addDiagnostics("Container is killed before being launched.\n");
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - LOCALIZATION_FAILED -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class LocalizationFailedToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.failedContainer();
NMAuditLogger.logFailure(container.user,
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + container.getContainerState(),
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - EXITED_WITH_SUCCESS -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ExitedWithSuccessToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.wasLaunched) {
container.metrics.endRunningContainer();
} else {
LOG.warn("Container exited with success despite being killed and not" +
"actually running");
}
container.metrics.completedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - EXITED_WITH_FAILURE -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ExitedWithFailureToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.wasLaunched) {
container.metrics.endRunningContainer();
}
container.metrics.failedContainer();
NMAuditLogger.logFailure(container.user,
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + container.getContainerState(),
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - KILLING -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class KillingToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* CONTAINER_CLEANEDUP_AFTER_KILL -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ContainerCleanedupAfterKillToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.wasLaunched) {
container.metrics.endRunningContainer();
}
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Update diagnostics, staying in the same state.
*/
static class ContainerDiagnosticsUpdateTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerDiagnosticsUpdateEvent updateEvent =
(ContainerDiagnosticsUpdateEvent) event;
container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
}
}
@Override
public void handle(ContainerEvent event) {
try {
this.writeLock.lock();
ContainerId containerID = event.getContainerID();
LOG.debug("Processing " + containerID + " of type " + event.getType());
ContainerState oldState = stateMachine.getCurrentState();
ContainerState newState = null;
try {
newState =
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.warn("Can't handle this event at current state: Current: ["
+ oldState + "], eventType: [" + event.getType() + "]", e);
}
if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from "
+ oldState
+ " to " + newState);
}
} finally {
this.writeLock.unlock();
}
}
@Override
public String toString() {
this.readLock.lock();
try {
return this.containerId.toString();
} finally {
this.readLock.unlock();
}
}
private boolean hasDefaultExitCode() {
return (this.exitCode == ContainerExitStatus.INVALID);
}
/**
* Returns whether the specific resource should be uploaded to the shared
* cache.
*/
private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
LocalResourceRequest resource) {
return container.resourcesUploadPolicies.get(resource);
}
@VisibleForTesting
ContainerRetryContext getContainerRetryContext() {
return containerRetryContext;
}
@Override
public Priority getPriority() {
return containerTokenIdentifier.getPriority();
}
}