blob: 027e6e97eb0801648c439cf650ac9608cb84fba1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Global Policy Generator (GPG) is a Yarn Federation component. By tuning the
* Federation policies in Federation State Store, GPG oversees the entire
* federated cluster and ensures that the system is tuned and balanced all the
* time.
*
* The GPG operates continuously but out-of-band from all cluster operations,
* which allows it to enforce global invariants, affect load balancing, trigger
* draining of sub-clusters that will undergo maintenance, etc.
*/
public class GlobalPolicyGenerator extends CompositeService {

  public static final Logger LOG =
      LoggerFactory.getLogger(GlobalPolicyGenerator.class);

  // YARN Variables
  private static CompositeServiceShutdownHook gpgShutdownHook;
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
  // Ensures the web app, metrics system and child services are stopped at
  // most once, even if serviceStop() is entered repeatedly.
  private final AtomicBoolean isStopping = new AtomicBoolean(false);
  private static final String METRICS_NAME = "Global Policy Generator";
  // Wall-clock time (ms since epoch) captured when this class is initialized.
  private static final long GPG_STARTUP_TIME = System.currentTimeMillis();

  // Federation Variables
  private final GPGContext gpgContext;
  private RegistryOperations registry;

  // Scheduler service that runs tasks periodically
  private ScheduledThreadPoolExecutor scheduledExecutorService;
  private SubClusterCleaner subClusterCleaner;
  private ApplicationCleaner applicationCleaner;
  private PolicyGenerator policyGenerator;
  private String webAppAddress;
  private JvmPauseMonitor pauseMonitor;
  private WebApp webApp;

  /**
   * Creates a GPG service backed by a fresh {@link GPGContextImpl}. The
   * context is populated later, in {@link #serviceInit(Configuration)}.
   */
  public GlobalPolicyGenerator() {
    super(GlobalPolicyGenerator.class.getName());
    this.gpgContext = new GPGContextImpl();
  }

  /**
   * Builds everything the GPG needs before it can start: the context (state
   * store facade, policy facade, registry client), the scheduler thread pool,
   * the periodic tasks, the web app address and the JVM metrics.
   *
   * @param conf configuration used to build and initialize the components
   * @throws Exception if any component fails to initialize
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    // Set up the context
    this.gpgContext
        .setStateStoreFacade(FederationStateStoreFacade.getInstance());
    this.gpgContext
        .setPolicyFacade(new GPGPolicyFacade(
            this.gpgContext.getStateStoreFacade(), conf));

    // The registry is initialized here and started/stopped by hand in
    // serviceStart()/serviceStop(); it is not added as a child service.
    this.registry = FederationStateStoreFacade.createInstance(conf,
        YarnConfiguration.YARN_REGISTRY_CLASS,
        YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
        RegistryOperations.class);
    this.registry.init(conf);

    UserGroupInformation user = UserGroupInformation.getCurrentUser();
    FederationRegistryClient registryClient =
        new FederationRegistryClient(conf, this.registry, user);
    this.gpgContext.setRegistryClient(registryClient);

    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
        conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
            YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));

    // Periodic tasks; they are only scheduled in serviceStart().
    this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
    this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
        YarnConfiguration.GPG_APPCLEANER_CLASS,
        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS,
        ApplicationCleaner.class);
    this.applicationCleaner.init(conf, this.gpgContext);
    this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);

    this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);

    DefaultMetricsSystem.initialize(METRICS_NAME);
    JvmMetrics jm = JvmMetrics.initSingleton("GPG", null);
    pauseMonitor = new JvmPauseMonitor();
    addService(pauseMonitor);
    jm.setPauseMonitor(pauseMonitor);

    // super.serviceInit after all services are added
    super.serviceInit(conf);
  }

  /**
   * Starts the child services and the registry, schedules the periodic tasks
   * whose configured interval is positive, and finally starts the web app.
   *
   * @throws Exception if a child service, the registry or the web app fails
   *           to start
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    this.registry.start();

    Configuration conf = getConfig();

    // Schedule SubClusterCleaner service
    scheduleIfEnabled(this.subClusterCleaner, "sub-cluster cleaner",
        conf.getLong(
            YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
            YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS));

    // Schedule ApplicationCleaner service
    scheduleIfEnabled(this.applicationCleaner, "application cleaner",
        conf.getLong(
            YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
            YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS));

    // Schedule PolicyGenerator
    scheduleIfEnabled(this.policyGenerator, "policygenerator",
        conf.getLong(
            YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS,
            YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS));

    startWepApp();
  }

  /**
   * Schedules {@code task} at a fixed rate with no initial delay, unless the
   * interval is zero or negative, in which case the task is left unscheduled.
   *
   * @param task the periodic task to run
   * @param taskName human-readable name used in the log line
   * @param intervalMs period between runs, in milliseconds
   */
  private void scheduleIfEnabled(Runnable task, String taskName,
      long intervalMs) {
    if (intervalMs <= 0) {
      return;
    }
    this.scheduledExecutorService.scheduleAtFixedRate(task, 0, intervalMs,
        TimeUnit.MILLISECONDS);
    LOG.info("Scheduled {} with interval: {}", taskName,
        DurationFormatUtils.formatDurationISO(intervalMs));
  }

