blob: 649550534b828948fd802347d778cd34508288bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler;
import java.io.PrintStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.DryRunFormatType;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.utils.logging.LoggingHelper;
import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse;
import org.apache.heron.scheduler.utils.DryRunRenders;
import org.apache.heron.scheduler.utils.LauncherUtils;
import org.apache.heron.scheduler.utils.SubmitterUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.ConfigLoader;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.scheduler.ILauncher;
import org.apache.heron.spi.scheduler.LauncherException;
import org.apache.heron.spi.statemgr.IStateManager;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.uploader.IUploader;
import org.apache.heron.spi.uploader.UploaderException;
import org.apache.heron.spi.utils.ReflectionUtils;
/**
* Calls Uploader to upload topology package, and Launcher to launch Scheduler.
*/
public class SubmitterMain {
  private static final Logger LOG = Logger.getLogger(SubmitterMain.class.getName());

  /**
   * Load the config parameters from the command line
   *
   * @param cluster name of the cluster
   * @param role user role
   * @param environ user provided environment/tag
   * @param submitUser the submit user
   * @param dryRun run as dry run
   * @param dryRunFormat the dry run format
   * @param verbose enable verbose logging
   * @return config the command line config
   */
  protected static Config commandLineConfigs(String cluster,
                                             String role,
                                             String environ,
                                             String submitUser,
                                             Boolean dryRun,
                                             DryRunFormatType dryRunFormat,
                                             Boolean verbose) {
    return Config.newBuilder()
        .put(Key.CLUSTER, cluster)
        .put(Key.ROLE, role)
        .put(Key.ENVIRON, environ)
        .put(Key.SUBMIT_USER, submitUser)
        .put(Key.DRY_RUN, dryRun)
        .put(Key.DRY_RUN_FORMAT_TYPE, dryRunFormat)
        .put(Key.VERBOSE, verbose)
        .build();
  }

  // Print usage options
  private static void usage(Options options) {
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp("SubmitterMain", options);
  }

  // Construct all required command line options
  private static Options constructOptions() {
    Options options = new Options();

    Option cluster = Option.builder("c")
        .desc("Cluster name in which the topology needs to run on")
        .longOpt("cluster")
        .hasArgs()
        .argName("cluster")
        .required()
        .build();

    Option role = Option.builder("r")
        .desc("Role under which the topology needs to run")
        .longOpt("role")
        .hasArgs()
        .argName("role")
        .required()
        .build();

    Option environment = Option.builder("e")
        .desc("Environment under which the topology needs to run")
        .longOpt("environment")
        .hasArgs()
        .argName("environment")
        .required()
        .build();

    Option submitUser = Option.builder("s")
        .desc("User submitting the topology")
        .longOpt("submit_user")
        .hasArgs()
        .argName("submit userid")
        .required()
        .build();

    Option heronHome = Option.builder("d")
        .desc("Directory where heron is installed")
        .longOpt("heron_home")
        .hasArgs()
        .argName("heron home dir")
        .required()
        .build();

    Option configFile = Option.builder("p")
        .desc("Path of the config files")
        .longOpt("config_path")
        .hasArgs()
        .argName("config path")
        .required()
        .build();

    Option configOverrides = Option.builder("o")
        .desc("Command line override config path")
        .longOpt("override_config_file")
        .hasArgs()
        .argName("override config file")
        .build();

    Option releaseFile = Option.builder("b")
        .desc("Release file name")
        .longOpt("release_file")
        .hasArgs()
        .argName("release information")
        .build();

    Option topologyPackage = Option.builder("y")
        .desc("tar ball containing user submitted jar/tar, defn and config")
        .longOpt("topology_package")
        .hasArgs()
        .argName("topology package")
        .required()
        .build();

    Option topologyDefn = Option.builder("f")
        .desc("serialized file containing Topology protobuf")
        .longOpt("topology_defn")
        .hasArgs()
        .argName("topology definition")
        .required()
        .build();

    Option topologyJar = Option.builder("j")
        .desc("The filename of the heron topology jar/tar/pex file to be run by the executor")
        .longOpt("topology_bin")
        .hasArgs()
        .argName("topology binary filename on the cluster")
        .required()
        .build();

    Option dryRun = Option.builder("u")
        .desc("run in dry-run mode")
        .longOpt("dry_run")
        .required(false)
        .build();

    Option dryRunFormat = Option.builder("t")
        .desc("dry-run format")
        .longOpt("dry_run_format")
        .hasArg()
        .required(false)
        .build();

    Option verbose = Option.builder("v")
        .desc("Enable debug logs")
        .longOpt("verbose")
        .build();

    options.addOption(cluster);
    options.addOption(role);
    options.addOption(environment);
    options.addOption(submitUser);
    options.addOption(heronHome);
    options.addOption(configFile);
    options.addOption(configOverrides);
    options.addOption(releaseFile);
    options.addOption(topologyPackage);
    options.addOption(topologyDefn);
    options.addOption(topologyJar);
    options.addOption(dryRun);
    options.addOption(dryRunFormat);
    options.addOption(verbose);

    return options;
  }

