| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetSocketAddress; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.ha.HAServiceProtocol; |
| import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.metrics2.source.JvmMetrics; |
| import org.apache.hadoop.security.Groups; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authorize.ProxyUsers; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.service.CompositeService; |
| import org.apache.hadoop.service.Service; |
| import org.apache.hadoop.util.ExitUtil; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.conf.ConfigurationProvider; |
| import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; |
| import org.apache.hadoop.yarn.conf.HAUtil; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.event.AsyncDispatcher; |
| import org.apache.hadoop.yarn.event.Dispatcher; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; |
| import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; |
| import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; |
| import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; |
| import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; |
| import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; |
| import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; |
| import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; |
| import org.apache.hadoop.yarn.webapp.WebApp; |
| import org.apache.hadoop.yarn.webapp.WebApps; |
| import org.apache.hadoop.yarn.webapp.WebApps.Builder; |
| import org.apache.hadoop.yarn.webapp.util.WebAppUtils; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * The ResourceManager is the main class of the RM; it is composed of a set of components. |
| * "I am the ResourceManager. All your resources belong to us..." |
| * |
| */ |
| @SuppressWarnings("unchecked") |
| public class ResourceManager extends CompositeService implements Recoverable { |
| |
| /** |
| * Priority of the ResourceManager shutdown hook. |
| */ |
| public static final int SHUTDOWN_HOOK_PRIORITY = 30; |
| |
| private static final Log LOG = LogFactory.getLog(ResourceManager.class); |
| private static long clusterTimeStamp = System.currentTimeMillis(); |
| |
| /** |
| * "Always On" services. Services that need to run always irrespective of |
| * the HA state of the RM. |
| */ |
| @VisibleForTesting |
| protected RMContextImpl rmContext; |
| private Dispatcher rmDispatcher; |
| @VisibleForTesting |
| protected AdminService adminService; |
| |
| /** |
| * "Active" services. Services that need to run only on the Active RM. |
| * These services are managed (initialized, started, stopped) by the |
| * {@link CompositeService} RMActiveServices. |
| * |
| * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is |
| * in Active state. |
| */ |
| protected RMActiveServices activeServices; |
| protected RMSecretManagerService rmSecretManagerService; |
| |
| protected ResourceScheduler scheduler; |
| private ClientRMService clientRM; |
| protected ApplicationMasterService masterService; |
| protected NMLivelinessMonitor nmLivelinessMonitor; |
| protected NodesListManager nodesListManager; |
| protected RMAppManager rmAppManager; |
| protected ApplicationACLsManager applicationACLsManager; |
| protected QueueACLsManager queueACLsManager; |
| private WebApp webApp; |
| private AppReportFetcher fetcher = null; |
| protected ResourceTrackerService resourceTracker; |
| |
| private String webAppAddress; |
| private ConfigurationProvider configurationProvider = null; |
| /** End of Active services */ |
| |
| private Configuration conf; |
| |
| private UserGroupInformation rmLoginUGI; |
| |
/**
 * Creates a ResourceManager registered under the service name
 * "ResourceManager".
 */
public ResourceManager() {
  super("ResourceManager");
}

/**
 * @return the context object shared by this RM's services
 */
public RMContext getRMContext() {
  return rmContext;
}

/**
 * @return the cluster timestamp, set when this class was loaded and refreshed
 *     whenever the active services are started
 */
public static long getClusterTimeStamp() {
  return ResourceManager.clusterTimeStamp;
}

/**
 * Overrides the cluster timestamp; exposed for tests.
 *
 * @param timestamp the value to install
 */
@VisibleForTesting
protected static void setClusterTimeStamp(long timestamp) {
  ResourceManager.clusterTimeStamp = timestamp;
}
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| this.conf = conf; |
| this.rmContext = new RMContextImpl(); |
| |
| this.configurationProvider = |
| ConfigurationProviderFactory.getConfigurationProvider(conf); |
| this.configurationProvider.init(this.conf); |
| rmContext.setConfigurationProvider(configurationProvider); |
| |
| // load core-site.xml |
| InputStream coreSiteXMLInputStream = |
| this.configurationProvider.getConfigurationInputStream(this.conf, |
| YarnConfiguration.CORE_SITE_CONFIGURATION_FILE); |
| if (coreSiteXMLInputStream != null) { |
| this.conf.addResource(coreSiteXMLInputStream); |
| } |
| |
| // Do refreshUserToGroupsMappings with loaded core-site.xml |
| Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf) |
| .refresh(); |
| |
| // Do refreshSuperUserGroupsConfiguration with loaded core-site.xml |
| ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf); |
| |
| // load yarn-site.xml |
| InputStream yarnSiteXMLInputStream = |
| this.configurationProvider.getConfigurationInputStream(this.conf, |
| YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); |
| if (yarnSiteXMLInputStream != null) { |
| this.conf.addResource(yarnSiteXMLInputStream); |
| } |
| |
| validateConfigs(this.conf); |
| |
| // register the handlers for all AlwaysOn services using setupDispatcher(). |
| rmDispatcher = setupDispatcher(); |
| addIfService(rmDispatcher); |
| rmContext.setDispatcher(rmDispatcher); |
| |
| adminService = createAdminService(); |
| addService(adminService); |
| rmContext.setRMAdminService(adminService); |
| |
| this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); |
| if (this.rmContext.isHAEnabled()) { |
| HAUtil.verifyAndSetConfiguration(this.conf); |
| } |
| createAndInitActiveServices(); |
| |
| webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf); |
| |
| this.rmLoginUGI = UserGroupInformation.getCurrentUser(); |
| |
| super.serviceInit(this.conf); |
| } |
| |
| protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, |
| Configuration conf) { |
| return new QueueACLsManager(scheduler, conf); |
| } |
| |
| @VisibleForTesting |
| protected void setRMStateStore(RMStateStore rmStore) { |
| rmStore.setRMDispatcher(rmDispatcher); |
| rmContext.setStateStore(rmStore); |
| } |
| |
| protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { |
| return new SchedulerEventDispatcher(this.scheduler); |
| } |
| |
| protected Dispatcher createDispatcher() { |
| return new AsyncDispatcher(); |
| } |
| |
| protected ResourceScheduler createScheduler() { |
| String schedulerClassName = conf.get(YarnConfiguration.RM_SCHEDULER, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER); |
| LOG.info("Using Scheduler: " + schedulerClassName); |
| try { |
| Class<?> schedulerClazz = Class.forName(schedulerClassName); |
| if (ResourceScheduler.class.isAssignableFrom(schedulerClazz)) { |
| return (ResourceScheduler) ReflectionUtils.newInstance(schedulerClazz, |
| this.conf); |
| } else { |
| throw new YarnRuntimeException("Class: " + schedulerClassName |
| + " not instance of " + ResourceScheduler.class.getCanonicalName()); |
| } |
| } catch (ClassNotFoundException e) { |
| throw new YarnRuntimeException("Could not instantiate Scheduler: " |
| + schedulerClassName, e); |
| } |
| } |
| |
| protected ApplicationMasterLauncher createAMLauncher() { |
| return new ApplicationMasterLauncher(this.rmContext); |
| } |
| |
| private NMLivelinessMonitor createNMLivelinessMonitor() { |
| return new NMLivelinessMonitor(this.rmContext |
| .getDispatcher()); |
| } |
| |
| protected AMLivelinessMonitor createAMLivelinessMonitor() { |
| return new AMLivelinessMonitor(this.rmDispatcher); |
| } |
| |
| protected DelegationTokenRenewer createDelegationTokenRenewer() { |
| return new DelegationTokenRenewer(); |
| } |
| |
| protected RMAppManager createRMAppManager() { |
| return new RMAppManager(this.rmContext, this.scheduler, this.masterService, |
| this.applicationACLsManager, this.conf); |
| } |
| |
| protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { |
| return new RMApplicationHistoryWriter(); |
| } |
| |
| // sanity check for configurations |
| protected static void validateConfigs(Configuration conf) { |
| // validate max-attempts |
| int globalMaxAppAttempts = |
| conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| if (globalMaxAppAttempts <= 0) { |
| throw new YarnRuntimeException("Invalid global max attempts configuration" |
| + ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS |
| + "=" + globalMaxAppAttempts + ", it should be a positive integer."); |
| } |
| |
| // validate expireIntvl >= heartbeatIntvl |
| long expireIntvl = conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); |
| long heartbeatIntvl = |
| conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); |
| if (expireIntvl < heartbeatIntvl) { |
| throw new YarnRuntimeException("Nodemanager expiry interval should be no" |
| + " less than heartbeat interval, " |
| + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS + "=" + expireIntvl |
| + ", " + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS + "=" |
| + heartbeatIntvl); |
| } |
| } |
| |
| /** |
| * RMActiveServices handles all the Active services in the RM. |
| */ |
| @Private |
| class RMActiveServices extends CompositeService { |
| |
| private DelegationTokenRenewer delegationTokenRenewer; |
| private EventHandler<SchedulerEvent> schedulerDispatcher; |
| private ApplicationMasterLauncher applicationMasterLauncher; |
| private ContainerAllocationExpirer containerAllocationExpirer; |
| |
| private boolean recoveryEnabled; |
| |
| RMActiveServices() { |
| super("RMActiveServices"); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration configuration) throws Exception { |
| conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); |
| |
| rmSecretManagerService = createRMSecretManagerService(); |
| addService(rmSecretManagerService); |
| |
| containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); |
| addService(containerAllocationExpirer); |
| rmContext.setContainerAllocationExpirer(containerAllocationExpirer); |
| |
| AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); |
| addService(amLivelinessMonitor); |
| rmContext.setAMLivelinessMonitor(amLivelinessMonitor); |
| |
| AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); |
| addService(amFinishingMonitor); |
| rmContext.setAMFinishingMonitor(amFinishingMonitor); |
| |
| boolean isRecoveryEnabled = conf.getBoolean( |
| YarnConfiguration.RECOVERY_ENABLED, |
| YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); |
| |
| RMStateStore rmStore = null; |
| if (isRecoveryEnabled) { |
| recoveryEnabled = true; |
| rmStore = RMStateStoreFactory.getStore(conf); |
| boolean isWorkPreservingRecoveryEnabled = |
| conf.getBoolean( |
| YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, |
| YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); |
| rmContext |
| .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); |
| } else { |
| recoveryEnabled = false; |
| rmStore = new NullRMStateStore(); |
| } |
| |
| try { |
| rmStore.init(conf); |
| rmStore.setRMDispatcher(rmDispatcher); |
| } catch (Exception e) { |
| // the Exception from stateStore.init() needs to be handled for |
| // HA and we need to give up master status if we got fenced |
| LOG.error("Failed to init state store", e); |
| throw e; |
| } |
| rmContext.setStateStore(rmStore); |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| delegationTokenRenewer = createDelegationTokenRenewer(); |
| rmContext.setDelegationTokenRenewer(delegationTokenRenewer); |
| } |
| |
| RMApplicationHistoryWriter rmApplicationHistoryWriter = |
| createRMApplicationHistoryWriter(); |
| addService(rmApplicationHistoryWriter); |
| rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); |
| |
| // Register event handler for NodesListManager |
| nodesListManager = new NodesListManager(rmContext); |
| rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); |
| addService(nodesListManager); |
| rmContext.setNodesListManager(nodesListManager); |
| |
| // Initialize the scheduler |
| scheduler = createScheduler(); |
| scheduler.setRMContext(rmContext); |
| addIfService(scheduler); |
| rmContext.setScheduler(scheduler); |
| |
| schedulerDispatcher = createSchedulerEventDispatcher(); |
| addIfService(schedulerDispatcher); |
| rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); |
| |
| // Register event handler for RmAppEvents |
| rmDispatcher.register(RMAppEventType.class, |
| new ApplicationEventDispatcher(rmContext)); |
| |
| // Register event handler for RmAppAttemptEvents |
| rmDispatcher.register(RMAppAttemptEventType.class, |
| new ApplicationAttemptEventDispatcher(rmContext)); |
| |
| // Register event handler for RmNodes |
| rmDispatcher.register( |
| RMNodeEventType.class, new NodeEventDispatcher(rmContext)); |
| |
| nmLivelinessMonitor = createNMLivelinessMonitor(); |
| addService(nmLivelinessMonitor); |
| |
| resourceTracker = createResourceTrackerService(); |
| addService(resourceTracker); |
| rmContext.setResourceTrackerService(resourceTracker); |
| |
| DefaultMetricsSystem.initialize("ResourceManager"); |
| JvmMetrics.initSingleton("ResourceManager", null); |
| |
| // creating monitors that handle preemption |
| createPolicyMonitors(); |
| |
| masterService = createApplicationMasterService(); |
| addService(masterService) ; |
| rmContext.setApplicationMasterService(masterService); |
| |
| applicationACLsManager = new ApplicationACLsManager(conf); |
| |
| queueACLsManager = createQueueACLsManager(scheduler, conf); |
| |
| rmAppManager = createRMAppManager(); |
| // Register event handler for RMAppManagerEvents |
| rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); |
| |
| clientRM = createClientRMService(); |
| rmContext.setClientRMService(clientRM); |
| addService(clientRM); |
| rmContext.setClientRMService(clientRM); |
| |
| applicationMasterLauncher = createAMLauncher(); |
| rmDispatcher.register(AMLauncherEventType.class, |
| applicationMasterLauncher); |
| |
| addService(applicationMasterLauncher); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| addService(delegationTokenRenewer); |
| delegationTokenRenewer.setRMContext(rmContext); |
| } |
| |
| new RMNMInfo(rmContext, scheduler); |
| |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| RMStateStore rmStore = rmContext.getStateStore(); |
| // The state store needs to start irrespective of recoveryEnabled as apps |
| // need events to move to further states. |
| rmStore.start(); |
| |
| if(recoveryEnabled) { |
| try { |
| rmStore.checkVersion(); |
| RMState state = rmStore.loadState(); |
| recover(state); |
| } catch (Exception e) { |
| // the Exception from loadState() needs to be handled for |
| // HA and we need to give up master status if we got fenced |
| LOG.error("Failed to load/recover state", e); |
| throw e; |
| } |
| } |
| |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| |
| DefaultMetricsSystem.shutdown(); |
| |
| if (rmContext != null) { |
| RMStateStore store = rmContext.getStateStore(); |
| try { |
| store.close(); |
| } catch (Exception e) { |
| LOG.error("Error closing store.", e); |
| } |
| } |
| |
| super.serviceStop(); |
| } |
| |
| protected void createPolicyMonitors() { |
| if (scheduler instanceof PreemptableResourceScheduler |
| && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, |
| YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { |
| LOG.info("Loading policy monitors"); |
| List<SchedulingEditPolicy> policies = conf.getInstances( |
| YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, |
| SchedulingEditPolicy.class); |
| if (policies.size() > 0) { |
| rmDispatcher.register(ContainerPreemptEventType.class, |
| new RMContainerPreemptEventDispatcher( |
| (PreemptableResourceScheduler) scheduler)); |
| for (SchedulingEditPolicy policy : policies) { |
| LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); |
| policy.init(conf, rmContext.getDispatcher().getEventHandler(), |
| (PreemptableResourceScheduler) scheduler); |
| // periodically check whether we need to take action to guarantee |
| // constraints |
| SchedulingMonitor mon = new SchedulingMonitor(policy); |
| addService(mon); |
| } |
| } else { |
| LOG.warn("Policy monitors configured (" + |
| YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + |
| ") but none specified (" + |
| YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); |
| } |
| } |
| } |
| } |
| |
| @Private |
| public static class SchedulerEventDispatcher extends AbstractService |
| implements EventHandler<SchedulerEvent> { |
| |
| private final ResourceScheduler scheduler; |
| private final BlockingQueue<SchedulerEvent> eventQueue = |
| new LinkedBlockingQueue<SchedulerEvent>(); |
| private final Thread eventProcessor; |
| private volatile boolean stopped = false; |
| private boolean shouldExitOnError = false; |
| |
| public SchedulerEventDispatcher(ResourceScheduler scheduler) { |
| super(SchedulerEventDispatcher.class.getName()); |
| this.scheduler = scheduler; |
| this.eventProcessor = new Thread(new EventProcessor()); |
| this.eventProcessor.setName("ResourceManager Event Processor"); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| this.shouldExitOnError = |
| conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, |
| Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| this.eventProcessor.start(); |
| super.serviceStart(); |
| } |
| |
| private final class EventProcessor implements Runnable { |
| @Override |
| public void run() { |
| |
| SchedulerEvent event; |
| |
| while (!stopped && !Thread.currentThread().isInterrupted()) { |
| try { |
| event = eventQueue.take(); |
| } catch (InterruptedException e) { |
| LOG.error("Returning, interrupted : " + e); |
| return; // TODO: Kill RM. |
| } |
| |
| try { |
| scheduler.handle(event); |
| } catch (Throwable t) { |
| // An error occurred, but we are shutting down anyway. |
| // If it was an InterruptedException, the very act of |
| // shutdown could have caused it and is probably harmless. |
| if (stopped) { |
| LOG.warn("Exception during shutdown: ", t); |
| break; |
| } |
| LOG.fatal("Error in handling event type " + event.getType() |
| + " to the scheduler", t); |
| if (shouldExitOnError |
| && !ShutdownHookManager.get().isShutdownInProgress()) { |
| LOG.info("Exiting, bbye.."); |
| System.exit(-1); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| this.stopped = true; |
| this.eventProcessor.interrupt(); |
| try { |
| this.eventProcessor.join(); |
| } catch (InterruptedException e) { |
| throw new YarnRuntimeException(e); |
| } |
| super.serviceStop(); |
| } |
| |
| @Override |
| public void handle(SchedulerEvent event) { |
| try { |
| int qSize = eventQueue.size(); |
| if (qSize !=0 && qSize %1000 == 0) { |
| LOG.info("Size of scheduler event-queue is " + qSize); |
| } |
| int remCapacity = eventQueue.remainingCapacity(); |
| if (remCapacity < 1000) { |
| LOG.info("Very low remaining capacity on scheduler event queue: " |
| + remCapacity); |
| } |
| this.eventQueue.put(event); |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted. Trying to exit gracefully."); |
| } |
| } |
| } |
| |
| @Private |
| public static class RMFatalEventDispatcher |
| implements EventHandler<RMFatalEvent> { |
| private final RMContext rmContext; |
| private final ResourceManager rm; |
| |
| public RMFatalEventDispatcher( |
| RMContext rmContext, ResourceManager resourceManager) { |
| this.rmContext = rmContext; |
| this.rm = resourceManager; |
| } |
| |
| @Override |
| public void handle(RMFatalEvent event) { |
| LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + |
| event.getType().name() + ". Cause:\n" + event.getCause()); |
| |
| if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) { |
| LOG.info("RMStateStore has been fenced"); |
| if (rmContext.isHAEnabled()) { |
| try { |
| // Transition to standby and reinit active services |
| LOG.info("Transitioning RM to Standby mode"); |
| rm.transitionToStandby(true); |
| rm.adminService.resetLeaderElection(); |
| return; |
| } catch (Exception e) { |
| LOG.fatal("Failed to transition RM to Standby mode."); |
| } |
| } |
| } |
| |
| ExitUtil.terminate(1, event.getCause()); |
| } |
| } |
| |
| @Private |
| public static final class ApplicationEventDispatcher implements |
| EventHandler<RMAppEvent> { |
| |
| private final RMContext rmContext; |
| |
| public ApplicationEventDispatcher(RMContext rmContext) { |
| this.rmContext = rmContext; |
| } |
| |
| @Override |
| public void handle(RMAppEvent event) { |
| ApplicationId appID = event.getApplicationId(); |
| RMApp rmApp = this.rmContext.getRMApps().get(appID); |
| if (rmApp != null) { |
| try { |
| rmApp.handle(event); |
| } catch (Throwable t) { |
| LOG.error("Error in handling event type " + event.getType() |
| + " for application " + appID, t); |
| } |
| } |
| } |
| } |
| |
| @Private |
| public static final class |
| RMContainerPreemptEventDispatcher |
| implements EventHandler<ContainerPreemptEvent> { |
| |
| private final PreemptableResourceScheduler scheduler; |
| |
| public RMContainerPreemptEventDispatcher( |
| PreemptableResourceScheduler scheduler) { |
| this.scheduler = scheduler; |
| } |
| |
| @Override |
| public void handle(ContainerPreemptEvent event) { |
| ApplicationAttemptId aid = event.getAppId(); |
| RMContainer container = event.getContainer(); |
| switch (event.getType()) { |
| case DROP_RESERVATION: |
| scheduler.dropContainerReservation(container); |
| break; |
| case PREEMPT_CONTAINER: |
| scheduler.preemptContainer(aid, container); |
| break; |
| case KILL_CONTAINER: |
| scheduler.killContainer(container); |
| break; |
| } |
| } |
| } |
| |
| @Private |
| public static final class ApplicationAttemptEventDispatcher implements |
| EventHandler<RMAppAttemptEvent> { |
| |
| private final RMContext rmContext; |
| |
| public ApplicationAttemptEventDispatcher(RMContext rmContext) { |
| this.rmContext = rmContext; |
| } |
| |
| @Override |
| public void handle(RMAppAttemptEvent event) { |
| ApplicationAttemptId appAttemptID = event.getApplicationAttemptId(); |
| ApplicationId appAttemptId = appAttemptID.getApplicationId(); |
| RMApp rmApp = this.rmContext.getRMApps().get(appAttemptId); |
| if (rmApp != null) { |
| RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptID); |
| if (rmAppAttempt != null) { |
| try { |
| rmAppAttempt.handle(event); |
| } catch (Throwable t) { |
| LOG.error("Error in handling event type " + event.getType() |
| + " for applicationAttempt " + appAttemptId, t); |
| } |
| } |
| } |
| } |
| } |
| |
| @Private |
| public static final class NodeEventDispatcher implements |
| EventHandler<RMNodeEvent> { |
| |
| private final RMContext rmContext; |
| |
| public NodeEventDispatcher(RMContext rmContext) { |
| this.rmContext = rmContext; |
| } |
| |
| @Override |
| public void handle(RMNodeEvent event) { |
| NodeId nodeId = event.getNodeId(); |
| RMNode node = this.rmContext.getRMNodes().get(nodeId); |
| if (node != null) { |
| try { |
| ((EventHandler<RMNodeEvent>) node).handle(event); |
| } catch (Throwable t) { |
| LOG.error("Error in handling event type " + event.getType() |
| + " for node " + nodeId, t); |
| } |
| } |
| } |
| } |
| |
| protected void startWepApp() { |
| Builder<ApplicationMasterService> builder = |
| WebApps |
| .$for("cluster", ApplicationMasterService.class, masterService, |
| "ws") |
| .with(conf) |
| .withHttpSpnegoPrincipalKey( |
| YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY) |
| .withHttpSpnegoKeytabKey( |
| YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) |
| .at(webAppAddress); |
| String proxyHostAndPort = WebAppUtils.getProxyHostAndPort(conf); |
| if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf). |
| equals(proxyHostAndPort)) { |
| if (HAUtil.isHAEnabled(conf)) { |
| fetcher = new AppReportFetcher(conf); |
| } else { |
| fetcher = new AppReportFetcher(conf, getClientRMService()); |
| } |
| builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME, |
| ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class); |
| builder.withAttribute(WebAppProxy.FETCHER_ATTRIBUTE, fetcher); |
| String[] proxyParts = proxyHostAndPort.split(":"); |
| builder.withAttribute(WebAppProxy.PROXY_HOST_ATTRIBUTE, proxyParts[0]); |
| |
| } |
| webApp = builder.start(new RMWebApp(this)); |
| } |
| |
| /** |
| * Helper method to create and init {@link #activeServices}. This creates an |
| * instance of {@link RMActiveServices} and initializes it. |
| * @throws Exception |
| */ |
| void createAndInitActiveServices() throws Exception { |
| activeServices = new RMActiveServices(); |
| activeServices.init(conf); |
| } |
| |
| /** |
| * Helper method to start {@link #activeServices}. |
| * @throws Exception |
| */ |
| void startActiveServices() throws Exception { |
| if (activeServices != null) { |
| clusterTimeStamp = System.currentTimeMillis(); |
| activeServices.start(); |
| } |
| } |
| |
| /** |
| * Helper method to stop {@link #activeServices}. |
| * @throws Exception |
| */ |
| void stopActiveServices() throws Exception { |
| if (activeServices != null) { |
| activeServices.stop(); |
| activeServices = null; |
| rmContext.getRMNodes().clear(); |
| rmContext.getInactiveRMNodes().clear(); |
| rmContext.getRMApps().clear(); |
| ClusterMetrics.destroy(); |
| QueueMetrics.clearQueueMetrics(); |
| } |
| } |
| |
| @VisibleForTesting |
| protected boolean areActiveServicesRunning() { |
| return activeServices != null && activeServices.isInState(STATE.STARTED); |
| } |
| |
| synchronized void transitionToActive() throws Exception { |
| if (rmContext.getHAServiceState() == |
| HAServiceProtocol.HAServiceState.ACTIVE) { |
| LOG.info("Already in active state"); |
| return; |
| } |
| |
| LOG.info("Transitioning to active state"); |
| |
| // use rmLoginUGI to startActiveServices. |
| // in non-secure model, rmLoginUGI will be current UGI |
| // in secure model, rmLoginUGI will be LoginUser UGI |
| this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() { |
| @Override |
| public Void run() throws Exception { |
| startActiveServices(); |
| return null; |
| } |
| }); |
| |
| rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); |
| LOG.info("Transitioned to active state"); |
| } |
| |
| synchronized void transitionToStandby(boolean initialize) |
| throws Exception { |
| if (rmContext.getHAServiceState() == |
| HAServiceProtocol.HAServiceState.STANDBY) { |
| LOG.info("Already in standby state"); |
| return; |
| } |
| |
| LOG.info("Transitioning to standby state"); |
| if (rmContext.getHAServiceState() == |
| HAServiceProtocol.HAServiceState.ACTIVE) { |
| stopActiveServices(); |
| if (initialize) { |
| resetDispatcher(); |
| createAndInitActiveServices(); |
| } |
| } |
| rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); |
| LOG.info("Transitioned to standby state"); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| try { |
| doSecureLogin(); |
| } catch(IOException ie) { |
| throw new YarnRuntimeException("Failed to login", ie); |
| } |
| |
| if (this.rmContext.isHAEnabled()) { |
| transitionToStandby(true); |
| } else { |
| transitionToActive(); |
| } |
| |
| startWepApp(); |
| if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { |
| int port = webApp.port(); |
| WebAppUtils.setRMWebAppPort(conf, port); |
| } |
| super.serviceStart(); |
| } |
| |
| protected void doSecureLogin() throws IOException { |
| InetSocketAddress socAddr = getBindAddress(conf); |
| SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB, |
| YarnConfiguration.RM_PRINCIPAL, socAddr.getHostName()); |
| |
| // if security is enable, set rmLoginUGI as UGI of loginUser |
| if (UserGroupInformation.isSecurityEnabled()) { |
| this.rmLoginUGI = UserGroupInformation.getLoginUser(); |
| } |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (webApp != null) { |
| webApp.stop(); |
| } |
| if (fetcher != null) { |
| fetcher.stop(); |
| } |
| if (configurationProvider != null) { |
| configurationProvider.close(); |
| } |
| super.serviceStop(); |
| transitionToStandby(false); |
| rmContext.setHAServiceState(HAServiceState.STOPPING); |
| } |
| |
| protected ResourceTrackerService createResourceTrackerService() { |
| return new ResourceTrackerService(this.rmContext, this.nodesListManager, |
| this.nmLivelinessMonitor, |
| this.rmContext.getContainerTokenSecretManager(), |
| this.rmContext.getNMTokenSecretManager()); |
| } |
| |
| protected ClientRMService createClientRMService() { |
| return new ClientRMService(this.rmContext, scheduler, this.rmAppManager, |
| this.applicationACLsManager, this.queueACLsManager, |
| this.rmContext.getRMDelegationTokenSecretManager()); |
| } |
| |
| protected ApplicationMasterService createApplicationMasterService() { |
| return new ApplicationMasterService(this.rmContext, scheduler); |
| } |
| |
| protected AdminService createAdminService() { |
| return new AdminService(this, rmContext); |
| } |
| |
| protected RMSecretManagerService createRMSecretManagerService() { |
| return new RMSecretManagerService(conf, rmContext); |
| } |
| |
| @Private |
| public ClientRMService getClientRMService() { |
| return this.clientRM; |
| } |
| |
| /** |
| * return the scheduler. |
| * @return the scheduler for the Resource Manager. |
| */ |
| @Private |
| public ResourceScheduler getResourceScheduler() { |
| return this.scheduler; |
| } |
| |
| /** |
| * return the resource tracking component. |
| * @return the resource tracking component. |
| */ |
| @Private |
| public ResourceTrackerService getResourceTrackerService() { |
| return this.resourceTracker; |
| } |
| |
| @Private |
| public ApplicationMasterService getApplicationMasterService() { |
| return this.masterService; |
| } |
| |
| @Private |
| public ApplicationACLsManager getApplicationACLsManager() { |
| return this.applicationACLsManager; |
| } |
| |
| @Private |
| public QueueACLsManager getQueueACLsManager() { |
| return this.queueACLsManager; |
| } |
| |
| @Private |
| WebApp getWebapp() { |
| return this.webApp; |
| } |
| |
| @Override |
| public void recover(RMState state) throws Exception { |
| // recover RMdelegationTokenSecretManager |
| rmContext.getRMDelegationTokenSecretManager().recover(state); |
| |
| // recover applications |
| rmAppManager.recover(state); |
| } |
| |
| public static void main(String argv[]) { |
| Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); |
| StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); |
| try { |
| Configuration conf = new YarnConfiguration(); |
| ResourceManager resourceManager = new ResourceManager(); |
| ShutdownHookManager.get().addShutdownHook( |
| new CompositeServiceShutdownHook(resourceManager), |
| SHUTDOWN_HOOK_PRIORITY); |
| resourceManager.init(conf); |
| resourceManager.start(); |
| } catch (Throwable t) { |
| LOG.fatal("Error starting ResourceManager", t); |
| System.exit(-1); |
| } |
| } |
| |
| /** |
| * Register the handlers for alwaysOn services |
| */ |
| private Dispatcher setupDispatcher() { |
| Dispatcher dispatcher = createDispatcher(); |
| dispatcher.register(RMFatalEventType.class, |
| new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); |
| return dispatcher; |
| } |
| |
| private void resetDispatcher() { |
| Dispatcher dispatcher = setupDispatcher(); |
| ((Service)dispatcher).init(this.conf); |
| ((Service)dispatcher).start(); |
| removeService((Service)rmDispatcher); |
| rmDispatcher = dispatcher; |
| addIfService(rmDispatcher); |
| rmContext.setDispatcher(rmDispatcher); |
| } |
| |
| /** |
| * Retrieve RM bind address from configuration |
| * |
| * @param conf |
| * @return InetSocketAddress |
| */ |
| public static InetSocketAddress getBindAddress(Configuration conf) { |
| return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); |
| } |
| } |