| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.scheduler; |
| |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.DefaultParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.heron.common.basics.DryRunFormatType; |
| import org.apache.heron.common.basics.SysUtils; |
| import org.apache.heron.common.utils.logging.LoggingHelper; |
| import org.apache.heron.proto.system.ExecutionEnvironment; |
| import org.apache.heron.scheduler.client.ISchedulerClient; |
| import org.apache.heron.scheduler.client.SchedulerClientFactory; |
| import org.apache.heron.scheduler.dryrun.UpdateDryRunResponse; |
| import org.apache.heron.scheduler.utils.DryRunRenders; |
| import org.apache.heron.spi.common.Config; |
| import org.apache.heron.spi.common.ConfigLoader; |
| import org.apache.heron.spi.common.Context; |
| import org.apache.heron.spi.common.Key; |
| import org.apache.heron.spi.packing.PackingException; |
| import org.apache.heron.spi.scheduler.SchedulerException; |
| import org.apache.heron.spi.statemgr.IStateManager; |
| import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor; |
| import org.apache.heron.spi.utils.ReflectionUtils; |
| import org.apache.heron.spi.utils.TMasterException; |
| |
| public class RuntimeManagerMain { |
| private static final Logger LOG = Logger.getLogger(RuntimeManagerMain.class.getName()); |
| |
| // Print usage options |
| private static void usage(Options options) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("RuntimeManagerMain", options); |
| } |
| |
| // Construct all required command line options |
| private static Options constructOptions() { |
| Options options = new Options(); |
| |
| Option cluster = Option.builder("c") |
| .desc("Cluster name in which the topology needs to run on") |
| .longOpt("cluster") |
| .hasArgs() |
| .argName("cluster") |
| .required() |
| .build(); |
| |
| Option role = Option.builder("r") |
| .desc("Role under which the topology needs to run") |
| .longOpt("role") |
| .hasArgs() |
| .argName("role") |
| .required() |
| .build(); |
| |
| Option environment = Option.builder("e") |
| .desc("Environment under which the topology needs to run") |
| .longOpt("environment") |
| .hasArgs() |
| .argName("environment") |
| .required() |
| .build(); |
| |
| Option submitUser = Option.builder("s") |
| .desc("User submitting the topology") |
| .longOpt("submit_user") |
| .hasArgs() |
| .argName("submit userid") |
| .required() |
| .build(); |
| |
| Option topologyName = Option.builder("n") |
| .desc("Name of the topology") |
| .longOpt("topology_name") |
| .hasArgs() |
| .argName("topology name") |
| .required() |
| .build(); |
| |
| Option heronHome = Option.builder("d") |
| .desc("Directory where heron is installed") |
| .longOpt("heron_home") |
| .hasArgs() |
| .argName("heron home dir") |
| .required() |
| .build(); |
| |
| Option componentParallelism = Option.builder("a") |
| .desc("Component parallelism to update: <name>:<value>,<name>:<value>,...") |
| .longOpt("component_parallelism") |
| .hasArgs() |
| .argName("component parallelism") |
| .build(); |
| |
| Option containerNumber = Option.builder("cn") |
| .desc("Container Number for updation: <value>") |
| .longOpt("container_number") |
| .hasArgs() |
| .argName("container number") |
| .build(); |
| |
| Option runtimeConfig = Option.builder("rc") |
| .desc("Runtime config to update: [comp:]<name>:<value>,[comp:]<name>:<value>,...") |
| .longOpt("runtime_config") |
| .hasArgs() |
| .argName("runtime config") |
| .build(); |
| |
| Option configFile = Option.builder("p") |
| .desc("Path of the config files") |
| .longOpt("config_path") |
| .hasArgs() |
| .argName("config path") |
| .required() |
| .build(); |
| |
| Option configOverrides = Option.builder("o") |
| .desc("Command line override config path") |
| .longOpt("override_config_file") |
| .hasArgs() |
| .argName("override config file") |
| .build(); |
| |
| Option releaseFile = Option.builder("b") |
| .desc("Release file name") |
| .longOpt("release_file") |
| .hasArgs() |
| .argName("release information") |
| .build(); |
| |
| Option command = Option.builder("m") |
| .desc("Command to run") |
| .longOpt("command") |
| .hasArgs() |
| .required() |
| .argName("command to run") |
| .build(); |
| |
| Option containerId = Option.builder("i") |
| .desc("Container Id for restart command") |
| .longOpt("container_id") |
| .hasArgs() |
| .argName("container id") |
| .build(); |
| |
| Option dryRun = Option.builder("u") |
| .desc("run in dry-run mode") |
| .longOpt("dry_run") |
| .required(false) |
| .build(); |
| |
| Option dryRunFormat = Option.builder("t") |
| .desc("dry-run format") |
| .longOpt("dry_run_format") |
| .hasArg() |
| .required(false) |
| .build(); |
| |
| Option verbose = Option.builder("v") |
| .desc("Enable debug logs") |
| .longOpt("verbose") |
| .build(); |
| |
| options.addOption(cluster); |
| options.addOption(role); |
| options.addOption(environment); |
| options.addOption(submitUser); |
| options.addOption(topologyName); |
| options.addOption(configFile); |
| options.addOption(configOverrides); |
| options.addOption(releaseFile); |
| options.addOption(command); |
| options.addOption(heronHome); |
| options.addOption(containerId); |
| options.addOption(componentParallelism); |
| options.addOption(containerNumber); |
| options.addOption(runtimeConfig); |
| options.addOption(dryRun); |
| options.addOption(dryRunFormat); |
| options.addOption(verbose); |
| |
| return options; |
| } |
| |
| // construct command line help options |
| private static Options constructHelpOptions() { |
| Options options = new Options(); |
| Option help = Option.builder("h") |
| .desc("List all options and their description") |
| .longOpt("help") |
| .build(); |
| |
| options.addOption(help); |
| return options; |
| } |
| |
| public static void main(String[] args) |
| throws ClassNotFoundException, IllegalAccessException, |
| InstantiationException, IOException, ParseException { |
| |
| Options options = constructOptions(); |
| Options helpOptions = constructHelpOptions(); |
| CommandLineParser parser = new DefaultParser(); |
| // parse the help options first. |
| CommandLine cmd = parser.parse(helpOptions, args, true); |
| |
| if (cmd.hasOption("h")) { |
| usage(options); |
| return; |
| } |
| |
| try { |
| // Now parse the required options |
| cmd = parser.parse(options, args); |
| } catch (ParseException e) { |
| usage(options); |
| throw new RuntimeException("Error parsing command line options: ", e); |
| } |
| |
| Boolean verbose = false; |
| Level logLevel = Level.INFO; |
| if (cmd.hasOption("v")) { |
| logLevel = Level.ALL; |
| verbose = true; |
| } |
| |
| // init log |
| LoggingHelper.loggerInit(logLevel, false); |
| |
| String cluster = cmd.getOptionValue("cluster"); |
| String role = cmd.getOptionValue("role"); |
| String environ = cmd.getOptionValue("environment"); |
| String submitUser = cmd.getOptionValue("submit_user"); |
| String heronHome = cmd.getOptionValue("heron_home"); |
| String configPath = cmd.getOptionValue("config_path"); |
| String overrideConfigFile = cmd.getOptionValue("override_config_file"); |
| String releaseFile = cmd.getOptionValue("release_file"); |
| String topologyName = cmd.getOptionValue("topology_name"); |
| String commandOption = cmd.getOptionValue("command"); |
| |
| // Optional argument in the case of restart |
| // TODO(karthik): convert into CLI |
| String containerId = Integer.toString(-1); |
| if (cmd.hasOption("container_id")) { |
| containerId = cmd.getOptionValue("container_id"); |
| } |
| |
| Boolean dryRun = false; |
| if (cmd.hasOption("u")) { |
| dryRun = true; |
| } |
| |
| // Default dry-run output format type |
| DryRunFormatType dryRunFormat = DryRunFormatType.TABLE; |
| if (dryRun && cmd.hasOption("t")) { |
| String format = cmd.getOptionValue("dry_run_format"); |
| dryRunFormat = DryRunFormatType.getDryRunFormatType(format); |
| LOG.fine(String.format("Running dry-run mode using format %s", format)); |
| } |
| |
| Command command = Command.makeCommand(commandOption); |
| |
| // add config parameters from the command line |
| Config.Builder commandLineConfig = Config.newBuilder() |
| .put(Key.CLUSTER, cluster) |
| .put(Key.ROLE, role) |
| .put(Key.ENVIRON, environ) |
| .put(Key.SUBMIT_USER, submitUser) |
| .put(Key.DRY_RUN, dryRun) |
| .put(Key.DRY_RUN_FORMAT_TYPE, dryRunFormat) |
| .put(Key.VERBOSE, verbose) |
| .put(Key.TOPOLOGY_CONTAINER_ID, containerId); |
| |
| // This is a command line option, but not a valid config key. Hence we don't use Keys |
| translateCommandLineConfig(cmd, commandLineConfig); |
| |
| Config.Builder topologyConfig = Config.newBuilder() |
| .put(Key.TOPOLOGY_NAME, topologyName); |
| |
| // build the final config by expanding all the variables |
| Config config = Config.toLocalMode(Config.newBuilder() |
| .putAll(ConfigLoader.loadConfig(heronHome, configPath, releaseFile, overrideConfigFile)) |
| .putAll(commandLineConfig.build()) |
| .putAll(topologyConfig.build()) |
| .build()); |
| |
| LOG.fine("Static config loaded successfully "); |
| LOG.fine(config.toString()); |
| |
| /* Meaning of exit status code: |
| - status code = 0: |
| program exits without error |
| - 0 < status code < 100: |
| program fails to execute before program execution. For example, |
| JVM cannot find or load main class |
| - 100 <= status code < 200: |
| program fails to launch after program execution. For example, |
| topology definition file fails to be loaded |
| - status code == 200 |
| program sends out dry-run response */ |
| /* Since only stderr is used (by logging), we use stdout here to |
| propagate any message back to Python's executor.py (invoke site). */ |
| // Create a new instance of RuntimeManagerMain |
| RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command); |
| try { |
| runtimeManagerMain.manageTopology(); |
| // SUPPRESS CHECKSTYLE IllegalCatch |
| } catch (UpdateDryRunResponse response) { |
| LOG.log(Level.FINE, "Sending out dry-run response"); |
| // Output may contain UTF-8 characters, so we should print using UTF-8 encoding |
| PrintStream out = new PrintStream(System.out, true, StandardCharsets.UTF_8.name()); |
| out.print(DryRunRenders.render(response, Context.dryRunFormatType(config))); |
| // SUPPRESS CHECKSTYLE RegexpSinglelineJava |
| // Exit with status code 200 to indicate dry-run response is sent out |
| System.exit(200); |
| // SUPPRESS CHECKSTYLE IllegalCatch |
| } catch (Exception e) { |
| LOG.log(Level.FINE, "Exception when executing command " + commandOption, e); |
| System.out.println(e.getMessage()); |
| // Exit with status code 100 to indicate that error has happened on user-land |
| // SUPPRESS CHECKSTYLE RegexpSinglelineJava |
| System.exit(100); |
| } |
| } |
| |
/** Configuration this manager operates on. */
private final Config config;

