blob: 5c57e24b5b0f498dc0faed1519e3b3904ee24d44 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.scheduler;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.api.exception.InvalidTopologyException;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.utils.TopologyUtils;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.logging.LoggingHelper;
import org.apache.heron.proto.system.PackingPlans;
import org.apache.heron.scheduler.server.SchedulerServer;
import org.apache.heron.scheduler.utils.LauncherUtils;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.scheduler.utils.SchedulerConfigUtils;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.scheduler.utils.Shutdown;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.PackingPlanProtoDeserializer;
import org.apache.heron.spi.scheduler.IScheduler;
import org.apache.heron.spi.statemgr.IStateManager;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.ReflectionUtils;
/**
* Main class of scheduler.
*/
public class SchedulerMain {
private static final Logger LOG = Logger.getLogger(SchedulerMain.class.getName());
private int schedulerServerPort = -1; // http port where the scheduler is listening
private TopologyAPI.Topology topology = null; // topology definition
private Config config; // holds all the config read
// Properties passed from command line property arguments
private final Properties properties;
public SchedulerMain(
Config config,
TopologyAPI.Topology topology,
int schedulerServerPort,
Properties properties) {
// initialize the options
this.config = config;
this.topology = topology;
this.schedulerServerPort = schedulerServerPort;
this.properties = properties;
}
// Print usage options
private static void usage(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("SchedulerMain", options);
}
// Construct all required command line options
private static Options constructOptions() {
Options options = new Options();
Option cluster = Option.builder("c")
.desc("Cluster name in which the topology needs to run on")
.longOpt("cluster")
.hasArgs()
.argName("cluster")
.required()
.build();
Option role = Option.builder("r")
.desc("Role under which the topology needs to run")
.longOpt("role")
.hasArgs()
.argName("role")
.required()
.build();
Option environment = Option.builder("e")
.desc("Environment under which the topology needs to run")
.longOpt("environment")
.hasArgs()
.argName("environment")
.required()
.build();
Option topologyName = Option.builder("n")
.desc("Name of the topology")
.longOpt("topology_name")
.hasArgs()
.argName("topology name")
.required()
.build();
Option topologyJar = Option.builder("f")
.desc("Topology jar/pex file path")
.longOpt("topology_bin")
.hasArgs()
.argName("topology binary file")
.required()
.build();
Option schedulerHTTPPort = Option.builder("p")
.desc("Http Port number on which the scheduler listens for requests")
.longOpt("http_port")
.hasArgs()
.argName("http port")
.required()
.build();
Option property = Option.builder(
SchedulerUtils.SCHEDULER_COMMAND_LINE_PROPERTIES_OVERRIDE_OPTION)
.desc("use value for given property")
.longOpt("property_override")
.hasArgs()
.valueSeparator()
.argName("property=value")
.build();
Option verbose = Option.builder("v")
.desc("Enable debug logs")
.longOpt("verbose")
.build();
options.addOption(cluster);
options.addOption(role);
options.addOption(environment);
options.addOption(topologyName);
options.addOption(topologyJar);
options.addOption(schedulerHTTPPort);
options.addOption(property);
options.addOption(verbose);
return options;
}
// construct command line help options
private static Options constructHelpOptions() {
Options options = new Options();
Option help = Option.builder("h")
.desc("List all options and their description")
.longOpt("help")
.build();
options.addOption(help);
return options;
}
public static void main(String[] args) throws Exception {
// construct the options and help options first.
Options options = constructOptions();
Options helpOptions = constructHelpOptions();
// parse the options
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(helpOptions, args, true);
// print help, if we receive wrong set of arguments
if (cmd.hasOption("h")) {
usage(options);
return;
}
// Now parse the required options
try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
usage(options);
throw new RuntimeException("Error parsing command line options: ", e);
}
// It returns a new empty Properties instead of null,
// if no properties passed from command line. So no need for null check.
Properties schedulerProperties =
cmd.getOptionProperties(SchedulerUtils.SCHEDULER_COMMAND_LINE_PROPERTIES_OVERRIDE_OPTION);
// initialize the scheduler with the options
String topologyName = cmd.getOptionValue("topology_name");
SchedulerMain schedulerMain = createInstance(cmd.getOptionValue("cluster"),
cmd.getOptionValue("role"),
cmd.getOptionValue("environment"),
cmd.getOptionValue("topology_bin"),
topologyName,
Integer.parseInt(cmd.getOptionValue("http_port")),
cmd.hasOption("verbose"),
schedulerProperties);
LOG.info("Scheduler command line properties override: " + schedulerProperties.toString());
// run the scheduler
boolean ret = schedulerMain.runScheduler();
// Log the result and exit
if (!ret) {
throw new RuntimeException("Failed to schedule topology: " + topologyName);
} else {
// stop the server and close the state manager
LOG.log(Level.INFO, "Shutting down topology: {0}", topologyName);
}
}
public static SchedulerMain createInstance(String cluster,
String role,
String env,
String topologyJar,
String topologyName,
int httpPort,
Boolean verbose
) throws IOException, InvalidTopologyException {
return createInstance(
cluster, role, env, topologyJar, topologyName, httpPort, verbose, new Properties());
}
public static SchedulerMain createInstance(String cluster,
String role,
String env,
String topologyJar,
String topologyName,
int httpPort,
Boolean verbose,
Properties schedulerProperties)
throws IOException, InvalidTopologyException {
// Look up the topology def file location
String topologyDefnFile = TopologyUtils.lookUpTopologyDefnFile(".", topologyName);
// load the topology definition into topology proto
TopologyAPI.Topology topology = TopologyUtils.getTopology(topologyDefnFile);
// build the config by expanding all the variables
Config schedulerConfig = SchedulerConfigUtils.loadConfig(
cluster,
role,
env,
topologyJar,
topologyDefnFile,
verbose,
topology);
// set up logging with complete Config
setupLogging(schedulerConfig);
// Create a new instance
SchedulerMain schedulerMain =
new SchedulerMain(schedulerConfig, topology, httpPort, schedulerProperties);
LOG.log(Level.INFO, "Loaded scheduler config: {0}", schedulerMain.config);
return schedulerMain;
}
// Set up logging based on the Config
private static void setupLogging(Config config) throws IOException {
String systemConfigFilename = Context.systemConfigFile(config);
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
.build();
// Init the logging setting and redirect the stdout and stderr to logging
// For now we just set the logging level as INFO; later we may accept an argument to set it.
Level loggingLevel = Level.INFO;
if (Context.verbose(config).booleanValue()) {
loggingLevel = Level.FINE;
}
// TODO(mfu): The folder creation may be duplicated with heron-executor in future
// TODO(mfu): Remove the creation in future if feasible
String loggingDir = systemConfig.getHeronLoggingDirectory();
if (!FileUtils.isDirectoryExists(loggingDir)) {
FileUtils.createDirectory(loggingDir);
}
// Log to file
LoggingHelper.loggerInit(loggingLevel, true);
// TODO(mfu): Pass the scheduler id from cmd
String processId =
String.format("%s-%s-%s", "heron", Context.topologyName(config), "scheduler");
LoggingHelper.addLoggingHandler(
LoggingHelper.getFileHandler(processId, loggingDir, true,
systemConfig.getHeronLoggingMaximumSize(),
systemConfig.getHeronLoggingMaximumFiles()));
LOG.info("Logging setup done.");
}
/**
* Get the http server for receiving scheduler requests
*
* @param runtime the runtime configuration
* @param scheduler an instance of the scheduler
* @param port the port for scheduler to listen on
* @return an instance of the http server
*/
protected SchedulerServer getServer(
Config runtime, IScheduler scheduler, int port) throws IOException {
// create an instance of the server using scheduler class and port
return new SchedulerServer(runtime, scheduler, port);
}
/**
* Run the scheduler.
* It is a blocking call, and it will return in 2 cases:
* 1. The topology is requested to kill
* 2. Unexpected exceptions happen
*
* @return true if scheduled successfully
*/
public boolean runScheduler() {
IScheduler scheduler = null;
String statemgrClass = Context.stateManagerClass(config);
IStateManager statemgr;
try {
// create an instance of state manager
statemgr = ReflectionUtils.newInstance(statemgrClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
LOG.log(Level.SEVERE, "Failed to instantiate instances using config: " + config, e);
return false;
}
SchedulerServer server = null;
boolean isSuccessful = false;
// Put it in a try block so that we can always clean resources
try {
// initialize the state manager
statemgr.initialize(config);
// TODO(mfu): timeout should read from config
SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);
// get a packed plan and schedule it
PackingPlans.PackingPlan serializedPackingPlan = adaptor.getPackingPlan(topology.getName());
if (serializedPackingPlan == null) {
LOG.log(Level.SEVERE, "Failed to fetch PackingPlan for topology:{0} from the state manager",
topology.getName());
return false;
}
LOG.log(Level.INFO, "Packing plan fetched from state: {0}", serializedPackingPlan);
PackingPlan packedPlan = new PackingPlanProtoDeserializer().fromProto(serializedPackingPlan);
// build the runtime config
LauncherUtils launcherUtils = LauncherUtils.getInstance();
Config runtime = Config.newBuilder()
.putAll(launcherUtils.createPrimaryRuntime(topology))
.putAll(launcherUtils.createAdaptorRuntime(adaptor))
.put(Key.SCHEDULER_SHUTDOWN, getShutdown())
.put(Key.SCHEDULER_PROPERTIES, properties)
.build();
Config ytruntime = launcherUtils.createConfigWithPackingDetails(runtime, packedPlan);
// invoke scheduler
scheduler = launcherUtils.getSchedulerInstance(config, ytruntime);
if (scheduler == null) {
return false;
}
isSuccessful = scheduler.onSchedule(packedPlan);
if (!isSuccessful) {
LOG.severe("Failed to schedule topology");
return false;
}
// Failures in server initialization throw exceptions
// get the scheduler server endpoint for receiving requests
server = getServer(ytruntime, scheduler, schedulerServerPort);
// start the server to manage runtime requests
server.start();
// write the scheduler location to state manager
// Make sure it happens after IScheduler.onScheduler
isSuccessful = SchedulerUtils.setSchedulerLocation(
runtime, String.format("%s:%d", server.getHost(), server.getPort()), scheduler);
if (isSuccessful) {
// wait until kill request or some interrupt occurs if the scheduler starts successfully
LOG.info("Waiting for termination... ");
Runtime.schedulerShutdown(ytruntime).await();
}
} catch (IOException e) {
LOG.log(Level.SEVERE, "Failed to start server", e);
return false;
} finally {
// Clean the resources
if (server != null) {
server.stop();
}
// 4. Close the resources
SysUtils.closeIgnoringExceptions(scheduler);
SysUtils.closeIgnoringExceptions(statemgr);
}
return isSuccessful;
}
// Utils method
protected Shutdown getShutdown() {
return new Shutdown();
}
}