Merge pull request #241 from twitter/karthik/ps
added ps command
diff --git a/heron/cli/src/python/BUILD b/heron/cli/src/python/BUILD
index b9d22b5..111ec46 100644
--- a/heron/cli/src/python/BUILD
+++ b/heron/cli/src/python/BUILD
@@ -13,6 +13,7 @@
"jars.py",
"kill.py",
"opts.py",
+ "ps.py",
"restart.py",
"submit.py",
"utils.py",
diff --git a/heron/cli/src/python/jars.py b/heron/cli/src/python/jars.py
index 6cd80c1..7845094 100644
--- a/heron/cli/src/python/jars.py
+++ b/heron/cli/src/python/jars.py
@@ -10,7 +10,7 @@
return file_list[0] if file_list else None
################################################################################
-# Get the topology jars - TODO, make the jars independent version free
+# Get the topology jars
################################################################################
def topology_jars():
jars = [
@@ -19,6 +19,15 @@
return jars
################################################################################
+# Get the command jars
+################################################################################
+def command_jars():
+ jars = [
+ os.path.join(utils.get_heron_lib_dir(), "commands", "*")
+ ]
+ return jars
+
+################################################################################
# Get the scheduler jars
################################################################################
def scheduler_jars():
diff --git a/heron/cli/src/python/main.py b/heron/cli/src/python/main.py
index 3f6d16d..b93fbe2 100644
--- a/heron/cli/src/python/main.py
+++ b/heron/cli/src/python/main.py
@@ -23,6 +23,7 @@
import heron.cli.src.python.activate as activate
import heron.cli.src.python.deactivate as deactivate
import heron.cli.src.python.kill as kill
+import heron.cli.src.python.ps as ps
import heron.cli.src.python.restart as restart
import heron.cli.src.python.submit as submit
import heron.cli.src.python.utils as utils
@@ -71,6 +72,7 @@
deactivate.create_parser(subparsers)
help.create_parser(subparsers)
kill.create_parser(subparsers)
+ ps.create_parser(subparsers)
restart.create_parser(subparsers)
submit.create_parser(subparsers)
version.create_parser(subparsers)
@@ -93,6 +95,9 @@
elif command == 'restart':
return restart.run(command, parser, command_args, unknown_args)
+ elif command == 'ps':
+ return ps.run(command, parser, command_args, unknown_args)
+
elif command == 'submit':
return submit.run(command, parser, command_args, unknown_args)
diff --git a/heron/cli/src/python/ps.py b/heron/cli/src/python/ps.py
new file mode 100644
index 0000000..b3558a7
--- /dev/null
+++ b/heron/cli/src/python/ps.py
@@ -0,0 +1,70 @@
+#!/usr/bin/python2.7
+
+import argparse
+import atexit
+import base64
+import contextlib
+import glob
+import logging
+import logging.handlers
+import os
+import shutil
+import sys
+import subprocess
+import tarfile
+import tempfile
+
+from heron.common.src.python.color import Log
+
+import heron.cli.src.python.args as args
+import heron.cli.src.python.execute as execute
+import heron.cli.src.python.jars as jars
+import heron.cli.src.python.utils as utils
+
+def create_parser(subparsers):
+ parser = subparsers.add_parser(
+ 'ps',
+ help='List all topologies',
+ usage = "%(prog)s [options] cluster/[role]/[environ]",
+ add_help = False)
+
+ args.add_titles(parser)
+ args.add_cluster_role_env(parser)
+
+ args.add_config(parser)
+ args.add_verbose(parser)
+
+ parser.set_defaults(subcommand='ps')
+ return parser
+
+def run(command, parser, cl_args, unknown_args):
+
+ try:
+ config_overrides = utils.parse_cmdline_override(cl_args)
+
+ new_args = [
+ "--cluster", cl_args['cluster'],
+ "--role", cl_args['role'],
+ "--environment", cl_args['environ'],
+ "--heron_home", utils.get_heron_dir(),
+ "--config_path", cl_args['config_path'],
+ "--config_overrides", base64.b64encode(config_overrides),
+ "--command", "ps",
+ ]
+
+ lib_jars = utils.get_heron_libs(jars.command_jars() + jars.statemgr_jars())
+
+ # invoke the runtime manager to kill the topology
+ execute.heron_class(
+ 'com.twitter.heron.command.CommandHandlerMain',
+ lib_jars,
+ extra_jars=[],
+ args= new_args
+ )
+
+ except Exception as ex:
+ print ex
+ Log.error('Failed to get list of topologies')
+ return False
+
+ return True
diff --git a/heron/commands/src/java/BUILD b/heron/commands/src/java/BUILD
new file mode 100644
index 0000000..dca9802
--- /dev/null
+++ b/heron/commands/src/java/BUILD
@@ -0,0 +1,41 @@
+# Build rules for the heron command handlers.
+package(default_visibility = ["//visibility:public"])
+
+load("/tools/rules/heron_deps", "heron_java_proto_files")
+
+# General-purpose dependencies.
+common_deps_files = [
+ "//heron/common/src/java:common-java",
+ "//3rdparty/commons:commons-cli-java",
+ "//3rdparty/guava:guava-java",
+]
+
+# Heron SPI dependencies.
+spi_deps_files = [
+ "//heron/spi/src/java:common-spi-java",
+ "//heron/spi/src/java:statemgr-spi-java",
+ "//heron/spi/src/java:utils-spi-java",
+]
+
+# Full dependency list shared by the library and the binary below.
+commands_deps_files = \
+ common_deps_files + \
+ heron_java_proto_files() + \
+ spi_deps_files
+
+# Library target built from every Java source under this package.
+java_library(
+ name = 'commands-java',
+ srcs = glob(
+ ["**/*.java"],
+ ),
+ deps = commands_deps_files,
+)
+
+# Binary target over the same sources; its _deploy.jar output is the input of
+# the genrule below.
+java_binary(
+ name = 'commands-unshaded',
+ srcs = glob(["**/*.java"]),
+ deps = commands_deps_files,
+)
+
+# Copies the deploy jar to heron-commands.jar.
+genrule(
+ name = "heron-commands",
+ srcs = [":commands-unshaded_deploy.jar"],
+ outs = ["heron-commands.jar"],
+ cmd = "cp $< $@",
+)
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java
new file mode 100644
index 0000000..b646cdd
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java
@@ -0,0 +1,40 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+
+/**
+ * Base class for a command: holds the static and runtime config and declares
+ * the three phases a concrete command implements.
+ */
+public abstract class CommandHandler {
+
+  /** Static config read from the config files. */
+  protected Config config;
+
+  /** Runtime config gathered during execution. */
+  protected Config runtime;
+
+  /**
+   * Store the static and runtime config for use by subclasses.
+   *
+   * @param config the static config
+   * @param runtime the runtime config
+   */
+  CommandHandler(Config config, Config runtime) {
+    this.runtime = runtime;
+    this.config = config;
+  }
+
+  /**
+   * Run any conditions that must hold before the command executes.
+   */
+  public abstract boolean beforeExecution() throws Exception;
+
+  /**
+   * Run the command itself.
+   */
+  public abstract boolean execute() throws Exception;
+
+  /**
+   * Run any cleanup needed after the command has executed.
+   */
+  public abstract boolean afterExecution() throws Exception;
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java
new file mode 100644
index 0000000..a940fa6
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java
@@ -0,0 +1,79 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.basics.FileUtils;
+import com.twitter.heron.proto.scheduler.Scheduler;
+
+import com.twitter.heron.spi.common.ClusterConfig;
+import com.twitter.heron.spi.common.ClusterDefaults;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.common.Keys;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Assembles the static config used by the command handlers.
+ */
+public class CommandHandlerConfig {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerConfig.class.getName());
+
+  /**
+   * Build the defaults config: the cluster defaults followed by the commands
+   * config loaded for the given heron home and config path.
+   *
+   * @param heronHome passed to ClusterConfig.loadCommandsConfig
+   * @param configPath passed to ClusterConfig.loadCommandsConfig
+   * @return config, the defaults config
+   */
+  protected static Config defaultConfigs(String heronHome, String configPath) {
+    return Config.newBuilder()
+        .putAll(ClusterDefaults.getDefaults())
+        .putAll(ClusterConfig.loadCommandsConfig(heronHome, configPath))
+        .build();
+  }
+
+  /**
+   * Build the config that carries the values given on the command line.
+   *
+   * @param cluster name of the cluster
+   * @param role user role
+   * @param environ user provided environment/tag
+   * @return config, the command line config
+   */
+  protected static Config commandLineConfigs(String cluster, String role, String environ) {
+    return Config.newBuilder()
+        .put(Keys.cluster(), cluster)
+        .put(Keys.role(), role)
+        .put(Keys.environ(), environ)
+        .build();
+  }
+
+  /**
+   * Build the complete static config from the parsed command line: the
+   * defaults config followed by the command line config, with all variables
+   * expanded.
+   *
+   * @param commandLine the command line args provided
+   * @return config, the static config
+   */
+  protected static Config loadConfig(CommandLine commandLine) {
+    Config defaults = defaultConfigs(
+        commandLine.getOptionValue("heron_home"),
+        commandLine.getOptionValue("config_path"));
+
+    Config fromCommandLine = commandLineConfigs(
+        commandLine.getOptionValue("cluster"),
+        commandLine.getOptionValue("role"),
+        commandLine.getOptionValue("environment"));
+
+    // merge both and expand all the variables
+    return Config.expand(
+        Config.newBuilder()
+            .putAll(defaults)
+            .putAll(fromCommandLine)
+            .build());
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java
new file mode 100644
index 0000000..c932861
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java
@@ -0,0 +1,20 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+
+public class CommandHandlerFactory {
+ private static final Logger LOG = Logger.getLogger(CommandHandlerFactory.class.getName());
+
+ public static CommandHandler makeCommand(String command, Config config, Config runtime) {
+ if (command.equalsIgnoreCase("ps"))
+ return new ListTopologiesHandler(config, runtime);
+
+ LOG.info("Invalid command " + command);
+ return null;
+ }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java
new file mode 100644
index 0000000..9bbd192
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java
@@ -0,0 +1,95 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.ClusterConfig;
+import com.twitter.heron.spi.common.ClusterDefaults;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.common.Keys;
+import com.twitter.heron.spi.statemgr.IStateManager;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.HelpFormatter;
+
+public class CommandHandlerMain {
+ private static final Logger LOG = Logger.getLogger(CommandHandlerMain.class.getName());
+
+ public static void main(String[] args)
+ throws ClassNotFoundException, IllegalAccessException,
+ InstantiationException, IOException, ParseException {
+
+ // parse the help options first.
+ Options options = CommandHandlerOptions.constructOptions();
+ Options helpOptions = CommandHandlerOptions.constructHelpOptions();
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(helpOptions, args, true);;
+
+ if(cmd.hasOption("h")) {
+ CommandHandlerOptions.usage(options);
+ System.exit(0);
+ }
+
+ // Now parse the required options
+ try {
+ cmd = parser.parse(options, args);
+ } catch(ParseException e) {
+ LOG.severe("Error parsing command line options: " + e.getMessage());
+ CommandHandlerOptions.usage(options);
+ System.exit(1);
+ }
+
+ // get the command to be executed
+ String command = cmd.getOptionValue("command");
+
+ // load the static config
+ Config config = CommandHandlerConfig.loadConfig(cmd);
+ LOG.info("Static config loaded successfully ");
+ LOG.info(config.toString());
+
+ // create an instance of state manager
+ String statemgrClass = Context.stateManagerClass(config);
+ IStateManager statemgr = (IStateManager) Class.forName(statemgrClass).newInstance();
+
+ try {
+ // initialize the statemgr
+ statemgr.initialize(config);
+
+ // build the runtime config
+ Config runtime = Config.newBuilder()
+ .put(Keys.schedulerStateManagerAdaptor(), new SchedulerStateManagerAdaptor(statemgr))
+ .build();
+
+ // instantiate the command handler
+ CommandHandler commandHandler = CommandHandlerFactory.makeCommand(command, config, runtime);
+
+ // execute preconditions for command execution
+ commandHandler.beforeExecution();
+
+ commandHandler.execute();
+
+ // execute post conditions for command execution
+ commandHandler.afterExecution();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.severe("Unable to execute command " + command);
+ System.exit(1);
+
+ } finally {
+ // close the state manager
+ statemgr.close();
+ }
+
+ System.exit(0);
+ }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java
new file mode 100644
index 0000000..81feabb
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java
@@ -0,0 +1,105 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.HelpFormatter;
+
+public class CommandHandlerOptions {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerOptions.class.getName());
+
+  /**
+   * Print the usage text for the given options.
+   */
+  protected static void usage(Options options) {
+    new HelpFormatter().printHelp("CommandHandlerOptions", options);
+  }
+
+  /**
+   * Build a single option that takes arguments.
+   *
+   * @param shortName short name of the option
+   * @param longName long name of the option
+   * @param argName name shown for the option's argument
+   * @param description description of the option
+   * @param required whether the option is marked as required
+   * @return the built option
+   */
+  private static Option valueOption(
+      String shortName, String longName, String argName, String description, boolean required) {
+    return Option.builder(shortName)
+        .desc(description)
+        .longOpt(longName)
+        .hasArgs()
+        .argName(argName)
+        .required(required)
+        .build();
+  }
+
+  /**
+   * Construct the command line options. Every option except config_overrides
+   * is marked as required.
+   */
+  protected static Options constructOptions() {
+    Options options = new Options();
+
+    options.addOption(valueOption(
+        "c", "cluster", "cluster",
+        "Cluster name in which the topology needs to run on", true));
+
+    options.addOption(valueOption(
+        "r", "role", "role",
+        "Role under which the topology needs to run", true));
+
+    options.addOption(valueOption(
+        "e", "environment", "environment",
+        "Environment under which the topology needs to run", true));
+
+    options.addOption(valueOption(
+        "p", "config_path", "config path",
+        "Path of the config files", true));
+
+    // TODO: Need to figure out the exact format
+    options.addOption(valueOption(
+        "o", "config_overrides", "config overrides",
+        "Command line config overrides", false));
+
+    options.addOption(valueOption(
+        "m", "command", "command to run",
+        "Command to run", true));
+
+    options.addOption(valueOption(
+        "d", "heron_home", "heron home dir",
+        "Directory where heron is installed", true));
+
+    return options;
+  }
+
+  /**
+   * Construct the options holding only the help flag (-h / --help).
+   */
+  protected static Options constructHelpOptions() {
+    Options helpOptions = new Options();
+    helpOptions.addOption(
+        Option.builder("h")
+            .longOpt("help")
+            .desc("List all options and their description")
+            .build());
+    return helpOptions;
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java
new file mode 100644
index 0000000..a42727e
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java
@@ -0,0 +1,94 @@
+package com.twitter.heron.command;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.util.concurrent.TimeUnit;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.utils.Runtime;
+
+import com.twitter.heron.proto.system.ExecutionEnvironment;
+
+/**
+ * Command handler that prints one line per topology found in the state manager.
+ */
+public class ListTopologiesHandler extends CommandHandler {
+  private static final Logger LOG = Logger.getLogger(ListTopologiesHandler.class.getName());
+
+  // row layout shared by the header and every topology line
+  private static final String ROW_FORMAT = "%-12.12s %-12.12s %-12.12s %-15.15s %-15.15s %s\n";
+
+  private static final long SECONDS_PER_MINUTE = 60;
+  private static final long SECONDS_PER_HOUR = 3600;
+  private static final long SECONDS_PER_DAY = 86400;
+
+  /**
+   * Construct the command handler with static and runtime config
+   */
+  ListTopologiesHandler(Config config, Config runtime) {
+    super(config, runtime);
+  }
+
+  /**
+   * Nothing to check before listing; always returns true.
+   */
+  @Override
+  public boolean beforeExecution() {
+    return true;
+  }
+
+  /**
+   * Nothing to clean up after listing; always returns true.
+   */
+  @Override
+  public boolean afterExecution() {
+    return true;
+  }
+
+  /**
+   * Fetch the execution states of all topologies (waiting at most 5 seconds)
+   * and print them to stdout as a table.
+   *
+   * @return true once the listing, or the "No topologies found" notice, is printed
+   * @throws Exception if the execution states cannot be fetched in time
+   */
+  @Override
+  public boolean execute() throws Exception {
+
+    // get the state manager instance
+    SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
+
+    // get the execution states of all topologies
+    List<ExecutionEnvironment.ExecutionState> executionStates =
+        stateManager.getAllExecutionStates().get(5, TimeUnit.SECONDS);
+
+    // A future may resolve to null rather than to an empty list, so both are
+    // reported as "nothing to list".
+    if (executionStates == null || executionStates.isEmpty()) {
+      System.out.println("No topologies found");
+      return true;
+    }
+
+    // print the header
+    System.out.format(ROW_FORMAT, "CLUSTER", "ROLE", "ENVIRON", "USER", "TIME", "NAME");
+
+    // the submission time is compared against the current time in seconds
+    long now = System.currentTimeMillis() / 1000;
+    for (ExecutionEnvironment.ExecutionState es : executionStates) {
+      String timeString = formatElapsed(now - es.getSubmissionTime());
+
+      System.out.format(ROW_FORMAT,
+          es.getCluster(), es.getRole(), es.getEnviron(),
+          es.getSubmissionUser(), timeString, es.getTopologyName());
+    }
+
+    return true;
+  }
+
+  /**
+   * Render a duration in seconds as days:hours:minutes:seconds (ddd:hh:mm:ss).
+   * A negative duration (submission time ahead of the local clock) is shown as zero.
+   */
+  private static String formatElapsed(long elapsedSeconds) {
+    long remaining = Math.max(0L, elapsedSeconds);
+
+    long days = remaining / SECONDS_PER_DAY;
+    remaining %= SECONDS_PER_DAY;
+
+    long hours = remaining / SECONDS_PER_HOUR;
+    remaining %= SECONDS_PER_HOUR;
+
+    long minutes = remaining / SECONDS_PER_MINUTE;
+    long seconds = remaining % SECONDS_PER_MINUTE;
+
+    return String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds);
+  }
+}
diff --git a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
index b1beef5..3f9155b 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
@@ -146,6 +146,15 @@
return cb.build();
}
+ public static Config loadCommandsConfig(String heronHome, String configPath) {
+   // basic config first; it also names the state manager file to load
+   Config basicConfig = loadBasicConfig(heronHome, configPath);
+   Config stateManagerConfig = loadStateManagerConfig(Context.stateManagerFile(basicConfig));
+
+   return Config.newBuilder()
+       .putAll(basicConfig)
+       .putAll(stateManagerConfig)
+       .build();
+ }
+
public static Config loadSandboxConfig() {
Config sandboxConfig = loadBasicSandboxConfig();
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
index 17ee0a6..ca8e71c 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
@@ -1,5 +1,6 @@
package com.twitter.heron.spi.statemgr;
+import java.util.List;
import com.google.common.util.concurrent.ListenableFuture;
import com.twitter.heron.api.generated.TopologyAPI;
@@ -152,6 +153,13 @@
WatchCallback watcher, String topologyName);
/**
+ * Get the execution states of all topologies
+ *
+ * @return a future holding the execution states as a {@code List<ExecutionState>}
+ */
+ ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates();
+
+ /**
* Set the location of Tmaster.
* @param location
* @param topologyName
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
index 6ef49ec..44b4007 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
@@ -1,5 +1,6 @@
package com.twitter.heron.spi.statemgr;
+import java.util.List;
import com.google.common.util.concurrent.ListenableFuture;
import com.twitter.heron.api.generated.TopologyAPI;
@@ -162,6 +163,15 @@
}
/**
+ * Get the execution states of all topologies by delegating to the wrapped
+ * state manager.
+ *
+ * @return the delegate's future holding a {@code List<ExecutionState>}
+ */
+ public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+ return delegate.getAllExecutionStates();
+ }
+
+ /**
* Get the physical plan for the given topology
*
* @param watcher @see com.twitter.heron.spi.statemgr.WatchCallback
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
index fc44031..d76cfb6 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
@@ -1,5 +1,6 @@
package com.twitter.heron.statemgr;
+import java.util.List;
import java.util.Map;
import com.google.common.util.concurrent.ListenableFuture;
@@ -109,6 +110,11 @@
}
@Override
+ public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+ // Returns the shared nullFuture field instead of building a list.
+ // NOTE(review): presumably that future resolves to null, in which case
+ // callers need a null check on the result - confirm against nullFuture.
+ return nullFuture;
+ }
+
+ @Override
public ListenableFuture<PhysicalPlans.PhysicalPlan> getPhysicalPlan(WatchCallback watcher, String topologyName) {
return nullFuture;
}
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
index 432f6b8..07f5976 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
@@ -1,6 +1,9 @@
package com.twitter.heron.statemgr.localfs;
+import java.io.File;
+import java.util.List;
import java.util.Map;
+import java.util.LinkedList;
import java.util.logging.Logger;
import com.google.common.util.concurrent.ListenableFuture;
@@ -193,13 +196,36 @@
executionState = ExecutionEnvironment.ExecutionState.parseFrom(data);
future.set(executionState);
} catch (InvalidProtocolBufferException e) {
- future.setException(new RuntimeException("Could not parse SchedulerLocation", e));
+ future.setException(new RuntimeException("Could not parse ExecutionState", e));
}
return future;
}
@Override
+ public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+   // Reads every file in the execution state directory and parses each one
+   // as an ExecutionState; the returned future is already completed.
+   SettableFuture<List<ExecutionEnvironment.ExecutionState>> future = SettableFuture.create();
+   List<ExecutionEnvironment.ExecutionState> executionStates =
+       new LinkedList<ExecutionEnvironment.ExecutionState>();
+
+   // File.listFiles() returns null (not an empty array) when the directory
+   // does not exist or cannot be read; report that as an empty listing
+   // instead of failing with a NullPointerException.
+   File[] files = new File(getExecutionStateDir()).listFiles();
+   if (files == null) {
+     files = new File[0];
+   }
+
+   for (File file : files) {
+     byte[] data = FileUtils.readFromFile(file.getAbsolutePath());
+     try {
+       executionStates.add(ExecutionEnvironment.ExecutionState.parseFrom(data));
+     } catch (InvalidProtocolBufferException e) {
+       // a single unparsable file fails the whole listing
+       future.setException(new RuntimeException("Could not parse ExecutionState", e));
+       return future;
+     }
+   }
+
+   future.set(executionStates);
+   return future;
+ }
+
+ @Override
public ListenableFuture<PhysicalPlans.PhysicalPlan> getPhysicalPlan(WatchCallback watcher, String topologyName) {
SettableFuture<PhysicalPlans.PhysicalPlan> future = SettableFuture.create();
String path = getPhysicalPlanPath(topologyName);
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index 7cabfb9..e1176b8 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -1,5 +1,7 @@
package com.twitter.heron.statemgr.zookeeper.curator;
+import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -266,6 +268,13 @@
}
@Override
+ public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+ // TODO: placeholder - nothing is read from the state store here; the future
+ // is completed immediately with an empty list, so callers always see
+ // zero topologies from this state manager.
+ final SettableFuture<List<ExecutionEnvironment.ExecutionState>> executionStateFuture = SettableFuture.create();
+ executionStateFuture.set(new LinkedList<ExecutionEnvironment.ExecutionState>());
+ return executionStateFuture;
+ }
+
+ @Override
public ListenableFuture<ExecutionEnvironment.ExecutionState> getExecutionState(WatchCallback watcher, String topologyName) {
final SettableFuture<ExecutionEnvironment.ExecutionState> executionStateFuture = SettableFuture.create();
String path = getExecutionStatePath(topologyName);
diff --git a/scripts/centos5/BUILD b/scripts/centos5/BUILD
index e38fd99..0f2dc2a 100644
--- a/scripts/centos5/BUILD
+++ b/scripts/centos5/BUILD
@@ -83,6 +83,7 @@
":conf-local-uploader",
":hcli",
":hexamples",
+ ":hcommands",
":hscheduler",
":hlscheduler",
":hrrpacking",
@@ -112,6 +113,7 @@
"--cp $(location conf-local-uploader) conf/local/uploader.yaml",
"--cp $(location heron-core) dist/heron-core.tar.gz",
"--cp $(location hexamples) examples/heron-examples.jar",
+ "--cp $(location hcommands) lib/commands/heron-commands.jar",
"--cp $(location hscheduler) lib/scheduler/heron-scheduler.jar",
"--cp $(location hlscheduler) lib/scheduler/heron-local-scheduler.jar",
"--cp $(location hrrpacking) lib/packing/heron-roundrobin-packing.jar",
@@ -238,6 +240,11 @@
)
filegroup(
+ name = "hcommands",
+ srcs = ["//heron/commands/src/java:heron-commands"],
+)
+
+filegroup(
name = "hscheduler",
srcs = ["//heron/newscheduler/src/java:heron-scheduler"],
)
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index fb78c8b..596cc84 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -8,6 +8,7 @@
load("/tools/rules/heron_client", "heron_client_bin_files")
load("/tools/rules/heron_client", "heron_client_conf_files")
load("/tools/rules/heron_client", "heron_client_lib_3rdparty_files")
+load("/tools/rules/heron_client", "heron_client_lib_commands_files")
load("/tools/rules/heron_client", "heron_client_lib_scheduler_files")
load("/tools/rules/heron_client", "heron_client_lib_packing_files")
load("/tools/rules/heron_client", "heron_client_lib_statemgr_files")
@@ -133,6 +134,12 @@
)
pkg_tar(
+ name = "heron-client-lib-commands",
+ package_dir = "lib/commands",
+ files = heron_client_lib_commands_files(),
+)
+
+pkg_tar(
name = "heron-client-lib-scheduler",
package_dir = "lib/scheduler",
files = heron_client_lib_scheduler_files(),
@@ -207,8 +214,9 @@
":heron-client-dist",
":heron-client-examples",
":heron-client-lib-3rdparty",
- ":heron-client-lib-scheduler",
+ ":heron-client-lib-commands",
":heron-client-lib-packing",
+ ":heron-client-lib-scheduler",
":heron-client-lib-statemgr",
":heron-client-lib-uploader",
],
diff --git a/tools/rules/heron_client.bzl b/tools/rules/heron_client.bzl
index 0b01978..9df787b 100644
--- a/tools/rules/heron_client.bzl
+++ b/tools/rules/heron_client.bzl
@@ -21,6 +21,11 @@
"//heron/config/src/yaml:conf-aurora-yaml",
]
+def heron_client_lib_commands_files():
+    # Bazel label of the heron-commands jar target
+    commands_jar = "//heron/commands/src/java:heron-commands"
+    return [commands_jar]
+
def heron_client_lib_scheduler_files():
return [
"//heron/newscheduler/src/java:heron-scheduler",