blob: 221e133d675b508e6789266ff8db5eb8691f382a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.common.basics.DryRunFormatType;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.utils.logging.LoggingHelper;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.scheduler.client.ISchedulerClient;
import org.apache.heron.scheduler.client.SchedulerClientFactory;
import org.apache.heron.scheduler.dryrun.UpdateDryRunResponse;
import org.apache.heron.scheduler.utils.DryRunRenders;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.ConfigLoader;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.scheduler.SchedulerException;
import org.apache.heron.spi.statemgr.IStateManager;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.ReflectionUtils;
import org.apache.heron.spi.utils.TMasterException;
public class RuntimeManagerMain {
private static final Logger LOG = Logger.getLogger(RuntimeManagerMain.class.getName());
/**
 * Prints the help text for the given options under the program name
 * "RuntimeManagerMain".
 *
 * @param options the options to describe
 */
private static void usage(Options options) {
  new HelpFormatter().printHelp("RuntimeManagerMain", options);
}
/**
 * Builds the full set of command line options understood by this program.
 * Options are registered in a fixed order: the required topology coordinates
 * first, then the optional update/restart arguments, then the flags.
 *
 * @return the populated option set
 */
private static Options constructOptions() {
  Options all = new Options();

  // Topology coordinates and config locations
  all.addOption(multiValueOption("c", "cluster", "cluster",
      "Cluster name in which the topology needs to run on", true));
  all.addOption(multiValueOption("r", "role", "role",
      "Role under which the topology needs to run", true));
  all.addOption(multiValueOption("e", "environment", "environment",
      "Environment under which the topology needs to run", true));
  all.addOption(multiValueOption("s", "submit_user", "submit userid",
      "User submitting the topology", true));
  all.addOption(multiValueOption("n", "topology_name", "topology name",
      "Name of the topology", true));
  all.addOption(multiValueOption("p", "config_path", "config path",
      "Path of the config files", true));
  all.addOption(multiValueOption("o", "override_config_file", "override config file",
      "Command line override config path", false));
  all.addOption(multiValueOption("b", "release_file", "release information",
      "Release file name", false));
  all.addOption(multiValueOption("m", "command", "command to run",
      "Command to run", true));
  all.addOption(multiValueOption("d", "heron_home", "heron home dir",
      "Directory where heron is installed", true));

  // Arguments used only by specific commands
  all.addOption(multiValueOption("i", "container_id", "container id",
      "Container Id for restart command", false));
  all.addOption(multiValueOption("a", "component_parallelism", "component parallelism",
      "Component parallelism to update: <name>:<value>,<name>:<value>,...", false));
  all.addOption(multiValueOption("cn", "container_number", "container number",
      "Container Number for updation: <value>", false));
  all.addOption(multiValueOption("rc", "runtime_config", "runtime config",
      "Runtime config to update: [comp:]<name>:<value>,[comp:]<name>:<value>,...", false));

  // Flags and the single-valued dry-run format
  all.addOption(Option.builder("u")
      .longOpt("dry_run")
      .desc("run in dry-run mode")
      .build());
  all.addOption(Option.builder("t")
      .longOpt("dry_run_format")
      .desc("dry-run format")
      .hasArg()
      .build());
  all.addOption(Option.builder("v")
      .longOpt("verbose")
      .desc("Enable debug logs")
      .build());

  return all;
}

// Builds an option declared with hasArgs(), optionally marked as required
private static Option multiValueOption(
    String shortName,
    String longName,
    String argName,
    String description,
    boolean mandatory) {
  return Option.builder(shortName)
      .longOpt(longName)
      .argName(argName)
      .desc(description)
      .hasArgs()
      .required(mandatory)
      .build();
}
/**
 * Builds the option set that contains only the -h/--help flag.
 *
 * @return options holding the help flag
 */
private static Options constructHelpOptions() {
  return new Options().addOption(
      Option.builder("h")
          .longOpt("help")
          .desc("List all options and their description")
          .build());
}
/**
 * Command line entry point. Parses the arguments, builds the config from the
 * config files plus command line values, and runs the requested command
 * against the topology.
 *
 * <p>Only stderr is used by logging, so stdout is used to propagate messages
 * back to the invoking process. Exit status codes produced here:
 * <ul>
 *   <li>0: the command completed without error</li>
 *   <li>100: the command failed; the failure message is printed to stdout</li>
 *   <li>200: a dry-run response was rendered to stdout</li>
 * </ul>
 *
 * @param args the raw command line arguments
 * @throws ParseException if the help options cannot be parsed
 * @throws IOException if the UTF-8 output stream cannot be created
 */
public static void main(String[] args)
    throws ClassNotFoundException, IllegalAccessException,
    InstantiationException, IOException, ParseException {
  Options options = constructOptions();
  Options helpOptions = constructHelpOptions();
  CommandLineParser parser = new DefaultParser();

  // parse the help options first; stopAtNonOption=true keeps the parser from
  // failing on the regular options, which helpOptions does not know about
  CommandLine cmd = parser.parse(helpOptions, args, true);
  if (cmd.hasOption("h")) {
    usage(options);
    return;
  }

  try {
    // Now parse the required options
    cmd = parser.parse(options, args);
  } catch (ParseException e) {
    usage(options);
    throw new RuntimeException("Error parsing command line options: ", e);
  }

  boolean verbose = cmd.hasOption("v");
  Level logLevel = verbose ? Level.ALL : Level.INFO;

  // init log
  LoggingHelper.loggerInit(logLevel, false);

  String cluster = cmd.getOptionValue("cluster");
  String role = cmd.getOptionValue("role");
  String environ = cmd.getOptionValue("environment");
  String submitUser = cmd.getOptionValue("submit_user");
  String heronHome = cmd.getOptionValue("heron_home");
  String configPath = cmd.getOptionValue("config_path");
  String overrideConfigFile = cmd.getOptionValue("override_config_file");
  String releaseFile = cmd.getOptionValue("release_file");
  String topologyName = cmd.getOptionValue("topology_name");
  String commandOption = cmd.getOptionValue("command");

  // Optional argument in the case of restart; "-1" is used when it is absent
  // TODO(karthik): convert into CLI
  String containerId = cmd.getOptionValue("container_id", Integer.toString(-1));

  boolean dryRun = cmd.hasOption("u");

  // Default dry-run output format type; the format option is ignored unless
  // dry-run mode is on
  DryRunFormatType dryRunFormat = DryRunFormatType.TABLE;
  if (dryRun && cmd.hasOption("t")) {
    String format = cmd.getOptionValue("dry_run_format");
    dryRunFormat = DryRunFormatType.getDryRunFormatType(format);
    LOG.fine(String.format("Running dry-run mode using format %s", format));
  }

  Command command = Command.makeCommand(commandOption);

  // add config parameters from the command line
  Config.Builder commandLineConfig = Config.newBuilder()
      .put(Key.CLUSTER, cluster)
      .put(Key.ROLE, role)
      .put(Key.ENVIRON, environ)
      .put(Key.SUBMIT_USER, submitUser)
      .put(Key.DRY_RUN, dryRun)
      .put(Key.DRY_RUN_FORMAT_TYPE, dryRunFormat)
      .put(Key.VERBOSE, verbose)
      .put(Key.TOPOLOGY_CONTAINER_ID, containerId);

  // These are command line options, but not valid config keys. Hence we don't use Keys
  translateCommandLineConfig(cmd, commandLineConfig);

  Config.Builder topologyConfig = Config.newBuilder()
      .put(Key.TOPOLOGY_NAME, topologyName);

  // build the final config by expanding all the variables; later putAll calls
  // are applied after the values loaded from the config files
  Config config = Config.toLocalMode(Config.newBuilder()
      .putAll(ConfigLoader.loadConfig(heronHome, configPath, releaseFile, overrideConfigFile))
      .putAll(commandLineConfig.build())
      .putAll(topologyConfig.build())
      .build());

  LOG.fine("Static config loaded successfully ");
  LOG.fine(config.toString());

  /* Meaning of exit status code:
     - status code = 0:
       program exits without error
     - 0 < status code < 100:
       program fails before it starts executing. For example,
       JVM cannot find or load main class
     - 100 <= status code < 200:
       program fails after it has started executing. For example,
       topology definition file fails to be loaded
     - status code == 200
       program sends out dry-run response */
  /* Since only stderr is used (by logging), we use stdout here to
     propagate any message back to Python's executor.py (invoke site). */

  // Create a new instance of RuntimeManagerMain
  RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
  try {
    runtimeManagerMain.manageTopology();
    // SUPPRESS CHECKSTYLE IllegalCatch
  } catch (UpdateDryRunResponse response) {
    LOG.log(Level.FINE, "Sending out dry-run response");
    // Output may contain UTF-8 characters, so we should print using UTF-8 encoding
    PrintStream out = new PrintStream(System.out, true, StandardCharsets.UTF_8.name());
    out.print(DryRunRenders.render(response, Context.dryRunFormatType(config)));
    // SUPPRESS CHECKSTYLE RegexpSinglelineJava
    // Exit with status code 200 to indicate dry-run response is sent out
    System.exit(200);
    // SUPPRESS CHECKSTYLE IllegalCatch
  } catch (Exception e) {
    LOG.log(Level.FINE, "Exception when executing command " + commandOption, e);
    // Exceptions without a message (e.g. NullPointerException) would otherwise be
    // reported as the bare text "null"; fall back to toString() so the caller
    // at least sees the exception type.
    String message = e.getMessage() != null ? e.getMessage() : e.toString();
    System.out.println(message);
    // Exit with status code 100 to indicate that error has happened on user-land
    // SUPPRESS CHECKSTYLE RegexpSinglelineJava
    System.exit(100);
  }
}
// configuration this manager works with
private final Config config;
// command to apply to the topology
private final Command command;

/**
 * Creates a runtime manager for the given config and command.
 *
 * @param config the config to operate with
 * @param command the command to run against the topology
 */
public RuntimeManagerMain(Config config, Command command) {
  this.command = command;
  this.config = config;
}
/**
 * Manages a topology:
 * 1. Instantiates and initializes the state manager
 * 2. Validates whether the runtime management is legal
 * 3. Completes the runtime management for the configured command
 * The state manager is closed before returning, whether or not the command succeeded.
 */
public void manageTopology()
    throws TopologyRuntimeManagementException, TMasterException, PackingException {
  String topologyName = Context.topologyName(config);

  // 1. Prepare: create the state manager named in the config
  IStateManager statemgr = instantiateStateManager(Context.stateManagerClass(config));

  // Everything below runs inside try/finally so the state manager is always released
  try {
    statemgr.initialize(config);

    // TODO(mfu): timeout should read from config
    SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
    boolean potentialStaleExecutionData = !validateRuntimeManage(adaptor, topologyName);

    // 2. The request is valid; run the command against the topology
    LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});