  // construct command line help options
  private static Options constructHelpOptions() {
    Options options = new Options();
    Option help = Option.builder("h")
        .desc("List all options and their description")
        .longOpt("help")
        .build();

    options.addOption(help);
    return options;
  }

  // True when the -v/--verbose flag was passed on the command line.
  private static boolean isVerbose(CommandLine cmd) {
    return cmd.hasOption("v");
  }

  /**
   * Build the complete submission config from the parsed command line and topology.
   * <p>
   * Layers, in order (later layers override earlier ones): the configs loaded by
   * {@code ConfigLoader.loadConfig} from the heron home / config path / release file /
   * override file, then the command-line values, then the topology-specific configs.
   * The result is passed through {@code Config.toLocalMode}.
   *
   * @param cmd the parsed command line
   * @param topology the topology definition being submitted
   * @return the final config used for submission
   */
  @SuppressWarnings("JavadocMethod")
  @VisibleForTesting
  public static Config loadConfig(CommandLine cmd, TopologyAPI.Topology topology) {
    String cluster = cmd.getOptionValue("cluster");
    String role = cmd.getOptionValue("role");
    String environ = cmd.getOptionValue("environment");
    String submitUser = cmd.getOptionValue("submit_user");
    String heronHome = cmd.getOptionValue("heron_home");
    String configPath = cmd.getOptionValue("config_path");
    String overrideConfigFile = cmd.getOptionValue("override_config_file");
    String releaseFile = cmd.getOptionValue("release_file");
    String topologyPackage = cmd.getOptionValue("topology_package");
    String topologyDefnFile = cmd.getOptionValue("topology_defn");
    String topologyBinaryFile = cmd.getOptionValue("topology_bin");

    boolean dryRun = cmd.hasOption("u");

    // Default dry-run output format type; only overridden when running a dry-run
    DryRunFormatType dryRunFormat = DryRunFormatType.TABLE;
    if (dryRun && cmd.hasOption("t")) {
      String format = cmd.getOptionValue("dry_run_format");
      dryRunFormat = DryRunFormatType.getDryRunFormatType(format);
      LOG.fine(String.format("Running dry-run mode using format %s", format));
    }

    // first load the defaults, then the config from files to override it
    // next add config parameters from the command line
    // load the topology configs
    // build the final config by expanding all the variables
    return Config.toLocalMode(Config.newBuilder()
        .putAll(ConfigLoader.loadConfig(heronHome, configPath, releaseFile, overrideConfigFile))
        .putAll(commandLineConfigs(cluster, role, environ, submitUser, dryRun,
            dryRunFormat, isVerbose(cmd)))
        .putAll(SubmitterUtils.topologyConfigs(topologyPackage, topologyBinaryFile,
            topologyDefnFile, topology))
        .build());
  }

