* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.heron.scheduler;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.DryRunFormatType;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.utils.logging.LoggingHelper;
import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse;
import org.apache.heron.scheduler.utils.DryRunRenders;
import org.apache.heron.scheduler.utils.LauncherUtils;
import org.apache.heron.scheduler.utils.SubmitterUtils;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.ConfigLoader;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingException;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.scheduler.ILauncher;
import org.apache.heron.spi.scheduler.LauncherException;
import org.apache.heron.spi.statemgr.IStateManager;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.uploader.IUploader;
import org.apache.heron.spi.uploader.UploaderException;
import org.apache.heron.spi.utils.ReflectionUtils;
* Calls Uploader to upload topology package, and Launcher to launch Scheduler.
public class SubmitterMain {
private static final Logger LOG = Logger.getLogger(SubmitterMain.class.getName());
* Load the config parameters from the command line
* @param cluster name of the cluster
* @param role user role
* @param environ user provided environment/tag
* @param submitUser the submit user
* @param dryRun run as dry run
* @param dryRunFormat the dry run format
* @param verbose enable verbose logging
* @return config the command line config
protected static Config commandLineConfigs(String cluster,
String role,
String environ,
String submitUser,
Boolean dryRun,
DryRunFormatType dryRunFormat,
Boolean verbose) {
return Config.newBuilder()
.put(Key.CLUSTER, cluster)
.put(Key.ROLE, role)
.put(Key.ENVIRON, environ)
.put(Key.SUBMIT_USER, submitUser)
.put(Key.DRY_RUN, dryRun)
.put(Key.DRY_RUN_FORMAT_TYPE, dryRunFormat)
.put(Key.VERBOSE, verbose)
// Print usage options
private static void usage(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("SubmitterMain", options);
// Construct all required command line options
private static Options constructOptions() {
Options options = new Options();
Option cluster = Option.builder("c")
.desc("Cluster name in which the topology needs to run on")
Option role = Option.builder("r")
.desc("Role under which the topology needs to run")
Option environment = Option.builder("e")
.desc("Environment under which the topology needs to run")
Option submitUser = Option.builder("s")
.desc("User submitting the topology")
.argName("submit userid")
Option heronHome = Option.builder("d")
.desc("Directory where heron is installed")
.argName("heron home dir")
Option configFile = Option.builder("p")
.desc("Path of the config files")
.argName("config path")
Option configOverrides = Option.builder("o")
.desc("Command line override config path")
.argName("override config file")
Option releaseFile = Option.builder("b")
.desc("Release file name")
.argName("release information")
Option topologyPackage = Option.builder("y")
.desc("tar ball containing user submitted jar/tar, defn and config")
.argName("topology package")
Option topologyDefn = Option.builder("f")
.desc("serialized file containing Topology protobuf")
.argName("topology definition")
Option topologyJar = Option.builder("j")
.desc("The filename of the heron topology jar/tar/pex file to be run by the executor")
.argName("topology binary filename on the cluster")
Option dryRun = Option.builder("u")
.desc("run in dry-run mode")
Option dryRunFormat = Option.builder("t")
.desc("dry-run format")
Option verbose = Option.builder("v")
.desc("Enable debug logs")
return options;
// construct command line help options
private static Options constructHelpOptions() {
Options options = new Options();
Option help = Option.builder("h")
.desc("List all options and their description")
return options;
private static boolean isVerbose(CommandLine cmd) {
return cmd.hasOption("v");
public static Config loadConfig(CommandLine cmd, TopologyAPI.Topology topology) {
String cluster = cmd.getOptionValue("cluster");
String role = cmd.getOptionValue("role");
String environ = cmd.getOptionValue("environment");
String submitUser = cmd.getOptionValue("submit_user");
String heronHome = cmd.getOptionValue("heron_home");
String configPath = cmd.getOptionValue("config_path");
String overrideConfigFile = cmd.getOptionValue("override_config_file");
String releaseFile = cmd.getOptionValue("release_file");
String topologyPackage = cmd.getOptionValue("topology_package");
String topologyDefnFile = cmd.getOptionValue("topology_defn");
String topologyBinaryFile = cmd.getOptionValue("topology_bin");
Boolean dryRun = false;
if (cmd.hasOption("u")) {
dryRun = true;
// Default dry-run output format type
DryRunFormatType dryRunFormat = DryRunFormatType.TABLE;
if (dryRun && cmd.hasOption("t")) {
String format = cmd.getOptionValue("dry_run_format");
dryRunFormat = DryRunFormatType.getDryRunFormatType(format);
LOG.fine(String.format("Running dry-run mode using format %s", format));
// first load the defaults, then the config from files to override it
// next add config parameters from the command line
// load the topology configs
// build the final config by expanding all the variables
return Config.toLocalMode(Config.newBuilder()
.putAll(ConfigLoader.loadConfig(heronHome, configPath, releaseFile, overrideConfigFile))
.putAll(commandLineConfigs(cluster, role, environ, submitUser, dryRun,
dryRunFormat, isVerbose(cmd)))
.putAll(SubmitterUtils.topologyConfigs(topologyPackage, topologyBinaryFile,
topologyDefnFile, topology))
public static void main(String[] args) throws Exception {
Options options = constructOptions();
Options helpOptions = constructHelpOptions();
CommandLineParser parser = new DefaultParser();
// parse the help options first.
CommandLine cmd = parser.parse(helpOptions, args, true);
if (cmd.hasOption("h")) {
try {
// Now parse the required options
cmd = parser.parse(options, args);
} catch (ParseException e) {
throw new RuntimeException("Error parsing command line options: ", e);
Level logLevel = Level.INFO;
if (isVerbose(cmd)) {
logLevel = Level.ALL;
// init log
LoggingHelper.loggerInit(logLevel, false);
// load the topology definition into topology proto
TopologyAPI.Topology topology = TopologyUtils.getTopology(cmd.getOptionValue("topology_defn"));
Config config = loadConfig(cmd, topology);
LOG.fine("Static config loaded successfully");
SubmitterMain submitterMain = new SubmitterMain(config, topology);
/* Meaning of exit status code:
- status code = 0:
program exits without error
- 0 < status code < 100:
program fails to execute before program execution. For example,
JVM cannot find or load main class
- 100 <= status code < 200:
program fails to launch after program execution. For example,
topology definition file fails to be loaded
- status code >= 200
program sends out dry-run response */
try {
} catch (SubmitDryRunResponse response) {
LOG.log(Level.FINE, "Sending out dry-run response");
// Output may contain UTF-8 characters, so we should print using UTF-8 encoding
PrintStream out = new PrintStream(System.out, true,;
out.print(DryRunRenders.render(response, Context.dryRunFormatType(config)));
// Exit with status code 200 to indicate dry-run response is sent out
// SUPPRESS CHECKSTYLE RegexpSinglelineJava
} catch (Exception e) {
/* Since only stderr is used (by logging), we use stdout here to
propagate error message back to Python's (invoke site). */
LOG.log(Level.SEVERE, "Exception when submitting topology", e);
// Exit with status code 100 to indicate that error has happened on user-land
// SUPPRESS CHECKSTYLE RegexpSinglelineJava
LOG.log(Level.FINE, "Topology {0} submitted successfully", topology.getName());
// holds all the config read
private final Config config;
// topology definition
private final TopologyAPI.Topology topology;
public SubmitterMain(Config config, TopologyAPI.Topology topology) {
// initialize the options
this.config = config;
this.topology = topology;
* Submit a topology
* 1. Instantiate necessary resources
* 2. Valid whether it is legal to submit a topology
* 3. Call LauncherRunner
public void submitTopology() throws TopologySubmissionException {
// build primary runtime config first
Config primaryRuntime = Config.newBuilder()
// call launcher directly here if in dry-run mode
if (Context.dryRun(config)) {
// 1. Do prepare work
// create an instance of state manager
String statemgrClass = Context.stateManagerClass(config);
IStateManager statemgr;
// Create an instance of the launcher class
String launcherClass = Context.launcherClass(config);
ILauncher launcher;
// create an instance of the uploader class
String uploaderClass = Context.uploaderClass(config);
IUploader uploader;
// create an instance of state manager
try {
statemgr = ReflectionUtils.newInstance(statemgrClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new TopologySubmissionException(
String.format("Failed to instantiate state manager class '%s'", statemgrClass), e);
// create an instance of launcher
try {
launcher = ReflectionUtils.newInstance(launcherClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new LauncherException(
String.format("Failed to instantiate launcher class '%s'", launcherClass), e);
// create an instance of uploader
try {
uploader = ReflectionUtils.newInstance(uploaderClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new UploaderException(
String.format("Failed to instantiate uploader class '%s'", uploaderClass), e);
// Put it in a try block so that we can always clean resources
try {
// initialize the state manager
// initialize the uploader
// TODO(mfu): timeout should read from config
SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
// Check if topology is already running
validateSubmit(adaptor, topology.getName());
LOG.log(Level.FINE, "Topology {0} to be submitted", topology.getName());
Config runtimeWithoutPackageURI = Config.newBuilder()
PackingPlan packingPlan = LauncherUtils.getInstance()
.createPackingPlan(config, runtimeWithoutPackageURI);
// The packing plan might call for a number of containers different than the config
// settings. If that's the case we need to modify the configs to match.
runtimeWithoutPackageURI =
updateNumContainersIfNeeded(runtimeWithoutPackageURI, topology, packingPlan);
// If the packing plan is valid we will upload necessary packages
URI packageURI = uploadPackage(uploader);
// Update the runtime config with the packageURI
Config runtimeAll = Config.newBuilder()
} catch (LauncherException | PackingException e) {
// we undo uploading of topology package only if launcher fails to
// launch topology, which will throw LauncherException or PackingException
throw e;
} finally {
* Checks that the number of containers specified in the topology matches the number of containers
* called for in the packing plan. If they are different, returns a new config with settings
* updated to align with the packing plan. The new config will include an updated
* Key.TOPOLOGY_DEFINITION containing a cloned Topology with it's settings also updated.
* @param initialConfig initial config to clone and update (if necessary)
* @param initialTopology topology to check and clone/update (if necessary)
* @param packingPlan packing plan to compare settings with
* @return a new Config cloned from initialConfig and modified as needed to align with packedPlan
Config updateNumContainersIfNeeded(Config initialConfig,
TopologyAPI.Topology initialTopology,
PackingPlan packingPlan) {
int configNumStreamManagers = TopologyUtils.getNumContainers(initialTopology);
int packingNumStreamManagers = packingPlan.getContainers().size();
if (configNumStreamManagers == packingNumStreamManagers) {
return initialConfig;
Config.Builder newConfigBuilder = Config.newBuilder()
.put(Key.NUM_CONTAINERS, packingNumStreamManagers + 1)
cloneWithNewNumContainers(initialTopology, packingNumStreamManagers));
String packingClass = Context.packingClass(config);
LOG.warning(String.format("The packing plan (generated by %s) calls for a different number of "
+ "containers (%d) than what was explicitly set in the topology configs (%d). "
+ "Overriding the configs to specify %d containers. When using %s do not explicitly "
+ "call config.setNumStmgrs(..) or config.setNumWorkers(..).",
packingClass, packingNumStreamManagers, configNumStreamManagers,
packingNumStreamManagers, packingClass));
private TopologyAPI.Topology cloneWithNewNumContainers(TopologyAPI.Topology initialTopology,
int numStreamManagers) {
TopologyAPI.Topology.Builder topologyBuilder = TopologyAPI.Topology.newBuilder(initialTopology);
TopologyAPI.Config.Builder configBuilder = TopologyAPI.Config.newBuilder();
for (TopologyAPI.Config.KeyValue keyValue : initialTopology.getTopologyConfig().getKvsList()) {
// override TOPOLOGY_STMGRS value once we find it
if (org.apache.heron.api.Config.TOPOLOGY_STMGRS.equals(keyValue.getKey())) {
TopologyAPI.Config.KeyValue.Builder kvBuilder = TopologyAPI.Config.KeyValue.newBuilder();
} else {
return topologyBuilder.setTopologyConfig(configBuilder).build();
protected void validateSubmit(SchedulerStateManagerAdaptor adaptor, String topologyName)
throws TopologySubmissionException {
// Check whether the topology has already been running
// TODO(rli): anti-pattern is too nested on this path to be refactored
Boolean isTopologyRunning = adaptor.isTopologyRunning(topologyName);
if (isTopologyRunning != null && isTopologyRunning.equals(Boolean.TRUE)) {
throw new TopologySubmissionException(
String.format("Topology '%s' already exists", topologyName));
protected URI uploadPackage(IUploader uploader) throws UploaderException {
// upload the topology package to the storage
return uploader.uploadPackage();
protected void callLauncherRunner(Config runtime)
throws LauncherException, PackingException, SubmitDryRunResponse {
// using launch runner, launch the topology
LaunchRunner launchRunner = new LaunchRunner(config, runtime);;