| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; |
| |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerExitStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerReport; |
| import org.apache.hadoop.yarn.api.records.ContainerState; |
| import org.apache.hadoop.yarn.api.records.ContainerStatus; |
| import org.apache.hadoop.yarn.api.records.ContainerUpdateType; |
| import org.apache.hadoop.yarn.api.records.ExecutionType; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.ResourceRequest; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMContext; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; |
| import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; |
| import org.apache.hadoop.yarn.state.InvalidStateTransitionException; |
| import org.apache.hadoop.yarn.state.MultipleArcTransition; |
| import org.apache.hadoop.yarn.state.SingleArcTransition; |
| import org.apache.hadoop.yarn.state.StateMachine; |
| import org.apache.hadoop.yarn.state.StateMachineFactory; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.apache.hadoop.yarn.webapp.util.WebAppUtils; |
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| public class RMContainerImpl implements RMContainer { |
| |
| private static final Log LOG = LogFactory.getLog(RMContainerImpl.class); |
| |
| private static final StateMachineFactory<RMContainerImpl, RMContainerState, |
| RMContainerEventType, RMContainerEvent> |
| stateMachineFactory = new StateMachineFactory<RMContainerImpl, |
| RMContainerState, RMContainerEventType, RMContainerEvent>( |
| RMContainerState.NEW) |
| |
| // Transitions from NEW state |
| .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED, |
| RMContainerEventType.START, new ContainerStartedTransition()) |
| .addTransition(RMContainerState.NEW, RMContainerState.KILLED, |
| RMContainerEventType.KILL) |
| .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, |
| RMContainerEventType.RESERVED, new ContainerReservedTransition()) |
| .addTransition(RMContainerState.NEW, RMContainerState.ACQUIRED, |
| RMContainerEventType.ACQUIRED, new AcquiredTransition()) |
| .addTransition(RMContainerState.NEW, |
| EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), |
| RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) |
| |
| // Transitions from RESERVED state |
| .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, |
| RMContainerEventType.RESERVED, new ContainerReservedTransition()) |
| .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, |
| RMContainerEventType.START, new ContainerStartedTransition()) |
| .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, |
| RMContainerEventType.KILL) // nothing to do |
| .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, |
| RMContainerEventType.RELEASED) // nothing to do |
| |
| |
| // Transitions from ALLOCATED state |
| .addTransition(RMContainerState.ALLOCATED, RMContainerState.ACQUIRED, |
| RMContainerEventType.ACQUIRED, new AcquiredTransition()) |
| .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, |
| RMContainerEventType.EXPIRE, new FinishedTransition()) |
| .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, |
| RMContainerEventType.KILL, new FinishedTransition()) |
| |
| // Transitions from ACQUIRED state |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, |
| RMContainerEventType.LAUNCHED) |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED, |
| RMContainerEventType.ACQUIRED) |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, |
| RMContainerEventType.FINISHED, new FinishedTransition()) |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, |
| RMContainerEventType.RELEASED, new KillTransition()) |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, |
| RMContainerEventType.EXPIRE, new KillTransition()) |
| .addTransition(RMContainerState.ACQUIRED, RMContainerState.KILLED, |
| RMContainerEventType.KILL, new KillTransition()) |
| |
| // Transitions from RUNNING state |
| .addTransition(RMContainerState.RUNNING, RMContainerState.COMPLETED, |
| RMContainerEventType.FINISHED, new FinishedTransition()) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED, |
| RMContainerEventType.KILL, new KillTransition()) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, |
| RMContainerEventType.RELEASED, new KillTransition()) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, |
| RMContainerEventType.ACQUIRED) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, |
| RMContainerEventType.RESERVED, new ContainerReservedTransition()) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, |
| RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, |
| new ContainerAcquiredWhileRunningTransition()) |
| .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, |
| RMContainerEventType.NM_DONE_CHANGE_RESOURCE, |
| new NMReportedContainerChangeIsDoneTransition()) |
| |
| // Transitions from COMPLETED state |
| .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED, |
| EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED, |
| RMContainerEventType.KILL)) |
| |
| // Transitions from EXPIRED state |
| .addTransition(RMContainerState.EXPIRED, RMContainerState.EXPIRED, |
| EnumSet.of(RMContainerEventType.RELEASED, RMContainerEventType.KILL)) |
| |
| // Transitions from RELEASED state |
| .addTransition(RMContainerState.RELEASED, RMContainerState.RELEASED, |
| EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED, |
| RMContainerEventType.KILL, RMContainerEventType.FINISHED)) |
| |
| // Transitions from KILLED state |
| .addTransition(RMContainerState.KILLED, RMContainerState.KILLED, |
| EnumSet.of(RMContainerEventType.EXPIRE, RMContainerEventType.RELEASED, |
| RMContainerEventType.KILL, RMContainerEventType.FINISHED)) |
| |
| // create the topology tables |
| .installTopology(); |
| |
| private final StateMachine<RMContainerState, RMContainerEventType, |
| RMContainerEvent> stateMachine; |
| private final ReadLock readLock; |
| private final WriteLock writeLock; |
| private final ApplicationAttemptId appAttemptId; |
| private final NodeId nodeId; |
| private final RMContext rmContext; |
| private final EventHandler eventHandler; |
| private final ContainerAllocationExpirer containerAllocationExpirer; |
| private final String user; |
| private final String nodeLabelExpression; |
| |
| private volatile Container container; |
| private Resource reservedResource; |
| private NodeId reservedNode; |
| private SchedulerRequestKey reservedSchedulerKey; |
| private long creationTime; |
| private long finishTime; |
| private ContainerStatus finishedStatus; |
| private boolean isAMContainer; |
| private List<ResourceRequest> resourceRequests; |
| |
| // Only used for container resource increase and decrease. This is the |
| // resource to rollback to should container resource increase token expires. |
| private Resource lastConfirmedResource; |
| private volatile String queueName; |
| |
| private boolean isExternallyAllocated; |
| private SchedulerRequestKey allocatedSchedulerKey; |
| |
| public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, |
| ApplicationAttemptId appAttemptId, NodeId nodeId, String user, |
| RMContext rmContext) { |
| this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System |
| .currentTimeMillis(), ""); |
| } |
| |
| public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, |
| ApplicationAttemptId appAttemptId, NodeId nodeId, String user, |
| RMContext rmContext, boolean isExternallyAllocated) { |
| this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System |
| .currentTimeMillis(), "", isExternallyAllocated); |
| } |
| |
| private boolean saveNonAMContainerMetaInfo; |
| |
| public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, |
| ApplicationAttemptId appAttemptId, NodeId nodeId, String user, |
| RMContext rmContext, String nodeLabelExpression) { |
| this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System |
| .currentTimeMillis(), nodeLabelExpression); |
| } |
| |
| public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, |
| ApplicationAttemptId appAttemptId, NodeId nodeId, String user, |
| RMContext rmContext, long creationTime, String nodeLabelExpression) { |
| this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, |
| creationTime, nodeLabelExpression, false); |
| } |
| |
| public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, |
| ApplicationAttemptId appAttemptId, NodeId nodeId, String user, |
| RMContext rmContext, long creationTime, String nodeLabelExpression, |
| boolean isExternallyAllocated) { |
| this.stateMachine = stateMachineFactory.make(this); |
| this.nodeId = nodeId; |
| this.container = container; |
| this.allocatedSchedulerKey = schedulerKey; |
| this.appAttemptId = appAttemptId; |
| this.user = user; |
| this.creationTime = creationTime; |
| this.rmContext = rmContext; |
| this.eventHandler = rmContext.getDispatcher().getEventHandler(); |
| this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); |
| this.isAMContainer = false; |
| this.resourceRequests = null; |
| this.nodeLabelExpression = nodeLabelExpression; |
| this.lastConfirmedResource = container.getResource(); |
| this.isExternallyAllocated = isExternallyAllocated; |
| |
| ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| this.readLock = lock.readLock(); |
| this.writeLock = lock.writeLock(); |
| |
| saveNonAMContainerMetaInfo = rmContext.getYarnConfiguration().getBoolean( |
| YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, |
| YarnConfiguration |
| .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO); |
| |
| if (container.getId() != null) { |
| rmContext.getRMApplicationHistoryWriter().containerStarted(this); |
| } |
| |
| // If saveNonAMContainerMetaInfo is true, store system metrics for all |
| // containers. If false, and if this container is marked as the AM, metrics |
| // will still be published for this container, but that calculation happens |
| // later. |
| if (saveNonAMContainerMetaInfo && null != container.getId()) { |
| rmContext.getSystemMetricsPublisher().containerCreated( |
| this, this.creationTime); |
| } |
| } |
| |
| @Override |
| public ContainerId getContainerId() { |
| return this.container.getId(); |
| } |
| |
| @Override |
| public ApplicationAttemptId getApplicationAttemptId() { |
| return this.appAttemptId; |
| } |
| |
| @Override |
| public Container getContainer() { |
| return this.container; |
| } |
| |
| public void setContainer(Container container) { |
| this.container = container; |
| } |
| |
| @Override |
| public RMContainerState getState() { |
| this.readLock.lock(); |
| try { |
| return this.stateMachine.getCurrentState(); |
| } finally { |
| this.readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Resource getReservedResource() { |
| return reservedResource; |
| } |
| |
| @Override |
| public NodeId getReservedNode() { |
| return reservedNode; |
| } |
| |
| @Override |
| public SchedulerRequestKey getReservedSchedulerKey() { |
| return reservedSchedulerKey; |
| } |
| |
| @Override |
| public Resource getAllocatedResource() { |
| try { |
| readLock.lock(); |
| return container.getResource(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public Resource getLastConfirmedResource() { |
| try { |
| readLock.lock(); |
| return this.lastConfirmedResource; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public NodeId getAllocatedNode() { |
| return container.getNodeId(); |
| } |
| |
| @Override |
| public SchedulerRequestKey getAllocatedSchedulerKey() { |
| return allocatedSchedulerKey; |
| } |
| |
| @Override |
| public Priority getAllocatedPriority() { |
| return container.getPriority(); |
| } |
| |
| @Override |
| public long getCreationTime() { |
| return creationTime; |
| } |
| |
| @Override |
| public long getFinishTime() { |
| try { |
| readLock.lock(); |
| return finishTime; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getDiagnosticsInfo() { |
| try { |
| readLock.lock(); |
| if (finishedStatus != null) { |
| return finishedStatus.getDiagnostics(); |
| } else { |
| return null; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getLogURL() { |
| try { |
| readLock.lock(); |
| StringBuilder logURL = new StringBuilder(); |
| logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext |
| .getYarnConfiguration())); |
| logURL.append(WebAppUtils.getRunningLogURL( |
| container.getNodeHttpAddress(), getContainerId().toString(), |
| user)); |
| return logURL.toString(); |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int getContainerExitStatus() { |
| try { |
| readLock.lock(); |
| if (finishedStatus != null) { |
| return finishedStatus.getExitStatus(); |
| } else { |
| return 0; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public ContainerState getContainerState() { |
| try { |
| readLock.lock(); |
| if (finishedStatus != null) { |
| return finishedStatus.getState(); |
| } else { |
| return ContainerState.RUNNING; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public List<ResourceRequest> getResourceRequests() { |
| try { |
| readLock.lock(); |
| return resourceRequests; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void setResourceRequests(List<ResourceRequest> requests) { |
| try { |
| writeLock.lock(); |
| this.resourceRequests = requests; |
| } finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return getContainerId().toString(); |
| } |
| |
| @Override |
| public boolean isAMContainer() { |
| try { |
| readLock.lock(); |
| return isAMContainer; |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| public void setAMContainer(boolean isAMContainer) { |
| try { |
| writeLock.lock(); |
| this.isAMContainer = isAMContainer; |
| } finally { |
| writeLock.unlock(); |
| } |
| |
| // Even if saveNonAMContainerMetaInfo is not true, the AM container's system |
| // metrics still need to be saved so that the AM's logs can be accessed. |
| // This call to getSystemMetricsPublisher().containerCreated() is mutually |
| // exclusive with the one in the RMContainerImpl constructor. |
| if (!saveNonAMContainerMetaInfo && this.isAMContainer) { |
| rmContext.getSystemMetricsPublisher().containerCreated( |
| this, this.creationTime); |
| } |
| } |
| |
| @Override |
| public void handle(RMContainerEvent event) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing " + event.getContainerId() + " of type " + event |
| .getType()); |
| } |
| try { |
| writeLock.lock(); |
| RMContainerState oldState = getState(); |
| try { |
| stateMachine.doTransition(event.getType(), event); |
| } catch (InvalidStateTransitionException e) { |
| LOG.error("Can't handle this event at current state", e); |
| LOG.error("Invalid event " + event.getType() + |
| " on container " + this.getContainerId()); |
| } |
| if (oldState != getState()) { |
| LOG.info(event.getContainerId() + " Container Transitioned from " |
| + oldState + " to " + getState()); |
| } |
| } |
| |
| finally { |
| writeLock.unlock(); |
| } |
| } |
| |
| public boolean completed() { |
| return finishedStatus != null; |
| } |
| |
| @Override |
| public NodeId getNodeId() { |
| return nodeId; |
| } |
| |
| private static class BaseTransition implements |
| SingleArcTransition<RMContainerImpl, RMContainerEvent> { |
| |
| @Override |
| public void transition(RMContainerImpl cont, RMContainerEvent event) { |
| |
| } |
| } |
| |
| private static final class ContainerRecoveredTransition |
| implements |
| MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> { |
| @Override |
| public RMContainerState transition(RMContainerImpl container, |
| RMContainerEvent event) { |
| NMContainerStatus report = |
| ((RMContainerRecoverEvent) event).getContainerReport(); |
| if (report.getContainerState().equals(ContainerState.COMPLETE)) { |
| ContainerStatus status = |
| ContainerStatus.newInstance(report.getContainerId(), |
| report.getContainerState(), report.getDiagnostics(), |
| report.getContainerExitStatus()); |
| |
| new FinishedTransition().transition(container, |
| new RMContainerFinishedEvent(container.getContainerId(), status, |
| RMContainerEventType.FINISHED)); |
| return RMContainerState.COMPLETED; |
| } else if (report.getContainerState().equals(ContainerState.RUNNING)) { |
| // Tell the app |
| container.eventHandler.handle(new RMAppRunningOnNodeEvent(container |
| .getApplicationAttemptId().getApplicationId(), container.nodeId)); |
| return RMContainerState.RUNNING; |
| } else { |
| // This can never happen. |
| LOG.warn("RMContainer received unexpected recover event with container" |
| + " state " + report.getContainerState() + " while recovering."); |
| return RMContainerState.RUNNING; |
| } |
| } |
| } |
| |
| private static final class ContainerReservedTransition |
| extends BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| RMContainerReservedEvent e = (RMContainerReservedEvent)event; |
| container.reservedResource = e.getReservedResource(); |
| container.reservedNode = e.getReservedNode(); |
| container.reservedSchedulerKey = e.getReservedSchedulerKey(); |
| |
| Container c = container.getContainer(); |
| if (c != null) { |
| c.setNodeId(container.reservedNode); |
| } |
| } |
| } |
| |
| |
| private static final class ContainerStartedTransition extends |
| BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| container.eventHandler.handle(new RMAppAttemptEvent( |
| container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED)); |
| } |
| } |
| |
| private static final class AcquiredTransition extends BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| // Clear ResourceRequest stored in RMContainer, we don't need to remember |
| // this anymore. |
| container.setResourceRequests(null); |
| |
| // Register with containerAllocationExpirer. |
| container.containerAllocationExpirer.register( |
| new AllocationExpirationInfo(container.getContainerId())); |
| |
| // Tell the app |
| container.eventHandler.handle(new RMAppRunningOnNodeEvent(container |
| .getApplicationAttemptId().getApplicationId(), container.nodeId)); |
| } |
| } |
| |
| private static final class ContainerAcquiredWhileRunningTransition extends |
| BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| RMContainerUpdatesAcquiredEvent acquiredEvent = |
| (RMContainerUpdatesAcquiredEvent) event; |
| if (acquiredEvent.isIncreasedContainer()) { |
| // If container is increased but not started by AM, we will start |
| // containerAllocationExpirer for this container in this transition. |
| container.containerAllocationExpirer.register( |
| new AllocationExpirationInfo(event.getContainerId(), true)); |
| } |
| } |
| } |
| |
| private static final class NMReportedContainerChangeIsDoneTransition |
| extends BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| RMContainerNMDoneChangeResourceEvent nmDoneChangeResourceEvent = |
| (RMContainerNMDoneChangeResourceEvent)event; |
| Resource rmContainerResource = container.getAllocatedResource(); |
| Resource nmContainerResource = |
| nmDoneChangeResourceEvent.getNMContainerResource(); |
| |
| if (Resources.equals(rmContainerResource, nmContainerResource)) { |
| // If rmContainerResource == nmContainerResource, the resource |
| // increase is confirmed. |
| // In this case: |
| // - Set the lastConfirmedResource as nmContainerResource |
| // - Unregister the allocation expirer |
| container.lastConfirmedResource = nmContainerResource; |
| container.containerAllocationExpirer.unregister( |
| new AllocationExpirationInfo(event.getContainerId())); |
| } else if (Resources.fitsIn(rmContainerResource, nmContainerResource)) { |
| // If rmContainerResource < nmContainerResource, this is caused by the |
| // following sequence: |
| // 1. AM asks for increase from 1G to 5G, and RM approves it |
| // 2. AM acquires the increase token and increases on NM |
| // 3. Before NM reports 5G to RM to confirm the increase, AM sends |
| // a decrease request to 4G, and RM approves it |
| // 4. When NM reports 5G to RM, RM now sees its own allocation as 4G |
| // In this cases: |
| // - Set the lastConfirmedResource as rmContainerResource |
| // - Unregister the allocation expirer |
| // - Notify NM to reduce its resource to rmContainerResource |
| container.lastConfirmedResource = rmContainerResource; |
| container.containerAllocationExpirer.unregister( |
| new AllocationExpirationInfo(event.getContainerId())); |
| container.eventHandler.handle(new RMNodeUpdateContainerEvent( |
| container.nodeId, |
| Collections.singletonMap(container.getContainer(), |
| ContainerUpdateType.DECREASE_RESOURCE))); |
| } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { |
| // If nmContainerResource < rmContainerResource, this is caused by the |
| // following sequence: |
| // 1. AM asks for increase from 1G to 2G, and RM approves it |
| // 2. AM asks for increase from 2G to 4G, and RM approves it |
| // 3. AM only uses the 2G token to increase on NM, but never uses the |
| // 4G token |
| // 4. NM reports 2G to RM, but RM sees its own allocation as 4G |
| // In this case: |
| // - Set the lastConfirmedResource as the maximum of |
| // nmContainerResource and lastConfirmedResource |
| // - Do NOT unregister the allocation expirer |
| // When the increase allocation expires, resource will be rolled back to |
| // the last confirmed resource. |
| container.lastConfirmedResource = Resources.componentwiseMax( |
| nmContainerResource, container.lastConfirmedResource); |
| } else { |
| // Something wrong happened, kill the container |
| LOG.warn("Something wrong happened, container size reported by NM" |
| + " is not expected, ContainerID=" + container.getContainerId() |
| + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:" |
| + nmContainerResource); |
| container.eventHandler.handle(new RMNodeCleanContainerEvent( |
| container.nodeId, container.getContainerId())); |
| |
| } |
| } |
| } |
| |
| private static class FinishedTransition extends BaseTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; |
| |
| container.finishTime = System.currentTimeMillis(); |
| container.finishedStatus = finishedEvent.getRemoteContainerStatus(); |
| // Inform AppAttempt |
| // container.getContainer() can return null when a RMContainer is a |
| // reserved container |
| updateAttemptMetrics(container); |
| |
| container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( |
| container.appAttemptId, finishedEvent.getRemoteContainerStatus(), |
| container.getAllocatedNode())); |
| |
| container.rmContext.getRMApplicationHistoryWriter().containerFinished( |
| container); |
| |
| boolean saveNonAMContainerMetaInfo = |
| container.rmContext.getYarnConfiguration().getBoolean( |
| YarnConfiguration |
| .APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, |
| YarnConfiguration |
| .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO); |
| |
| if (saveNonAMContainerMetaInfo || container.isAMContainer()) { |
| container.rmContext.getSystemMetricsPublisher().containerFinished( |
| container, container.finishTime); |
| } |
| |
| } |
| |
| private static void updateAttemptMetrics(RMContainerImpl container) { |
| // If this is a preempted container, update preemption metrics |
| Resource resource = container.getContainer().getResource(); |
| RMAppAttempt rmAttempt = container.rmContext.getRMApps() |
| .get(container.getApplicationAttemptId().getApplicationId()) |
| .getCurrentAppAttempt(); |
| |
| if (rmAttempt != null) { |
| long usedMillis = container.finishTime - container.creationTime; |
| rmAttempt.getRMAppAttemptMetrics() |
| .updateAggregateAppResourceUsage(resource, usedMillis); |
| // If this is a preempted container, update preemption metrics |
| if (ContainerExitStatus.PREEMPTED == container.finishedStatus |
| .getExitStatus()) { |
| rmAttempt.getRMAppAttemptMetrics() |
| .updatePreemptionInfo(resource, container); |
| rmAttempt.getRMAppAttemptMetrics() |
| .updateAggregatePreemptedAppResourceUsage(resource, usedMillis); |
| } |
| } |
| } |
| } |
| |
| private static final class KillTransition extends FinishedTransition { |
| |
| @Override |
| public void transition(RMContainerImpl container, RMContainerEvent event) { |
| |
| // Unregister from containerAllocationExpirer. |
| container.containerAllocationExpirer.unregister( |
| new AllocationExpirationInfo(container.getContainerId())); |
| |
| // Inform node |
| container.eventHandler.handle(new RMNodeCleanContainerEvent( |
| container.nodeId, container.getContainerId())); |
| |
| // Inform appAttempt |
| super.transition(container, event); |
| } |
| } |
| |
| @Override |
| public ContainerReport createContainerReport() { |
| this.readLock.lock(); |
| ContainerReport containerReport = null; |
| try { |
| containerReport = ContainerReport.newInstance(this.getContainerId(), |
| this.getAllocatedResource(), this.getAllocatedNode(), |
| this.getAllocatedSchedulerKey().getPriority(), this.getCreationTime(), |
| this.getFinishTime(), this.getDiagnosticsInfo(), this.getLogURL(), |
| this.getContainerExitStatus(), this.getContainerState(), |
| this.getNodeHttpAddress(), this.getExecutionType()); |
| } finally { |
| this.readLock.unlock(); |
| } |
| return containerReport; |
| } |
| |
| @Override |
| public String getNodeHttpAddress() { |
| try { |
| readLock.lock(); |
| if (container.getNodeHttpAddress() != null) { |
| StringBuilder httpAddress = new StringBuilder(); |
| httpAddress.append(WebAppUtils.getHttpSchemePrefix(rmContext |
| .getYarnConfiguration())); |
| httpAddress.append(container.getNodeHttpAddress()); |
| return httpAddress.toString(); |
| } else { |
| return null; |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public String getNodeLabelExpression() { |
| if (nodeLabelExpression == null) { |
| return RMNodeLabelsManager.NO_LABEL; |
| } |
| return nodeLabelExpression; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof RMContainer) { |
| if (null != getContainerId()) { |
| return getContainerId().equals(((RMContainer) obj).getContainerId()); |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| if (null != getContainerId()) { |
| return getContainerId().hashCode(); |
| } |
| return super.hashCode(); |
| } |
| |
| @Override |
| public int compareTo(RMContainer o) { |
| if (getContainerId() != null && o.getContainerId() != null) { |
| return getContainerId().compareTo(o.getContainerId()); |
| } |
| return -1; |
| } |
| |
| public void setQueueName(String queueName) { |
| this.queueName = queueName; |
| } |
| |
| @Override |
| public String getQueueName() { |
| return queueName; |
| } |
| |
| @Override |
| public ExecutionType getExecutionType() { |
| return container.getExecutionType(); |
| } |
| |
| @Override |
| public boolean isRemotelyAllocated() { |
| return isExternallyAllocated; |
| } |
| |
| @Override |
| public Resource getAllocatedOrReservedResource() { |
| try { |
| readLock.lock(); |
| if (getState().equals(RMContainerState.RESERVED)) { |
| return getReservedResource(); |
| } else { |
| return getAllocatedResource(); |
| } |
| } finally { |
| readLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void setContainerId(ContainerId containerId) { |
| // In some cases, for example, global scheduling. It is possible that |
| // container created without container-id assigned, so we will publish |
| // container creation event to timeline service when id assigned. |
| container.setId(containerId); |
| |
| if (containerId != null) { |
| rmContext.getRMApplicationHistoryWriter().containerStarted(this); |
| } |
| // If saveNonAMContainerMetaInfo is true, store system metrics for all |
| // containers. If false, and if this container is marked as the AM, metrics |
| // will still be published for this container, but that calculation happens |
| // later. |
| if (saveNonAMContainerMetaInfo && null != container.getId()) { |
| rmContext.getSystemMetricsPublisher().containerCreated( |
| this, this.creationTime); |
| } |
| } |
| } |