/** Command to carry out against the topology. */
private final Command command;

/**
 * Creates a runtime manager for the given configuration and command.
 *
 * @param config the configuration to use
 * @param command the command to manage the topology with
 */
public RuntimeManagerMain(Config config, Command command) {
  this.command = command;
  this.config = config;
}
| |
| /** |
| * Manager a topology |
| * 1. Instantiate necessary resources |
| * 2. Valid whether the runtime management is legal |
| * 3. Complete the runtime management for a specific command |
| */ |
| public void manageTopology() |
| throws TopologyRuntimeManagementException, TMasterException, PackingException { |
| String topologyName = Context.topologyName(config); |
| // 1. Do prepare work |
| // create an instance of state manager |
| String statemgrClass = Context.stateManagerClass(config); |
| IStateManager statemgr; |
| try { |
| statemgr = ReflectionUtils.newInstance(statemgrClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new TopologyRuntimeManagementException(String.format( |
| "Failed to instantiate state manager class '%s'", |
| statemgrClass), e); |
| } |
| |
| // Put it in a try block so that we can always clean resources |
| try { |
| // initialize the statemgr |
| statemgr.initialize(config); |
| |
| // TODO(mfu): timeout should read from config |
| SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000); |
| |
| boolean hasExecutionData = validateRuntimeManage(adaptor, topologyName); |
| |
| // 2. Try to manage topology if valid |
| // invoke the appropriate command to manage the topology |
| LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command}); |
| |
| // build the runtime config |
| Config runtime = Config.newBuilder() |
| .put(Key.TOPOLOGY_NAME, Context.topologyName(config)) |
| .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor) |
| .build(); |
| |
| // Create a ISchedulerClient basing on the config |
| ISchedulerClient schedulerClient = getSchedulerClient(runtime); |
| |
| callRuntimeManagerRunner(runtime, schedulerClient, !hasExecutionData); |
| } finally { |
| // 3. Do post work basing on the result |
| // Currently nothing to do here |
| |
| // 4. Close the resources |
| SysUtils.closeIgnoringExceptions(statemgr); |
| } |
| } |
| |
| /** |
| * Before continuing to the action logic, verify: |
| * - the topology is running |
| * - the information in execution state matches the request |
| * There is an edge case that the topology data could be only partially available, |
| * which could be caused by not fully successful SUBMIT or KILL command. In this |
| * case, we need to skip the validation and allow KILL command to go through. |
| * In case execution state data is available, environment check will be done anyway. |
| * @return true if the topology execution data is found, false otherwise. |
| */ |
| protected boolean validateRuntimeManage( |
| SchedulerStateManagerAdaptor adaptor, |
| String topologyName) throws TopologyRuntimeManagementException { |
| // Check whether the topology has already been running |
| Boolean isTopologyRunning = adaptor.isTopologyRunning(topologyName); |
| boolean hasExecutionData = isTopologyRunning != null && isTopologyRunning.equals(Boolean.TRUE); |
| if (!hasExecutionData) { |
| if (command == Command.KILL) { |
| LOG.warning(String.format("Topology '%s' is not found or not running", topologyName)); |
| } else { |
| throw new TopologyRuntimeManagementException( |
| String.format("Topology '%s' does not exist", topologyName)); |
| } |
| } |
| |
| // Check whether cluster/role/environ matched if execution state data is available. |
| ExecutionEnvironment.ExecutionState executionState = adaptor.getExecutionState(topologyName); |
| if (executionState == null) { |
| if (command == Command.KILL) { |
| LOG.warning(String.format("Topology execution state for '%s' is not found", topologyName)); |
| } else { |
| throw new TopologyRuntimeManagementException( |
| String.format("Failed to get execution state for topology %s", topologyName)); |
| } |
| } else { |
| // Execution state is available, validate configurations. |
| validateExecutionState(topologyName, executionState); |
| } |
| return hasExecutionData; |
| } |
| |
| /** |
| * Verify that the environment information in execution state matches the request |
| */ |
| protected void validateExecutionState( |
| String topologyName, |
| ExecutionEnvironment.ExecutionState executionState) |
| throws TopologyRuntimeManagementException { |
| String stateCluster = executionState.getCluster(); |
| String stateRole = executionState.getRole(); |
| String stateEnv = executionState.getEnviron(); |
| String configCluster = Context.cluster(config); |
| String configRole = Context.role(config); |
| String configEnv = Context.environ(config); |
| |
| if (!stateCluster.equals(configCluster) |
| || !stateRole.equals(configRole) |
| || !stateEnv.equals(configEnv)) { |
| String currentState = String.format("%s/%s/%s", stateCluster, stateRole, stateEnv); |
| String configState = String.format("%s/%s/%s", configCluster, configRole, configEnv); |
| throw new TopologyRuntimeManagementException(String.format( |
| "cluster/role/environ does not match. Topology '%s' is running at %s, not %s", |
| topologyName, currentState, configState)); |
| } |
| } |
| |
| protected void callRuntimeManagerRunner( |
| Config runtime, |
| ISchedulerClient schedulerClient, |
| boolean potentialStaleExecutionData) |
| throws TopologyRuntimeManagementException, TMasterException, PackingException { |
| // create an instance of the runner class |
| RuntimeManagerRunner runtimeManagerRunner = |
| new RuntimeManagerRunner(config, runtime, command, schedulerClient, |
| potentialStaleExecutionData); |
| |
| // invoke the appropriate handlers based on command |
| runtimeManagerRunner.call(); |
| } |
| |
| protected ISchedulerClient getSchedulerClient(Config runtime) |
| throws SchedulerException { |
| return new SchedulerClientFactory(config, runtime).getSchedulerClient(); |
| } |
| |
| protected static void translateCommandLineConfig(CommandLine cmd, Config.Builder config) { |
| String componentParallelism = cmd.getOptionValue("component_parallelism"); |
| if (componentParallelism != null && !componentParallelism.isEmpty()) { |
| config.put( |
| RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY, componentParallelism); |
| } |
| |
| String containerNumber = cmd.getOptionValue("container_number"); |
| if (containerNumber != null && !containerNumber.isEmpty()) { |
| config.put( |
| RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY, containerNumber); |
| } |
| |
| String runtimeConfigurations = cmd.getOptionValue("runtime_config"); |
| if (runtimeConfigurations != null && !runtimeConfigurations.isEmpty()) { |
| config.put( |
| RuntimeManagerRunner.RUNTIME_MANAGER_RUNTIME_CONFIG_KEY, runtimeConfigurations); |
| } |
| } |
| } |