    // Runtime config carries the topology name and the state manager adaptor
    Config runtime = Config.newBuilder()
        .put(Key.TOPOLOGY_NAME, Context.topologyName(config))
        .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor)
        .build();

    // The scheduler client is derived from the config and the runtime config
    ISchedulerClient schedulerClient = getSchedulerClient(runtime);
    callRuntimeManagerRunner(runtime, schedulerClient, potentialStaleExecutionData);
  } finally {
    // 3. No post work is needed at the moment
    // 4. Release the state manager
    SysUtils.closeIgnoringExceptions(statemgr);
  }
}

// Creates the state manager via reflection, wrapping reflection failures
private static IStateManager instantiateStateManager(String statemgrClass)
    throws TopologyRuntimeManagementException {
  try {
    return ReflectionUtils.newInstance(statemgrClass);
  } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
    throw new TopologyRuntimeManagementException(String.format(
        "Failed to instantiate state manager class '%s'",
        statemgrClass), e);
  }
}
/**
 * Checks, before the command is executed, that:
 * - the topology is running
 * - the information in the execution state matches the request
 * Topology data may be only partially available, for example after a SUBMIT or
 * KILL command that did not fully succeed. For that reason a KILL command is
 * allowed to proceed (with a warning) when the topology is not running or its
 * execution state is missing; any other command fails in those situations.
 * Whenever execution state is available, the environment check is performed.
 *
 * @param adaptor the state manager adaptor used to look up topology state
 * @param topologyName the name of the topology to validate
 * @return true if the topology execution data is found, false otherwise.
 * @throws TopologyRuntimeManagementException if validation fails for a non-KILL command,
 * or the execution state does not match the request
 */
