blob: 9dc594514997c9b95b67f981fe8eb4141ed5f58c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* The RMActiveServiceContext is the class that maintains <b>Active</b> service
* context. Services that need to run only on the Active RM. This is expected to
* be used only by RMContext.
*/
@Private
@Unstable
public class RMActiveServiceContext {
private static final Log LOG = LogFactory
.getLog(RMActiveServiceContext.class);
private final ConcurrentMap<ApplicationId, RMApp> applications =
new ConcurrentHashMap<ApplicationId, RMApp>();
private final ConcurrentMap<NodeId, RMNode> nodes =
new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<NodeId, RMNode> inactiveNodes =
new ConcurrentHashMap<NodeId, RMNode>();
private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
new ConcurrentHashMap<ApplicationId, ByteBuffer>();
private boolean isWorkPreservingRecoveryEnabled;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private ReservationSystem reservationSystem;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch;
private Clock systemClock = SystemClock.getInstance();
private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true;
private boolean isSchedulerReady = false;
private PlacementManager queuePlacementManager = null;
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
}
@Private
@Unstable
public RMActiveServiceContext(Dispatcher rmDispatcher,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
ResourceScheduler scheduler) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setScheduler(scheduler);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
nullStore.init(new YarnConfiguration());
setStateStore(nullStore);
} catch (Exception e) {
assert false;
}
}
@Private
@Unstable
public void setStateStore(RMStateStore store) {
stateStore = store;
}
@Private
@Unstable
public ClientRMService getClientRMService() {
return clientRMService;
}
@Private
@Unstable
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
}
@Private
@Unstable
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
}
@Private
@Unstable
public RMStateStore getStateStore() {
return stateStore;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return this.applications;
}
@Private
@Unstable
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return this.nodes;
}
@Private
@Unstable
public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
return this.inactiveNodes;
}
@Private
@Unstable
public ContainerAllocationExpirer getContainerAllocationExpirer() {
return this.containerAllocationExpirer;
}
@Private
@Unstable
public AMLivelinessMonitor getAMLivelinessMonitor() {
return this.amLivelinessMonitor;
}
@Private
@Unstable
public AMLivelinessMonitor getAMFinishingMonitor() {
return this.amFinishingMonitor;
}
@Private
@Unstable
public DelegationTokenRenewer getDelegationTokenRenewer() {
return delegationTokenRenewer;
}
@Private
@Unstable
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
return this.amRMTokenSecretManager;
}
@Private
@Unstable
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
}
@Private
@Unstable
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
return this.nmTokenSecretManager;
}
@Private
@Unstable
public ResourceScheduler getScheduler() {
return this.scheduler;
}
@Private
@Unstable
public ReservationSystem getReservationSystem() {
return this.reservationSystem;
}
@Private
@Unstable
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Private
@Unstable
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@Private
@Unstable
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
}
@Private
@Unstable
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
return this.rmDelegationTokenSecretManager;
}
@Private
@Unstable
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
@Private
@Unstable
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
}
@Private
@Unstable
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
}
@Private
@Unstable
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
}
@Private
@Unstable
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
}
@Private
@Unstable
void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
}
@Private
@Unstable
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
@Private
@Unstable
void setReservationSystem(ReservationSystem reservationSystem) {
this.reservationSystem = reservationSystem;
}
@Private
@Unstable
void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
}
@Private
@Unstable
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
@Private
@Unstable
void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
}
@Private
@Unstable
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
}
@Private
@Unstable
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
}
@Private
@Unstable
void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
@Private
@Unstable
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
this.isWorkPreservingRecoveryEnabled = enabled;
}
@Private
@Unstable
public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled;
}
@Private
@Unstable
public long getEpoch() {
return this.epoch;
}
@Private
@Unstable
void setEpoch(long epoch) {
this.epoch = epoch;
}
@Private
@Unstable
public RMNodeLabelsManager getNodeLabelManager() {
return nodeLabelManager;
}
@Private
@Unstable
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
@Private
@Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return rmDelegatedNodeLabelsUpdater;
}
@Private
@Unstable
public void setRMDelegatedNodeLabelsUpdater(
RMDelegatedNodeLabelsUpdater nodeLablesUpdater) {
rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
}
@Private
@Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
}
@Private
@Unstable
public boolean isSchedulerReadyForAllocatingContainers() {
if (isSchedulerReady) {
return isSchedulerReady;
}
isSchedulerReady =
(systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
if (!isSchedulerReady && printLog) {
LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
printLog = false;
}
if (isSchedulerReady) {
LOG.info("Scheduler recovery is done. Start allocating new containers.");
}
return isSchedulerReady;
}
@Private
@Unstable
public void setSystemClock(Clock clock) {
this.systemClock = clock;
}
@Private
@Unstable
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
return systemCredentials;
}
@Private
@Unstable
public PlacementManager getQueuePlacementManager() {
return queuePlacementManager;
}
@Private
@Unstable
public void setQueuePlacementManager(PlacementManager placementMgr) {
this.queuePlacementManager = placementMgr;
}
@Private
@Unstable
public void setRMAppLifetimeMonitor(
RMAppLifetimeMonitor lifetimeMonitor) {
this.rmAppLifetimeMonitor = lifetimeMonitor;
}
@Private
@Unstable
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.rmAppLifetimeMonitor;
}
@Private
@Unstable
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return this.queueLimitCalculator;
}
@Private
@Unstable
public void setContainerQueueLimitCalculator(
QueueLimitCalculator limitCalculator) {
this.queueLimitCalculator = limitCalculator;
}
}