| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.heron.scheduler; |
| |
| import java.io.PrintStream; |
| import java.net.URI; |
| import java.nio.charset.StandardCharsets; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.DefaultParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.heron.api.generated.TopologyAPI; |
| import org.apache.heron.api.utils.TopologyUtils; |
| import org.apache.heron.common.basics.DryRunFormatType; |
| import org.apache.heron.common.basics.SysUtils; |
| import org.apache.heron.common.utils.logging.LoggingHelper; |
| import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse; |
| import org.apache.heron.scheduler.utils.DryRunRenders; |
| import org.apache.heron.scheduler.utils.LauncherUtils; |
| import org.apache.heron.scheduler.utils.SubmitterUtils; |
| import org.apache.heron.spi.common.Config; |
| import org.apache.heron.spi.common.ConfigLoader; |
| import org.apache.heron.spi.common.Context; |
| import org.apache.heron.spi.common.Key; |
| import org.apache.heron.spi.packing.PackingException; |
| import org.apache.heron.spi.packing.PackingPlan; |
| import org.apache.heron.spi.scheduler.ILauncher; |
| import org.apache.heron.spi.scheduler.LauncherException; |
| import org.apache.heron.spi.statemgr.IStateManager; |
| import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor; |
| import org.apache.heron.spi.uploader.IUploader; |
| import org.apache.heron.spi.uploader.UploaderException; |
| import org.apache.heron.spi.utils.ReflectionUtils; |
| |
| /** |
| * Calls Uploader to upload topology package, and Launcher to launch Scheduler. |
| */ |
| public class SubmitterMain { |
| private static final Logger LOG = Logger.getLogger(SubmitterMain.class.getName()); |
| |
| /** |
| * Load the config parameters from the command line |
| * |
| * @param cluster name of the cluster |
| * @param role user role |
| * @param environ user provided environment/tag |
| * @param submitUser the submit user |
| * @param dryRun run as dry run |
| * @param dryRunFormat the dry run format |
| * @param verbose enable verbose logging |
| * @return config the command line config |
| */ |
| protected static Config commandLineConfigs(String cluster, |
| String role, |
| String environ, |
| String submitUser, |
| Boolean dryRun, |
| DryRunFormatType dryRunFormat, |
| Boolean verbose) { |
| return Config.newBuilder() |
| .put(Key.CLUSTER, cluster) |
| .put(Key.ROLE, role) |
| .put(Key.ENVIRON, environ) |
| .put(Key.SUBMIT_USER, submitUser) |
| .put(Key.DRY_RUN, dryRun) |
| .put(Key.DRY_RUN_FORMAT_TYPE, dryRunFormat) |
| .put(Key.VERBOSE, verbose) |
| .build(); |
| } |
| |
| // Print usage options |
| private static void usage(Options options) { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp("SubmitterMain", options); |
| } |
| |
| // Construct all required command line options |
| private static Options constructOptions() { |
| Options options = new Options(); |
| |
| Option cluster = Option.builder("c") |
| .desc("Cluster name in which the topology needs to run on") |
| .longOpt("cluster") |
| .hasArgs() |
| .argName("cluster") |
| .required() |
| .build(); |
| |
| Option role = Option.builder("r") |
| .desc("Role under which the topology needs to run") |
| .longOpt("role") |
| .hasArgs() |
| .argName("role") |
| .required() |
| .build(); |
| |
| Option environment = Option.builder("e") |
| .desc("Environment under which the topology needs to run") |
| .longOpt("environment") |
| .hasArgs() |
| .argName("environment") |
| .required() |
| .build(); |
| |
| Option submitUser = Option.builder("s") |
| .desc("User submitting the topology") |
| .longOpt("submit_user") |
| .hasArgs() |
| .argName("submit userid") |
| .required() |
| .build(); |
| |
| Option heronHome = Option.builder("d") |
| .desc("Directory where heron is installed") |
| .longOpt("heron_home") |
| .hasArgs() |
| .argName("heron home dir") |
| .required() |
| .build(); |
| |
| Option configFile = Option.builder("p") |
| .desc("Path of the config files") |
| .longOpt("config_path") |
| .hasArgs() |
| .argName("config path") |
| .required() |
| .build(); |
| |
| Option configOverrides = Option.builder("o") |
| .desc("Command line override config path") |
| .longOpt("override_config_file") |
| .hasArgs() |
| .argName("override config file") |
| .build(); |
| |
| Option releaseFile = Option.builder("b") |
| .desc("Release file name") |
| .longOpt("release_file") |
| .hasArgs() |
| .argName("release information") |
| .build(); |
| |
| Option topologyPackage = Option.builder("y") |
| .desc("tar ball containing user submitted jar/tar, defn and config") |
| .longOpt("topology_package") |
| .hasArgs() |
| .argName("topology package") |
| .required() |
| .build(); |
| |
| Option topologyDefn = Option.builder("f") |
| .desc("serialized file containing Topology protobuf") |
| .longOpt("topology_defn") |
| .hasArgs() |
| .argName("topology definition") |
| .required() |
| .build(); |
| |
| Option topologyJar = Option.builder("j") |
| .desc("The filename of the heron topology jar/tar/pex file to be run by the executor") |
| .longOpt("topology_bin") |
| .hasArgs() |
| .argName("topology binary filename on the cluster") |
| .required() |
| .build(); |
| |
| Option dryRun = Option.builder("u") |
| .desc("run in dry-run mode") |
| .longOpt("dry_run") |
| .required(false) |
| .build(); |
| |
| Option dryRunFormat = Option.builder("t") |
| .desc("dry-run format") |
| .longOpt("dry_run_format") |
| .hasArg() |
| .required(false) |
| .build(); |
| |
| Option verbose = Option.builder("v") |
| .desc("Enable debug logs") |
| .longOpt("verbose") |
| .build(); |
| |
| options.addOption(cluster); |
| options.addOption(role); |
| options.addOption(environment); |
| options.addOption(submitUser); |
| options.addOption(heronHome); |
| options.addOption(configFile); |
| options.addOption(configOverrides); |
| options.addOption(releaseFile); |
| options.addOption(topologyPackage); |
| options.addOption(topologyDefn); |
| options.addOption(topologyJar); |
| options.addOption(dryRun); |
| options.addOption(dryRunFormat); |
| options.addOption(verbose); |
| |
| return options; |
| } |
| |
| // construct command line help options |
| private static Options constructHelpOptions() { |
| Options options = new Options(); |
| Option help = Option.builder("h") |
| .desc("List all options and their description") |
| .longOpt("help") |
| .build(); |
| |
| options.addOption(help); |
| return options; |
| } |
| |
| private static boolean isVerbose(CommandLine cmd) { |
| return cmd.hasOption("v"); |
| } |
| |
| @SuppressWarnings("JavadocMethod") |
| @VisibleForTesting |
| public static Config loadConfig(CommandLine cmd, TopologyAPI.Topology topology) { |
| String cluster = cmd.getOptionValue("cluster"); |
| String role = cmd.getOptionValue("role"); |
| String environ = cmd.getOptionValue("environment"); |
| String submitUser = cmd.getOptionValue("submit_user"); |
| String heronHome = cmd.getOptionValue("heron_home"); |
| String configPath = cmd.getOptionValue("config_path"); |
| String overrideConfigFile = cmd.getOptionValue("override_config_file"); |
| String releaseFile = cmd.getOptionValue("release_file"); |
| String topologyPackage = cmd.getOptionValue("topology_package"); |
| String topologyDefnFile = cmd.getOptionValue("topology_defn"); |
| String topologyBinaryFile = cmd.getOptionValue("topology_bin"); |
| |
| Boolean dryRun = false; |
| if (cmd.hasOption("u")) { |
| dryRun = true; |
| } |
| |
| // Default dry-run output format type |
| DryRunFormatType dryRunFormat = DryRunFormatType.TABLE; |
| if (dryRun && cmd.hasOption("t")) { |
| String format = cmd.getOptionValue("dry_run_format"); |
| dryRunFormat = DryRunFormatType.getDryRunFormatType(format); |
| LOG.fine(String.format("Running dry-run mode using format %s", format)); |
| } |
| |
| // first load the defaults, then the config from files to override it |
| // next add config parameters from the command line |
| // load the topology configs |
| |
| // build the final config by expanding all the variables |
| return Config.toLocalMode(Config.newBuilder() |
| .putAll(ConfigLoader.loadConfig(heronHome, configPath, releaseFile, overrideConfigFile)) |
| .putAll(commandLineConfigs(cluster, role, environ, submitUser, dryRun, |
| dryRunFormat, isVerbose(cmd))) |
| .putAll(SubmitterUtils.topologyConfigs(topologyPackage, topologyBinaryFile, |
| topologyDefnFile, topology)) |
| .build()); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Options options = constructOptions(); |
| Options helpOptions = constructHelpOptions(); |
| CommandLineParser parser = new DefaultParser(); |
| // parse the help options first. |
| CommandLine cmd = parser.parse(helpOptions, args, true); |
| |
| if (cmd.hasOption("h")) { |
| usage(options); |
| return; |
| } |
| |
| try { |
| // Now parse the required options |
| cmd = parser.parse(options, args); |
| } catch (ParseException e) { |
| usage(options); |
| throw new RuntimeException("Error parsing command line options: ", e); |
| } |
| |
| Level logLevel = Level.INFO; |
| if (isVerbose(cmd)) { |
| logLevel = Level.ALL; |
| } |
| |
| // init log |
| LoggingHelper.loggerInit(logLevel, false); |
| |
| // load the topology definition into topology proto |
| TopologyAPI.Topology topology = TopologyUtils.getTopology(cmd.getOptionValue("topology_defn")); |
| Config config = loadConfig(cmd, topology); |
| |
| LOG.fine("Static config loaded successfully"); |
| LOG.fine(config.toString()); |
| |
| SubmitterMain submitterMain = new SubmitterMain(config, topology); |
| /* Meaning of exit status code: |
| - status code = 0: |
| program exits without error |
| - 0 < status code < 100: |
| program fails to execute before program execution. For example, |
| JVM cannot find or load main class |
| - 100 <= status code < 200: |
| program fails to launch after program execution. For example, |
| topology definition file fails to be loaded |
| - status code >= 200 |
| program sends out dry-run response */ |
| try { |
| submitterMain.submitTopology(); |
| } catch (SubmitDryRunResponse response) { |
| LOG.log(Level.FINE, "Sending out dry-run response"); |
| // Output may contain UTF-8 characters, so we should print using UTF-8 encoding |
| PrintStream out = new PrintStream(System.out, true, StandardCharsets.UTF_8.name()); |
| out.print(DryRunRenders.render(response, Context.dryRunFormatType(config))); |
| // Exit with status code 200 to indicate dry-run response is sent out |
| // SUPPRESS CHECKSTYLE RegexpSinglelineJava |
| System.exit(200); |
| // SUPPRESS CHECKSTYLE IllegalCatch |
| } catch (Exception e) { |
| /* Since only stderr is used (by logging), we use stdout here to |
| propagate error message back to Python's executor.py (invoke site). */ |
| LOG.log(Level.SEVERE, "Exception when submitting topology", e); |
| System.out.println(e.getMessage()); |
| // Exit with status code 100 to indicate that error has happened on user-land |
| // SUPPRESS CHECKSTYLE RegexpSinglelineJava |
| System.exit(100); |
| } |
| LOG.log(Level.FINE, "Topology {0} submitted successfully", topology.getName()); |
| } |
| |
// config used throughout the submission
private final Config config;

// definition of the topology being submitted
private final TopologyAPI.Topology topology;

/**
 * Creates a submitter for one topology.
 *
 * @param config the config to submit with
 * @param topology the topology definition to submit
 */
public SubmitterMain(Config config, TopologyAPI.Topology topology) {
  this.topology = topology;
  this.config = config;
}
| |
| /** |
| * Submit a topology |
| * 1. Instantiate necessary resources |
| * 2. Valid whether it is legal to submit a topology |
| * 3. Call LauncherRunner |
| * |
| */ |
| public void submitTopology() throws TopologySubmissionException { |
| // build primary runtime config first |
| Config primaryRuntime = Config.newBuilder() |
| .putAll(LauncherUtils.getInstance().createPrimaryRuntime(topology)).build(); |
| // call launcher directly here if in dry-run mode |
| if (Context.dryRun(config)) { |
| callLauncherRunner(primaryRuntime); |
| return; |
| } |
| // 1. Do prepare work |
| // create an instance of state manager |
| String statemgrClass = Context.stateManagerClass(config); |
| IStateManager statemgr; |
| |
| // Create an instance of the launcher class |
| String launcherClass = Context.launcherClass(config); |
| ILauncher launcher; |
| |
| // create an instance of the uploader class |
| String uploaderClass = Context.uploaderClass(config); |
| IUploader uploader; |
| |
| // create an instance of state manager |
| try { |
| statemgr = ReflectionUtils.newInstance(statemgrClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new TopologySubmissionException( |
| String.format("Failed to instantiate state manager class '%s'", statemgrClass), e); |
| } |
| |
| // create an instance of launcher |
| try { |
| launcher = ReflectionUtils.newInstance(launcherClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new LauncherException( |
| String.format("Failed to instantiate launcher class '%s'", launcherClass), e); |
| } |
| |
| // create an instance of uploader |
| try { |
| uploader = ReflectionUtils.newInstance(uploaderClass); |
| } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) { |
| throw new UploaderException( |
| String.format("Failed to instantiate uploader class '%s'", uploaderClass), e); |
| } |
| |
| // Put it in a try block so that we can always clean resources |
| try { |
| // initialize the state manager |
| statemgr.initialize(config); |
| |
| // initialize the uploader |
| uploader.initialize(config); |
| |
| // TODO(mfu): timeout should read from config |
| SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000); |
| |
| // Check if topology is already running |
| validateSubmit(adaptor, topology.getName()); |
| |
| LOG.log(Level.FINE, "Topology {0} to be submitted", topology.getName()); |
| |
| Config runtimeWithoutPackageURI = Config.newBuilder() |
| .putAll(primaryRuntime) |
| .putAll(LauncherUtils.getInstance().createAdaptorRuntime(adaptor)) |
| .put(Key.LAUNCHER_CLASS_INSTANCE, launcher) |
| .build(); |
| |
| PackingPlan packingPlan = LauncherUtils.getInstance() |
| .createPackingPlan(config, runtimeWithoutPackageURI); |
| |
| // The packing plan might call for a number of containers different than the config |
| // settings. If that's the case we need to modify the configs to match. |
| runtimeWithoutPackageURI = |
| updateNumContainersIfNeeded(runtimeWithoutPackageURI, topology, packingPlan); |
| |
| // If the packing plan is valid we will upload necessary packages |
| URI packageURI = uploadPackage(uploader); |
| |
| // Update the runtime config with the packageURI |
| Config runtimeAll = Config.newBuilder() |
| .putAll(runtimeWithoutPackageURI) |
| .put(Key.TOPOLOGY_PACKAGE_URI, packageURI) |
| .build(); |
| |
| callLauncherRunner(runtimeAll); |
| |
| } catch (LauncherException | PackingException e) { |
| // we undo uploading of topology package only if launcher fails to |
| // launch topology, which will throw LauncherException or PackingException |
| uploader.undo(); |
| throw e; |
| } finally { |
| SysUtils.closeIgnoringExceptions(uploader); |
| SysUtils.closeIgnoringExceptions(launcher); |
| SysUtils.closeIgnoringExceptions(statemgr); |
| } |
| } |
| |
| /** |
| * Checks that the number of containers specified in the topology matches the number of containers |
| * called for in the packing plan. If they are different, returns a new config with settings |
| * updated to align with the packing plan. The new config will include an updated |
| * Key.TOPOLOGY_DEFINITION containing a cloned Topology with it's settings also updated. |
| * |
| * @param initialConfig initial config to clone and update (if necessary) |
| * @param initialTopology topology to check and clone/update (if necessary) |
| * @param packingPlan packing plan to compare settings with |
| * @return a new Config cloned from initialConfig and modified as needed to align with packedPlan |
| */ |
| @VisibleForTesting |
| Config updateNumContainersIfNeeded(Config initialConfig, |
| TopologyAPI.Topology initialTopology, |
| PackingPlan packingPlan) { |
| |
| int configNumStreamManagers = TopologyUtils.getNumContainers(initialTopology); |
| int packingNumStreamManagers = packingPlan.getContainers().size(); |
| |
| if (configNumStreamManagers == packingNumStreamManagers) { |
| return initialConfig; |
| } |
| |
| Config.Builder newConfigBuilder = Config.newBuilder() |
| .putAll(initialConfig) |
| .put(Key.NUM_CONTAINERS, packingNumStreamManagers + 1) |
| .put(Key.TOPOLOGY_DEFINITION, |
| cloneWithNewNumContainers(initialTopology, packingNumStreamManagers)); |
| |
| String packingClass = Context.packingClass(config); |
| LOG.warning(String.format("The packing plan (generated by %s) calls for a different number of " |
| + "containers (%d) than what was explicitly set in the topology configs (%d). " |
| + "Overriding the configs to specify %d containers. When using %s do not explicitly " |
| + "call config.setNumStmgrs(..) or config.setNumWorkers(..).", |
| packingClass, packingNumStreamManagers, configNumStreamManagers, |
| packingNumStreamManagers, packingClass)); |
| |
| return newConfigBuilder.build(); |
| } |
| |
| private TopologyAPI.Topology cloneWithNewNumContainers(TopologyAPI.Topology initialTopology, |
| int numStreamManagers) { |
| TopologyAPI.Topology.Builder topologyBuilder = TopologyAPI.Topology.newBuilder(initialTopology); |
| TopologyAPI.Config.Builder configBuilder = TopologyAPI.Config.newBuilder(); |
| |
| for (TopologyAPI.Config.KeyValue keyValue : initialTopology.getTopologyConfig().getKvsList()) { |
| |
| // override TOPOLOGY_STMGRS value once we find it |
| if (org.apache.heron.api.Config.TOPOLOGY_STMGRS.equals(keyValue.getKey())) { |
| TopologyAPI.Config.KeyValue.Builder kvBuilder = TopologyAPI.Config.KeyValue.newBuilder(); |
| kvBuilder.setKey(keyValue.getKey()); |
| kvBuilder.setValue(Integer.toString(numStreamManagers)); |
| configBuilder.addKvs(kvBuilder.build()); |
| } else { |
| configBuilder.addKvs(keyValue); |
| } |
| } |
| |
| return topologyBuilder.setTopologyConfig(configBuilder).build(); |
| } |
| |
| protected void validateSubmit(SchedulerStateManagerAdaptor adaptor, String topologyName) |
| throws TopologySubmissionException { |
| // Check whether the topology has already been running |
| // TODO(rli): anti-pattern is too nested on this path to be refactored |
| Boolean isTopologyRunning = adaptor.isTopologyRunning(topologyName); |
| |
| if (isTopologyRunning != null && isTopologyRunning.equals(Boolean.TRUE)) { |
| throw new TopologySubmissionException( |
| String.format("Topology '%s' already exists", topologyName)); |
| } |
| } |
| |
| protected URI uploadPackage(IUploader uploader) throws UploaderException { |
| // upload the topology package to the storage |
| return uploader.uploadPackage(); |
| } |
| |
| protected void callLauncherRunner(Config runtime) |
| throws LauncherException, PackingException, SubmitDryRunResponse { |
| // using launch runner, launch the topology |
| LaunchRunner launchRunner = new LaunchRunner(config, runtime); |
| launchRunner.call(); |
| } |
| } |