protected boolean validateRuntimeManage(
    SchedulerStateManagerAdaptor adaptor,
    String topologyName) throws TopologyRuntimeManagementException {
  // A null answer from the adaptor is treated the same as "not running"
  boolean hasExecutionData = Boolean.TRUE.equals(adaptor.isTopologyRunning(topologyName));
  if (!hasExecutionData) {
    if (command != Command.KILL) {
      throw new TopologyRuntimeManagementException(
          String.format("Topology '%s' does not exist", topologyName));
    }
    LOG.warning(String.format("Topology '%s' is not found or not running", topologyName));
  }

  ExecutionEnvironment.ExecutionState executionState = adaptor.getExecutionState(topologyName);
  if (executionState != null) {
    // Execution state is available: cluster/role/environ must match the request
    validateExecutionState(topologyName, executionState);
    return hasExecutionData;
  }

  // No execution state: only KILL may continue
  if (command != Command.KILL) {
    throw new TopologyRuntimeManagementException(
        String.format("Failed to get execution state for topology %s", topologyName));
  }
  LOG.warning(String.format("Topology execution state for '%s' is not found", topologyName));
  return hasExecutionData;
}
/**
 * Checks that the cluster, role and environ recorded in the execution state
 * are equal to the ones in this manager's config.
 *
 * @param topologyName the topology name, used in the error message
 * @param executionState the execution state to compare against the config
 * @throws TopologyRuntimeManagementException if any of the three values differs
 */