  /**
   * Command-line entry point: parses options, loads config and submits the topology.
   * <p>
   * Exit status: 200 after printing a dry-run response to stdout, 100 when submission
   * fails (the exception message is printed to stdout), otherwise normal return.
   *
   * @param args command line arguments
   * @throws Exception if the help options cannot be parsed or the topology definition
   *     cannot be loaded
   */
  public static void main(String[] args) throws Exception {
    Options options = constructOptions();
    Options helpOptions = constructHelpOptions();
    CommandLineParser parser = new DefaultParser();

    // parse the help options first; stop at the first unknown token so that the
    // regular options do not cause a parse failure here
    CommandLine cmd = parser.parse(helpOptions, args, true);
    if (cmd.hasOption("h")) {
      usage(options);
      return;
    }

    try {
      // Now parse the required options
      cmd = parser.parse(options, args);
    } catch (ParseException e) {
      usage(options);
      throw new RuntimeException("Error parsing command line options: " + e.getMessage(), e);
    }

    Level logLevel = isVerbose(cmd) ? Level.ALL : Level.INFO;

    // init log
    LoggingHelper.loggerInit(logLevel, false);

    // load the topology definition into topology proto
    TopologyAPI.Topology topology = TopologyUtils.getTopology(cmd.getOptionValue("topology_defn"));

    Config config = loadConfig(cmd, topology);

    LOG.fine("Static config loaded successfully");
    LOG.fine(config.toString());

    SubmitterMain submitterMain = new SubmitterMain(config, topology);
    /* Meaning of exit status code:
       - status code = 0:
         program exits without error
       - 0 < status code < 100:
         program fails to execute before program execution. For example,
         JVM cannot find or load main class
       - 100 <= status code < 200:
         program fails to launch after program execution. For example,
         topology definition file fails to be loaded
       - status code >= 200
         program sends out dry-run response */
    try {
      submitterMain.submitTopology();
    } catch (SubmitDryRunResponse response) {
      LOG.log(Level.FINE, "Sending out dry-run response");
      // Output may contain UTF-8 characters, so we should print using UTF-8 encoding
      PrintStream out = new PrintStream(System.out, true, StandardCharsets.UTF_8.name());
      out.print(DryRunRenders.render(response, Context.dryRunFormatType(config)));
      // Make sure everything reaches stdout before the JVM is torn down
      out.flush();
      // Exit with status code 200 to indicate dry-run response is sent out
      // SUPPRESS CHECKSTYLE RegexpSinglelineJava
      System.exit(200);
      // SUPPRESS CHECKSTYLE IllegalCatch
    } catch (Exception e) {
      /* Since only stderr is used (by logging), we use stdout here to
         propagate error message back to Python's executor.py (invoke site). */
      LOG.log(Level.FINE, "Exception when submitting topology", e);
      System.out.println(e.getMessage());
      // Exit with status code 100 to indicate that error has happened on user-land
      // SUPPRESS CHECKSTYLE RegexpSinglelineJava
      System.exit(100);
    }
    LOG.log(Level.FINE, "Topology {0} submitted successfully", topology.getName());
  }

  // holds all the config read
  private final Config config;

  // topology definition
  private final TopologyAPI.Topology topology;

  /**
   * @param config the fully-loaded submission config (see {@link #loadConfig})
   * @param topology the topology definition to submit
   */
  public SubmitterMain(Config config, TopologyAPI.Topology topology) {
    // initialize the options
    this.config = config;
    this.topology = topology;
  }

