blob: a3a7e56e94e57c854d48f69d0f5e002e2aa164f6 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.healthmgr;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.name.Names;
import com.microsoft.dhalion.api.IHealthPolicy;
import com.microsoft.dhalion.api.MetricsProvider;
import com.microsoft.dhalion.events.EventManager;
import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.twitter.heron.classification.InterfaceStability.Evolving;
import com.twitter.heron.classification.InterfaceStability.Unstable;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.utils.logging.LoggingHelper;
import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.sensors.TrackerMetricsProvider;
import com.twitter.heron.scheduler.client.ISchedulerClient;
import com.twitter.heron.scheduler.client.SchedulerClientFactory;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.ConfigLoader;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.statemgr.IStateManager;
import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import com.twitter.heron.spi.utils.ReflectionUtils;
/**
* {@link HealthManager} makes a topology dynamic and self-regulating. This is implemented using
* Dhalion library. The {@link HealthManager} will perform the following functions to achieve its
* goal:
* <ul>
* <li>loads heron configuration including health policy configuration from
* <code>healthmgr.yaml</code>
* <li>initializing guice injector with metrics collection module from <code>tracker</code> or
* <code>metrics cache</code>, <code>scheduler client</code> and <code>state client</code>
* <li>initializes health policies instances and starts policy execution using
* {@link PoliciesExecutor}
* </ul>
* The {@link HealthManager} is executed as a process. It is recommended that it is started on
* container 0, colocated with the metrics provider and the scheduler service.
* <p>
* Required command line options for the {@link HealthManager} include
* <ul>
* <li>cluster name: <code>-c local</code>
* <li>role: <code> -r dev</code>
* <li>environment: <code> -e default</code>
* <li>topology name: <code> -n AckingTopology</code>
* <p>
* </ul>
* <p>
* Optional command line options for the {@link HealthManager} include
* <ul>
* <li>health manager mode: <code> -m local</code>, default cluster
* <li>heron home directory: <code> -d ~/.heron</code>, required if mode is local
* <li>config directory: <code> -p ~/.heron/conf</code>, required if mode is local
* <li>metrics type: <code>-s f.q.class.name</code>,
* default: <code>com.twitter.heron.healthmgr.sensors.TrackerMetricsProvider</code>
* <li>metrics source: <code>-t http://host:port</code>, default: <code>http://127.0.0.1:8888</code>
* <li>enable verbose mode: <code> -v</code>
* </ul>
*/
@Unstable
@Evolving
public class HealthManager {
public static final String CONF_TOPOLOGY_NAME = "TOPOLOGY_NAME";
public static final String CONF_METRICS_SOURCE_URL = "METRICS_SOURCE_URL";
private static final String CONF_METRICS_SOURCE_TYPE = "METRICS_SOURCE_TYPE";
private static final Logger LOG = Logger.getLogger(HealthManager.class.getName());
private final Config config;
private AbstractModule baseModule;
private Config runtime;
private Injector injector;
private SchedulerStateManagerAdaptor stateMgrAdaptor;
private ISchedulerClient schedulerClient;
private List<IHealthPolicy> healthPolicies = new ArrayList<>();
private HealthPolicyConfigReader policyConfigReader;
public enum HealthManagerMode {
cluster,
local
}
private enum CliArgs {
CLUSTER("cluster"),
ROLE("role"),
ENVIRONMENT("environment"),
TOPOLOGY_NAME("topology_name"),
METRIC_SOURCE_URL("metric_source_url"),
METRIC_SOURCE_TYPE("metric_source_type"),
HERON_HOME("heron_home"),
CONFIG_PATH("config_path"),
MODE("mode"),
VERBOSE("verbose");
private String text;
CliArgs(String name) {
this.text = name;
}
}
public HealthManager(Config config, AbstractModule baseModule) {
this.config = config;
this.baseModule = baseModule;
}
public static void main(String[] args) throws Exception {
CommandLineParser parser = new DefaultParser();
Options slaManagerCliOptions = constructCliOptions();
// parse the help options first.
Options helpOptions = constructHelpOptions();
CommandLine cmd = parser.parse(helpOptions, args, true);
if (cmd.hasOption("h")) {
usage(slaManagerCliOptions);
return;
}
try {
cmd = parser.parse(slaManagerCliOptions, args);
} catch (ParseException e) {
usage(slaManagerCliOptions);
throw new RuntimeException("Error parsing command line options: ", e);
}
HealthManagerMode mode = HealthManagerMode.cluster;
if (hasOption(cmd, CliArgs.MODE)) {
mode = HealthManagerMode.valueOf(getOptionValue(cmd, CliArgs.MODE));
}
Config config;
switch (mode) {
case cluster:
config = Config.toClusterMode(Config.newBuilder()
.putAll(ConfigLoader.loadClusterConfig())
.putAll(commandLineConfigs(cmd))
.build());
break;
case local:
if (!hasOption(cmd, CliArgs.HERON_HOME) || !hasOption(cmd, CliArgs.CONFIG_PATH)) {
throw new IllegalArgumentException("Missing heron_home or config_path argument");
}
String heronHome = getOptionValue(cmd, CliArgs.HERON_HOME);
String configPath = getOptionValue(cmd, CliArgs.CONFIG_PATH);
config = Config.toLocalMode(Config.newBuilder()
.putAll(ConfigLoader.loadConfig(heronHome, configPath, null, null))
.putAll(commandLineConfigs(cmd))
.build());
break;
default:
throw new IllegalArgumentException("Invalid mode: " + getOptionValue(cmd, CliArgs.MODE));
}
setupLogging(cmd, config);
LOG.info("Static Heron config loaded successfully ");
LOG.fine(config.toString());
// load the default config value and override with any command line values
String metricSourceClassName = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_TYPE.key());
metricSourceClassName = getOptionValue(cmd, CliArgs.METRIC_SOURCE_TYPE, metricSourceClassName);
String metricsUrl = config.getStringValue(PolicyConfigKey.METRIC_SOURCE_URL.key());
metricsUrl = getOptionValue(cmd, CliArgs.METRIC_SOURCE_URL, metricsUrl);
AbstractModule module = buildMetricsProviderModule(metricsUrl, metricSourceClassName);
HealthManager healthManager = new HealthManager(config, module);
LOG.info("Initializing health manager");
healthManager.initialize();
LOG.info("Starting Health Manager");
PoliciesExecutor policyExecutor = new PoliciesExecutor(healthManager.healthPolicies);
ScheduledFuture<?> future = policyExecutor.start();
try {
future.get();
} finally {
policyExecutor.destroy();
}
}
private static void setupLogging(CommandLine cmd, Config config) throws IOException {
String systemConfigFilename = Context.systemConfigFile(config);
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
.build();
Boolean verbose = hasOption(cmd, CliArgs.VERBOSE);
Level loggingLevel = Level.INFO;
if (verbose) {
loggingLevel = Level.FINE;
}
String loggingDir = systemConfig.getHeronLoggingDirectory();
LoggingHelper.loggerInit(loggingLevel, true);
String fileName = String.format("%s-%s-%s", "heron", Context.topologyName(config), "healthmgr");
LoggingHelper.addLoggingHandler(
LoggingHelper.getFileHandler(fileName, loggingDir, true,
systemConfig.getHeronLoggingMaximumSize(),
systemConfig.getHeronLoggingMaximumFiles()));
LOG.info("Logging setup done.");
}
private static boolean hasOption(CommandLine cmd, CliArgs argName) {
return cmd.hasOption(argName.text);
}
private static String getOptionValue(CommandLine cmd, CliArgs argName) {
return cmd.getOptionValue(argName.text, null);
}
private static String getOptionValue(CommandLine cmd, CliArgs argName, String defaultValue) {
return cmd.getOptionValue(argName.text, defaultValue);
}
public void initialize() throws ReflectiveOperationException, FileNotFoundException {
injector = Guice.createInjector(baseModule);
stateMgrAdaptor = createStateMgrAdaptor();
this.runtime = Config.newBuilder()
.put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, stateMgrAdaptor)
.put(Key.TOPOLOGY_NAME, Context.topologyName(config))
.build();
this.schedulerClient = createSchedulerClient();
this.policyConfigReader = createPolicyConfigReader();
AbstractModule commonModule = buildCommonConfigModule();
injector = injector.createChildInjector(commonModule);
initializePolicies();
}
@SuppressWarnings("unchecked") // we don't know what T is until runtime
private void initializePolicies() throws ClassNotFoundException {
List<String> policyIds = policyConfigReader.getPolicyIds();
for (String policyId : policyIds) {
Map<String, Object> policyConfigMap = policyConfigReader.getPolicyConfig(policyId);
HealthPolicyConfig policyConfig = new HealthPolicyConfig(policyConfigMap);
String policyClassName = policyConfig.getPolicyClass();
LOG.info(String.format("Initializing %s with class %s", policyId, policyClassName));
Class<IHealthPolicy> policyClass
= (Class<IHealthPolicy>) this.getClass().getClassLoader().loadClass(policyClassName);
AbstractModule module = constructPolicySpecificModule(policyConfig);
IHealthPolicy policy = injector.createChildInjector(module).getInstance(policyClass);
healthPolicies.add(policy);
}
}
@VisibleForTesting
HealthPolicyConfigReader createPolicyConfigReader() throws FileNotFoundException {
String policyConfigFile
= Paths.get(Context.heronConf(config), PolicyConfigKey.CONF_FILE_NAME.key()).toString();
HealthPolicyConfigReader configReader = new HealthPolicyConfigReader(policyConfigFile);
configReader.loadConfig();
return configReader;
}
@VisibleForTesting
static AbstractModule buildMetricsProviderModule(final String sourceUrl, final String type) {
return new AbstractModule() {
@Override
protected void configure() {
bind(String.class)
.annotatedWith(Names.named(CONF_METRICS_SOURCE_URL))
.toInstance(sourceUrl);
bind(String.class)
.annotatedWith(Names.named(CONF_METRICS_SOURCE_TYPE))
.toInstance(type);
}
};
}
private AbstractModule buildCommonConfigModule() throws ReflectiveOperationException {
String metricSourceClassName
= injector.getInstance(
com.google.inject.Key.get(String.class, Names.named(CONF_METRICS_SOURCE_TYPE)));
Class<? extends MetricsProvider> metricsProviderClass =
Class.forName(metricSourceClassName).asSubclass(MetricsProvider.class);
return new AbstractModule() {
@Override
protected void configure() {
bind(String.class)
.annotatedWith(Names.named(CONF_TOPOLOGY_NAME))
.toInstance(Context.topologyName(config));
bind(String.class)
.annotatedWith(Names.named(TrackerMetricsProvider.CONF_CLUSTER))
.toInstance(Context.cluster(config));
bind(String.class)
.annotatedWith(Names.named(TrackerMetricsProvider.CONF_ENVIRON))
.toInstance(Context.environ(config));
bind(Config.class).toInstance(config);
bind(EventManager.class).in(Singleton.class);
bind(ISchedulerClient.class).toInstance(schedulerClient);
bind(SchedulerStateManagerAdaptor.class).toInstance(stateMgrAdaptor);
bind(PackingPlanProvider.class).in(Singleton.class);
bind(MetricsProvider.class).to(metricsProviderClass).in(Singleton.class);
}
};
}
private AbstractModule constructPolicySpecificModule(final HealthPolicyConfig policyConfig) {
return new AbstractModule() {
@Override
protected void configure() {
bind(HealthPolicyConfig.class).toInstance(policyConfig);
}
};
}
@VisibleForTesting
SchedulerStateManagerAdaptor createStateMgrAdaptor() throws ReflectiveOperationException {
String stateMgrClass = Context.stateManagerClass(config);
IStateManager stateMgr = ReflectionUtils.newInstance(stateMgrClass);
stateMgr.initialize(config);
return new SchedulerStateManagerAdaptor(stateMgr, 5000);
}
private ISchedulerClient createSchedulerClient() {
return new SchedulerClientFactory(config, runtime).getSchedulerClient();
}
/**
* Load the config parameters from the command line
*
* @param cmd command line options
* @return config, the command line config
*/
private static Config commandLineConfigs(CommandLine cmd) {
String cluster = getOptionValue(cmd, CliArgs.CLUSTER);
String role = getOptionValue(cmd, CliArgs.ROLE);
String environ = getOptionValue(cmd, CliArgs.ENVIRONMENT);
String topologyName = getOptionValue(cmd, CliArgs.TOPOLOGY_NAME);
Boolean verbose = hasOption(cmd, CliArgs.VERBOSE);
Config.Builder commandLineConfig = Config.newBuilder()
.put(Key.CLUSTER, cluster)
.put(Key.ROLE, role)
.put(Key.ENVIRON, environ)
.put(Key.TOPOLOGY_NAME, topologyName)
.put(Key.VERBOSE, verbose);
return commandLineConfig.build();
}
// Print usage options
private static void usage(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(HealthManager.class.getSimpleName(), options);
}
// construct command line help options
private static Options constructHelpOptions() {
Options options = new Options();
Option help = Option.builder("h")
.desc("List all options and their description")
.longOpt("help")
.build();
options.addOption(help);
return options;
}
// Construct all required command line options
private static Options constructCliOptions() {
Options options = new Options();
Option cluster = Option.builder("c")
.desc("Cluster name in which the topology needs to run on")
.longOpt(CliArgs.CLUSTER.text)
.hasArgs()
.argName(CliArgs.CLUSTER.text)
.required()
.build();
Option role = Option.builder("r")
.desc("Role under which the topology needs to run")
.longOpt(CliArgs.ROLE.text)
.hasArgs()
.argName(CliArgs.ROLE.text)
.required()
.build();
Option environment = Option.builder("e")
.desc("Environment under which the topology needs to run")
.longOpt(CliArgs.ENVIRONMENT.text)
.hasArgs()
.argName(CliArgs.ENVIRONMENT.text)
.build();
Option heronHome = Option.builder("d")
.desc("Directory where heron is installed")
.longOpt(CliArgs.HERON_HOME.text)
.hasArgs()
.argName("heron home dir")
.build();
Option configFile = Option.builder("p")
.desc("Path of the config files")
.longOpt(CliArgs.CONFIG_PATH.text)
.hasArgs()
.argName("config path")
.build();
Option topologyName = Option.builder("n")
.desc("Name of the topology")
.longOpt(CliArgs.TOPOLOGY_NAME.text)
.hasArgs()
.argName("topology name")
.required()
.build();
Option metricsSourceURL = Option.builder("t")
.desc("metrics data source url with port number")
.longOpt(CliArgs.METRIC_SOURCE_URL.text)
.hasArgs()
.argName("data source url")
.build();
// candidate metrics sources are:
// com.twitter.heron.healthmgr.sensors.TrackerMetricsProvider (default)
// com.twitter.heron.healthmgr.sensors.MetricsCacheMetricsProvider
Option metricsSourceType = Option.builder("s")
.desc("metrics data source type")
.longOpt(CliArgs.METRIC_SOURCE_TYPE.text)
.hasArg()
.argName("data source type")
.build();
// candidates:
// local: Health manager is started manually
// cluster: Health manager is started by executor on container 0 (default)
Option mode = Option.builder("m")
.desc("Health manager process mode, cluster or local")
.longOpt(CliArgs.MODE.text)
.hasArg()
.argName("process mode")
.build();
Option verbose = Option.builder("v")
.desc("Enable debug logs")
.longOpt(CliArgs.VERBOSE.text)
.build();
options.addOption(cluster);
options.addOption(role);
options.addOption(environment);
options.addOption(heronHome);
options.addOption(configFile);
options.addOption(topologyName);
options.addOption(metricsSourceType);
options.addOption(metricsSourceURL);
options.addOption(mode);
options.addOption(verbose);
return options;
}
@VisibleForTesting
List<IHealthPolicy> getHealthPolicies() {
return healthPolicies;
}
}