  /**
   * Stops the registry, the scheduler, the web app, the metrics system and
   * the child services. Every step is attempted even when an earlier one
   * fails, so a single failure cannot leave the remaining resources running;
   * the first failure encountered is rethrown once all steps have run.
   *
   * @throws Exception the first failure raised by any shutdown step
   */
  @Override
  protected void serviceStop() throws Exception {
    Exception firstFailure = null;

    try {
      if (this.registry != null) {
        this.registry.stop();
      }
    } catch (Exception e) {
      LOG.error("Failed to stop the registry", e);
      firstFailure = e;
    }

    try {
      if (this.scheduledExecutorService != null
          && !this.scheduledExecutorService.isShutdown()) {
        this.scheduledExecutorService.shutdown();
        LOG.info("Stopped ScheduledExecutorService");
      }
    } catch (Exception e) {
      LOG.error("Failed to shutdown ScheduledExecutorService", e);
      if (firstFailure == null) {
        firstFailure = e;
      }
    }

    // The remaining steps run only on the first pass through this method.
    if (!this.isStopping.getAndSet(true)) {
      try {
        if (this.webApp != null) {
          this.webApp.stop();
        }
      } catch (Exception e) {
        LOG.error("Failed to stop GPGWebApp", e);
        if (firstFailure == null) {
          firstFailure = e;
        }
      }

      try {
        DefaultMetricsSystem.shutdown();
      } catch (Exception e) {
        LOG.error("Failed to shutdown the metrics system", e);
        if (firstFailure == null) {
          firstFailure = e;
        }
      }

      try {
        super.serviceStop();
      } catch (Exception e) {
        LOG.error("Failed to stop child services", e);
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }

    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * @return the fixed service name {@code "FederationGlobalPolicyGenerator"}
   */
  @Override
  public String getName() {
    return "FederationGlobalPolicyGenerator";
  }

  /**
   * @return the context created in the constructor and populated during
   *         {@link #serviceInit(Configuration)}
   */
  public GPGContext getGPGContext() {
    return this.gpgContext;
  }

  /**
   * Registers a shutdown hook for this service, then initializes and starts
   * it.
   *
   * @param conf configuration to initialize the service with
   * @param hasToReboot when true, a previously registered shutdown hook is
   *          removed before the new one is added
   */
  private void initAndStart(Configuration conf, boolean hasToReboot) {
    // Remove the old hook if we are rebooting.
    if (hasToReboot && null != gpgShutdownHook) {
      ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
    }

    gpgShutdownHook = new CompositeServiceShutdownHook(this);
    ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
        SHUTDOWN_HOOK_PRIORITY);

    this.init(conf);
    this.start();
  }

  /**
   * Adjusts the HTTP filter configuration and starts the GPG web app at the
   * address resolved during {@link #serviceInit(Configuration)}.
   *
   * <p>If the CORS filter is enabled for the GPG web app, the cross-origin
   * filter is switched on in the service configuration. If
   * {@link AuthenticationFilterInitializer} is not already among the
   * configured filter initializers, it is appended to them.
   */
  @VisibleForTesting
  public void startWepApp() {
    Configuration conf = getConfig();

    boolean enableCors = conf.getBoolean(
        YarnConfiguration.GPG_WEBAPP_ENABLE_CORS_FILTER,
        YarnConfiguration.DEFAULT_GPG_WEBAPP_ENABLE_CORS_FILTER);
    if (enableCors) {
      conf.setBoolean(HttpCrossOriginFilterInitializer.PREFIX
          + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
    }

    // Always load pseudo authentication filter to parse "user.name" in an URL
    // to identify a HTTP request's user.
    boolean hasHadoopAuthFilterInitializer = false;
    String filterInitializerConfKey = "hadoop.http.filter.initializers";
    Class<?>[] initializersClasses = conf.getClasses(filterInitializerConfKey);

    List<String> targets = new ArrayList<String>();
    if (initializersClasses != null) {
      for (Class<?> initializer : initializersClasses) {
        if (initializer.getName()
            .equals(AuthenticationFilterInitializer.class.getName())) {
          // Already configured: the key is left untouched, so the partially
          // built target list is simply discarded.
          hasHadoopAuthFilterInitializer = true;
          break;
        }
        targets.add(initializer.getName());
      }
    }
    if (!hasHadoopAuthFilterInitializer) {
      targets.add(AuthenticationFilterInitializer.class.getName());
      conf.set(filterInitializerConfKey, StringUtils.join(",", targets));
    }

    LOG.info("Instantiating GPGWebApp at {}", webAppAddress);
    GPGWebApp gpgWebApp = new GPGWebApp(this);
    webApp = WebApps.$for("gpg").at(webAppAddress).start(gpgWebApp);
  }

  /**
   * Creates and starts a GPG instance when federation is enabled in
   * {@code conf}; otherwise logs a warning and returns without starting.
   * Any error thrown during startup is logged and the JVM exits with
   * status -1.
   *
   * @param argv command-line arguments, used for the startup message
   * @param conf configuration to start the GPG with
   */
  @SuppressWarnings("resource")
  public static void startGPG(String[] argv, Configuration conf) {
    boolean federationEnabled =
        conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
            YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
    try {
      if (federationEnabled) {
        Thread.setDefaultUncaughtExceptionHandler(
            new YarnUncaughtExceptionHandler());
        StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
            LOG);
        GlobalPolicyGenerator globalPolicyGenerator =
            new GlobalPolicyGenerator();
        globalPolicyGenerator.initAndStart(conf, false);
      } else {
        LOG.warn("Federation is not enabled. The gpg cannot start.");
      }
    } catch (Throwable t) {
      LOG.error("Error starting globalpolicygenerator", t);
      System.exit(-1);
    }
  }

  /**
   * @return the time, in milliseconds since the epoch, at which this class
   *         was initialized
   */
  public static long getGPGStartupTime() {
    return GPG_STARTUP_TIME;
  }

  /**
   * Command-line entry point: starts the GPG with a default
   * {@link YarnConfiguration}.
   *
   * @param argv command-line arguments
   */
  public static void main(String[] argv) {
    startGPG(argv, new YarnConfiguration());
  }
}