protected void validateExecutionState(
    String topologyName,
    ExecutionEnvironment.ExecutionState executionState)
    throws TopologyRuntimeManagementException {
  String stateCluster = executionState.getCluster();
  String stateRole = executionState.getRole();
  String stateEnv = executionState.getEnviron();

  String configCluster = Context.cluster(config);
  String configRole = Context.role(config);
  String configEnv = Context.environ(config);

  boolean sameLocation = stateCluster.equals(configCluster)
      && stateRole.equals(configRole)
      && stateEnv.equals(configEnv);
  if (sameLocation) {
    return;
  }

  String currentState = String.format("%s/%s/%s", stateCluster, stateRole, stateEnv);
  String configState = String.format("%s/%s/%s", configCluster, configRole, configEnv);
  String reason = String.format(
      "cluster/role/environ does not match. Topology '%s' is running at %s, not %s",
      topologyName, currentState, configState);
  throw new TopologyRuntimeManagementException(reason);
}
/**
 * Creates a {@code RuntimeManagerRunner} for the configured command and invokes it.
 *
 * @param runtime the runtime config
 * @param schedulerClient the scheduler client handed to the runner
 * @param potentialStaleExecutionData flag passed through to the runner
 */
protected void callRuntimeManagerRunner(
    Config runtime,
    ISchedulerClient schedulerClient,
    boolean potentialStaleExecutionData)
    throws TopologyRuntimeManagementException, TMasterException, PackingException {
  // Build the runner and let it dispatch to the handler for the command
  new RuntimeManagerRunner(
      config, runtime, command, schedulerClient, potentialStaleExecutionData).call();
}
/**
 * Obtains a scheduler client from a {@code SchedulerClientFactory} built with
 * this manager's config and the given runtime config.
 *
 * @param runtime the runtime config
 * @return the scheduler client produced by the factory
 */
protected ISchedulerClient getSchedulerClient(Config runtime)
    throws SchedulerException {
  SchedulerClientFactory factory = new SchedulerClientFactory(config, runtime);
  return factory.getSchedulerClient();
}
/**
 * Copies the optional update arguments (component parallelism, container number,
 * runtime config) from the command line into the config builder. An argument
 * that is absent or empty is skipped.
 *
 * @param cmd the parsed command line
 * @param config the builder receiving the values
 */
protected static void translateCommandLineConfig(CommandLine cmd, Config.Builder config) {
  String parallelism = cmd.getOptionValue("component_parallelism");
  if (hasValue(parallelism)) {
    config.put(RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY, parallelism);
  }

  String containers = cmd.getOptionValue("container_number");
  if (hasValue(containers)) {
    config.put(RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY, containers);
  }

  String runtimeOverrides = cmd.getOptionValue("runtime_config");
  if (hasValue(runtimeOverrides)) {
    config.put(RuntimeManagerRunner.RUNTIME_MANAGER_RUNTIME_CONFIG_KEY, runtimeOverrides);
  }
}

// True when the option value is present and not the empty string
private static boolean hasValue(String optionValue) {
  return optionValue != null && !optionValue.isEmpty();
}
}