| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.globalpolicygenerator; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.lang3.time.DurationFormatUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; |
| import org.apache.hadoop.metrics2.source.JvmMetrics; |
| import org.apache.hadoop.registry.client.api.RegistryOperations; |
| import org.apache.hadoop.security.AuthenticationFilterInitializer; |
| import org.apache.hadoop.security.HttpCrossOriginFilterInitializer; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.CompositeService; |
| import org.apache.hadoop.util.JvmPauseMonitor; |
| import org.apache.hadoop.util.ShutdownHookManager; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; |
| import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; |
| import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; |
| import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; |
| import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; |
| import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp; |
| import org.apache.hadoop.yarn.webapp.WebApp; |
| import org.apache.hadoop.yarn.webapp.WebApps; |
| import org.apache.hadoop.yarn.webapp.util.WebAppUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Global Policy Generator (GPG) is a Yarn Federation component. By tuning the |
| * Federation policies in Federation State Store, GPG overlooks the entire |
| * federated cluster and ensures that the system is tuned and balanced all the |
| * time. |
| * |
| * The GPG operates continuously but out-of-band from all cluster operations, |
| * that allows to enforce global invariants, affect load balancing, trigger |
| * draining of sub-clusters that will undergo maintenance, etc. |
| */ |
| public class GlobalPolicyGenerator extends CompositeService { |
| |
| public static final Logger LOG = |
| LoggerFactory.getLogger(GlobalPolicyGenerator.class); |
| |
| // YARN Variables |
| private static CompositeServiceShutdownHook gpgShutdownHook; |
| public static final int SHUTDOWN_HOOK_PRIORITY = 30; |
| private AtomicBoolean isStopping = new AtomicBoolean(false); |
| private static final String METRICS_NAME = "Global Policy Generator"; |
| private static long gpgStartupTime = System.currentTimeMillis(); |
| |
| // Federation Variables |
| private GPGContext gpgContext; |
| private RegistryOperations registry; |
| |
| // Scheduler service that runs tasks periodically |
| private ScheduledThreadPoolExecutor scheduledExecutorService; |
| private SubClusterCleaner subClusterCleaner; |
| private ApplicationCleaner applicationCleaner; |
| private PolicyGenerator policyGenerator; |
| private String webAppAddress; |
| private JvmPauseMonitor pauseMonitor; |
| private WebApp webApp; |
| |
| public GlobalPolicyGenerator() { |
| super(GlobalPolicyGenerator.class.getName()); |
| this.gpgContext = new GPGContextImpl(); |
| } |
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| // Set up the context |
| this.gpgContext |
| .setStateStoreFacade(FederationStateStoreFacade.getInstance()); |
| this.gpgContext |
| .setPolicyFacade(new GPGPolicyFacade( |
| this.gpgContext.getStateStoreFacade(), conf)); |
| |
| this.registry = FederationStateStoreFacade.createInstance(conf, |
| YarnConfiguration.YARN_REGISTRY_CLASS, |
| YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, |
| RegistryOperations.class); |
| this.registry.init(conf); |
| |
| UserGroupInformation user = UserGroupInformation.getCurrentUser(); |
| FederationRegistryClient registryClient = |
| new FederationRegistryClient(conf, this.registry, user); |
| this.gpgContext.setRegistryClient(registryClient); |
| |
| this.scheduledExecutorService = new ScheduledThreadPoolExecutor( |
| conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, |
| YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); |
| |
| this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); |
| |
| this.applicationCleaner = FederationStateStoreFacade.createInstance(conf, |
| YarnConfiguration.GPG_APPCLEANER_CLASS, |
| YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, |
| ApplicationCleaner.class); |
| this.applicationCleaner.init(conf, this.gpgContext); |
| |
| this.policyGenerator = new PolicyGenerator(conf, this.gpgContext); |
| |
| this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf); |
| |
| DefaultMetricsSystem.initialize(METRICS_NAME); |
| JvmMetrics jm = JvmMetrics.initSingleton("GPG", null); |
| pauseMonitor = new JvmPauseMonitor(); |
| addService(pauseMonitor); |
| jm.setPauseMonitor(pauseMonitor); |
| |
| // super.serviceInit after all services are added |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| super.serviceStart(); |
| |
| this.registry.start(); |
| |
| // Schedule SubClusterCleaner service |
| long scCleanerIntervalMs = getConfig().getLong( |
| YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); |
| if (scCleanerIntervalMs > 0) { |
| this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, |
| 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS); |
| LOG.info("Scheduled sub-cluster cleaner with interval: {}", |
| DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); |
| } |
| |
| // Schedule ApplicationCleaner service |
| long appCleanerIntervalMs = |
| getConfig().getLong(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS); |
| if (appCleanerIntervalMs > 0) { |
| this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner, |
| 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS); |
| LOG.info("Scheduled application cleaner with interval: {}", |
| DurationFormatUtils.formatDurationISO(appCleanerIntervalMs)); |
| } |
| |
| // Schedule PolicyGenerator |
| long policyGeneratorIntervalMillis = getConfig().getLong( |
| YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS); |
| if(policyGeneratorIntervalMillis > 0){ |
| this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator, |
| 0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS); |
| LOG.info("Scheduled policygenerator with interval: {}", |
| DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis)); |
| } |
| startWepApp(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (this.registry != null) { |
| this.registry.stop(); |
| } |
| |
| try { |
| if (this.scheduledExecutorService != null |
| && !this.scheduledExecutorService.isShutdown()) { |
| this.scheduledExecutorService.shutdown(); |
| LOG.info("Stopped ScheduledExecutorService"); |
| } |
| } catch (Exception e) { |
| LOG.error("Failed to shutdown ScheduledExecutorService", e); |
| throw e; |
| } |
| |
| if (this.isStopping.getAndSet(true)) { |
| return; |
| } |
| if (webApp != null) { |
| webApp.stop(); |
| } |
| DefaultMetricsSystem.shutdown(); |
| super.serviceStop(); |
| } |
| |
| public String getName() { |
| return "FederationGlobalPolicyGenerator"; |
| } |
| |
| public GPGContext getGPGContext() { |
| return this.gpgContext; |
| } |
| |
| private void initAndStart(Configuration conf, boolean hasToReboot) { |
| // Remove the old hook if we are rebooting. |
| if (hasToReboot && null != gpgShutdownHook) { |
| ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook); |
| } |
| |
| gpgShutdownHook = new CompositeServiceShutdownHook(this); |
| ShutdownHookManager.get().addShutdownHook(gpgShutdownHook, |
| SHUTDOWN_HOOK_PRIORITY); |
| |
| this.init(conf); |
| this.start(); |
| } |
| |
| @VisibleForTesting |
| public void startWepApp() { |
| boolean enableCors = getConfig().getBoolean( |
| YarnConfiguration.GPG_WEBAPP_ENABLE_CORS_FILTER, |
| YarnConfiguration.DEFAULT_GPG_WEBAPP_ENABLE_CORS_FILTER); |
| if (enableCors) { |
| getConfig().setBoolean(HttpCrossOriginFilterInitializer.PREFIX |
| + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true); |
| } |
| |
| // Always load pseudo authentication filter to parse "user.name" in an URL |
| // to identify a HTTP request's user. |
| boolean hasHadoopAuthFilterInitializer = false; |
| String filterInitializerConfKey = "hadoop.http.filter.initializers"; |
| Class<?>[] initializersClasses = getConfig() |
| .getClasses(filterInitializerConfKey); |
| List<String> targets = new ArrayList<String>(); |
| if (initializersClasses != null) { |
| for (Class<?> initializer : initializersClasses) { |
| if (initializer.getName() |
| .equals(AuthenticationFilterInitializer.class.getName())) { |
| hasHadoopAuthFilterInitializer = true; |
| break; |
| } |
| targets.add(initializer.getName()); |
| } |
| } |
| if (!hasHadoopAuthFilterInitializer) { |
| targets.add(AuthenticationFilterInitializer.class.getName()); |
| getConfig().set(filterInitializerConfKey, StringUtils.join(",", targets)); |
| } |
| LOG.info("Instantiating GPGWebApp at " + webAppAddress); |
| GPGWebApp gpgWebApp = new GPGWebApp(this); |
| webApp = WebApps.$for("gpg").at(webAppAddress).start(gpgWebApp); |
| } |
| |
| @SuppressWarnings("resource") |
| public static void startGPG(String[] argv, Configuration conf) { |
| boolean federationEnabled = |
| conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED, |
| YarnConfiguration.DEFAULT_FEDERATION_ENABLED); |
| try { |
| if (federationEnabled) { |
| Thread.setDefaultUncaughtExceptionHandler( |
| new YarnUncaughtExceptionHandler()); |
| StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv, |
| LOG); |
| GlobalPolicyGenerator globalPolicyGenerator = |
| new GlobalPolicyGenerator(); |
| globalPolicyGenerator.initAndStart(conf, false); |
| } else { |
| LOG.warn("Federation is not enabled. The gpg cannot start."); |
| } |
| } catch (Throwable t) { |
| LOG.error("Error starting globalpolicygenerator", t); |
| System.exit(-1); |
| } |
| } |
| |
| public static long getGPGStartupTime() { |
| return gpgStartupTime; |
| } |
| |
| public static void main(String[] argv) { |
| startGPG(argv, new YarnConfiguration()); |
| } |
| } |