  /**
   * Submit a topology
   * 1. Instantiate necessary resources
   * 2. Validate whether it is legal to submit a topology
   * 3. Call LauncherRunner
   * <p>
   * In dry-run mode the launcher runner is called directly with only the primary runtime
   * config; no state manager, uploader or launcher instances are created.
   * <p>
   * If launching fails with a {@link LauncherException} or {@link PackingException}
   * after the package has been uploaded, the upload is undone before rethrowing.
   */
  public void submitTopology() throws TopologySubmissionException {
    // build primary runtime config first
    Config primaryRuntime = Config.newBuilder()
        .putAll(LauncherUtils.getInstance().createPrimaryRuntime(topology)).build();

    // call launcher directly here if in dry-run mode
    if (Context.dryRun(config)) {
      callLauncherRunner(primaryRuntime);
      return;
    }

    // 1. Do prepare work
    String statemgrClass = Context.stateManagerClass(config);
    String launcherClass = Context.launcherClass(config);
    String uploaderClass = Context.uploaderClass(config);

    IStateManager statemgr;
    ILauncher launcher;
    IUploader uploader;

    // create an instance of state manager
    try {
      statemgr = ReflectionUtils.newInstance(statemgrClass);
    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
      throw new TopologySubmissionException(
          String.format("Failed to instantiate state manager class '%s'", statemgrClass), e);
    }

    // create an instance of launcher
    try {
      launcher = ReflectionUtils.newInstance(launcherClass);
    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
      throw new LauncherException(
          String.format("Failed to instantiate launcher class '%s'", launcherClass), e);
    }

    // create an instance of uploader
    try {
      uploader = ReflectionUtils.newInstance(uploaderClass);
    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
      throw new UploaderException(
          String.format("Failed to instantiate uploader class '%s'", uploaderClass), e);
    }

    // Tracks whether uploadPackage() completed, so that undo() is only called for
    // a package this submission actually uploaded.
    boolean packageUploaded = false;

    // Put it in a try block so that we can always clean resources
    try {
      // initialize the state manager
      statemgr.initialize(config);

      // initialize the uploader
      uploader.initialize(config);

      // TODO(mfu): timeout should read from config
      SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);

      // Check if topology is already running
      validateSubmit(adaptor, topology.getName());

      LOG.log(Level.FINE, "Topology {0} to be submitted", topology.getName());

      Config runtimeWithoutPackageURI = Config.newBuilder()
          .putAll(primaryRuntime)
          .putAll(LauncherUtils.getInstance().createAdaptorRuntime(adaptor))
          .put(Key.LAUNCHER_CLASS_INSTANCE, launcher)
          .build();

      PackingPlan packingPlan = LauncherUtils.getInstance()
          .createPackingPlan(config, runtimeWithoutPackageURI);

      // The packing plan might call for a number of containers different than the config
      // settings. If that's the case we need to modify the configs to match.
      runtimeWithoutPackageURI =
          updateNumContainersIfNeeded(runtimeWithoutPackageURI, topology, packingPlan);

      // If the packing plan is valid we will upload necessary packages
      URI packageURI = uploadPackage(uploader);
      packageUploaded = true;

      // Update the runtime config with the packageURI
      Config runtimeAll = Config.newBuilder()
          .putAll(runtimeWithoutPackageURI)
          .put(Key.TOPOLOGY_PACKAGE_URI, packageURI)
          .build();

      callLauncherRunner(runtimeAll);
    } catch (LauncherException | PackingException e) {
      // we undo uploading of topology package only if launcher fails to
      // launch topology, which will throw LauncherException or PackingException.
      // A failure before the upload happened (e.g. while creating the packing plan)
      // has nothing to undo.
      if (packageUploaded) {
        uploader.undo();
      }
      throw e;
    } finally {
      SysUtils.closeIgnoringExceptions(uploader);
      SysUtils.closeIgnoringExceptions(launcher);
      SysUtils.closeIgnoringExceptions(statemgr);
    }
  }

  /**
   * Checks that the number of containers specified in the topology matches the number of containers
   * called for in the packing plan. If they are different, returns a new config with settings
   * updated to align with the packing plan. The new config will include an updated
   * Key.TOPOLOGY_DEFINITION containing a cloned Topology with its settings also updated.
   *
   * @param initialConfig initial config to clone and update (if necessary)
   * @param initialTopology topology to check and clone/update (if necessary)
   * @param packingPlan packing plan to compare settings with
   * @return a new Config cloned from initialConfig and modified as needed to align with packedPlan
   */
  @VisibleForTesting
  Config updateNumContainersIfNeeded(Config initialConfig,
                                     TopologyAPI.Topology initialTopology,
                                     PackingPlan packingPlan) {
    int configNumStreamManagers = TopologyUtils.getNumContainers(initialTopology);
    int packingNumStreamManagers = packingPlan.getContainers().size();

    if (configNumStreamManagers == packingNumStreamManagers) {
      return initialConfig;
    }

    // NUM_CONTAINERS is set to one more than the packing plan's container count,
    // matching the "+ 1" applied here in the original code.
    Config.Builder newConfigBuilder = Config.newBuilder()
        .putAll(initialConfig)
        .put(Key.NUM_CONTAINERS, packingNumStreamManagers + 1)
        .put(Key.TOPOLOGY_DEFINITION,
            cloneWithNewNumContainers(initialTopology, packingNumStreamManagers));

    String packingClass = Context.packingClass(config);
    LOG.warning(String.format("The packing plan (generated by %s) calls for a different number of "
            + "containers (%d) than what was explicitly set in the topology configs (%d). "
            + "Overriding the configs to specify %d containers. When using %s do not explicitly "
            + "call config.setNumStmgrs(..) or config.setNumWorkers(..).",
        packingClass, packingNumStreamManagers, configNumStreamManagers,
        packingNumStreamManagers, packingClass));

    return newConfigBuilder.build();
  }

  /**
   * Returns a copy of {@code initialTopology} whose TOPOLOGY_STMGRS config entry is set to
   * {@code numStreamManagers}. Every existing TOPOLOGY_STMGRS entry is rewritten; if the
   * topology config had no such entry, one is appended so the clone always carries the
   * new value.
   */
  private TopologyAPI.Topology cloneWithNewNumContainers(TopologyAPI.Topology initialTopology,
                                                         int numStreamManagers) {
    String stmgrsKey = org.apache.heron.api.Config.TOPOLOGY_STMGRS;
    String stmgrsValue = Integer.toString(numStreamManagers);

    TopologyAPI.Config.Builder configBuilder = TopologyAPI.Config.newBuilder();
    boolean overridden = false;
    for (TopologyAPI.Config.KeyValue keyValue : initialTopology.getTopologyConfig().getKvsList()) {
      // override TOPOLOGY_STMGRS value once we find it
      if (stmgrsKey.equals(keyValue.getKey())) {
        configBuilder.addKvs(TopologyAPI.Config.KeyValue.newBuilder()
            .setKey(stmgrsKey)
            .setValue(stmgrsValue)
            .build());
        overridden = true;
      } else {
        configBuilder.addKvs(keyValue);
      }
    }

    // The key was not explicitly set in the topology config: add it, otherwise the
    // cloned topology would still report the old (default) number of containers.
    if (!overridden) {
      configBuilder.addKvs(TopologyAPI.Config.KeyValue.newBuilder()
          .setKey(stmgrsKey)
          .setValue(stmgrsValue)
          .build());
    }

    return TopologyAPI.Topology.newBuilder(initialTopology)
        .setTopologyConfig(configBuilder)
        .build();
  }

  /**
   * Rejects the submission if the state manager reports the topology as already running.
   *
   * @param adaptor state manager adaptor to query
   * @param topologyName name of the topology being submitted
   * @throws TopologySubmissionException if the topology is already running
   */
  protected void validateSubmit(SchedulerStateManagerAdaptor adaptor, String topologyName)
      throws TopologySubmissionException {
    // Check whether the topology has already been running
    // TODO(rli): anti-pattern is too nested on this path to be refactored
    Boolean isTopologyRunning = adaptor.isTopologyRunning(topologyName);

    // A null answer is treated as "not running"
    if (Boolean.TRUE.equals(isTopologyRunning)) {
      throw new TopologySubmissionException(
          String.format("Topology '%s' already exists", topologyName));
    }
  }

  /**
   * Uploads the topology package via the given uploader.
   *
   * @param uploader an initialized uploader
   * @return the URI returned by {@code uploader.uploadPackage()}
   * @throws UploaderException if the upload fails
   */
  protected URI uploadPackage(IUploader uploader) throws UploaderException {
    // upload the topology package to the storage
    return uploader.uploadPackage();
  }

  /**
   * Runs a {@code LaunchRunner} built from this submitter's config and the given runtime.
   *
   * @param runtime the runtime config to launch with
   */
  protected void callLauncherRunner(Config runtime)
      throws LauncherException, PackingException, SubmitDryRunResponse {
    // using launch runner, launch the topology
    LaunchRunner launchRunner = new LaunchRunner(config, runtime);
    launchRunner.call();
  }
}