Merge pull request #241 from twitter/karthik/ps

Added the `ps` command, which lists all topologies
diff --git a/heron/cli/src/python/BUILD b/heron/cli/src/python/BUILD
index b9d22b5..111ec46 100644
--- a/heron/cli/src/python/BUILD
+++ b/heron/cli/src/python/BUILD
@@ -13,6 +13,7 @@
         "jars.py",
         "kill.py",
         "opts.py",
+        "ps.py",
         "restart.py",
         "submit.py",
         "utils.py",
diff --git a/heron/cli/src/python/jars.py b/heron/cli/src/python/jars.py
index 6cd80c1..7845094 100644
--- a/heron/cli/src/python/jars.py
+++ b/heron/cli/src/python/jars.py
@@ -10,7 +10,7 @@
   return file_list[0] if file_list else None
     
 ################################################################################
-# Get the topology jars - TODO, make the jars independent version free
+# Get the topology jars
 ################################################################################
 def topology_jars():
   jars = [
@@ -19,6 +19,15 @@
   return jars
 
 ################################################################################
+# Get the command jars
+################################################################################
+def command_jars():
+  # wildcard pattern matching everything under <heron lib dir>/commands
+  commands_dir = os.path.join(utils.get_heron_lib_dir(), "commands")
+  pattern = os.path.join(commands_dir, "*")
+  return [pattern]
+
+################################################################################
 # Get the scheduler jars
 ################################################################################
 def scheduler_jars():
diff --git a/heron/cli/src/python/main.py b/heron/cli/src/python/main.py
index 3f6d16d..b93fbe2 100644
--- a/heron/cli/src/python/main.py
+++ b/heron/cli/src/python/main.py
@@ -23,6 +23,7 @@
 import heron.cli.src.python.activate as activate
 import heron.cli.src.python.deactivate as deactivate
 import heron.cli.src.python.kill as kill
+import heron.cli.src.python.ps as ps
 import heron.cli.src.python.restart as restart
 import heron.cli.src.python.submit as submit
 import heron.cli.src.python.utils as utils
@@ -71,6 +72,7 @@
   deactivate.create_parser(subparsers)
   help.create_parser(subparsers)
   kill.create_parser(subparsers)
+  ps.create_parser(subparsers)
   restart.create_parser(subparsers)
   submit.create_parser(subparsers)
   version.create_parser(subparsers)
@@ -93,6 +95,9 @@
   elif command == 'restart':
     return restart.run(command, parser, command_args, unknown_args)
   
+  elif command == 'ps':
+    return ps.run(command, parser, command_args, unknown_args)
+  
   elif command == 'submit':
     return submit.run(command, parser, command_args, unknown_args)
 
diff --git a/heron/cli/src/python/ps.py b/heron/cli/src/python/ps.py
new file mode 100644
index 0000000..b3558a7
--- /dev/null
+++ b/heron/cli/src/python/ps.py
@@ -0,0 +1,82 @@
+#!/usr/bin/python2.7
+
+import argparse
+import atexit
+import base64
+import contextlib
+import glob
+import logging
+import logging.handlers
+import os
+import shutil
+import sys
+import subprocess
+import tarfile
+import tempfile
+
+from heron.common.src.python.color import Log
+
+import heron.cli.src.python.args as args
+import heron.cli.src.python.execute as execute
+import heron.cli.src.python.jars as jars
+import heron.cli.src.python.utils as utils
+
+################################################################################
+# Create the subparser for the ps command and return it
+################################################################################
+def create_parser(subparsers):
+  parser = subparsers.add_parser(
+      'ps',
+      help='List all topologies',
+      usage = "%(prog)s [options] cluster/[role]/[environ]",
+      add_help = False)
+
+  args.add_titles(parser)
+  args.add_cluster_role_env(parser)
+
+  args.add_config(parser)
+  args.add_verbose(parser)
+
+  parser.set_defaults(subcommand='ps')
+  return parser
+
+################################################################################
+# Run the ps command: list the topologies by invoking the Java command handler
+#
+# command, parser and unknown_args are part of the signature main.py uses for
+# every subcommand; only cl_args is read here.
+#
+# Returns False if an exception is raised while building the arguments or
+# invoking the handler, True otherwise.
+################################################################################
+def run(command, parser, cl_args, unknown_args):
+
+  try:
+    config_overrides = utils.parse_cmdline_override(cl_args)
+
+    new_args = [
+        "--cluster", cl_args['cluster'],
+        "--role", cl_args['role'],
+        "--environment", cl_args['environ'],
+        "--heron_home", utils.get_heron_dir(),
+        "--config_path", cl_args['config_path'],
+        "--config_overrides", base64.b64encode(config_overrides),
+        "--command", "ps",
+    ]
+
+    # the handler needs the command jars plus the state manager jars
+    lib_jars = utils.get_heron_libs(jars.command_jars() + jars.statemgr_jars())
+
+    # invoke the command handler to list the topologies
+    execute.heron_class(
+        'com.twitter.heron.command.CommandHandlerMain',
+        lib_jars,
+        extra_jars=[],
+        args= new_args
+    )
+
+  except Exception as ex:
+    # keep the cause in the error log instead of printing it to stdout
+    Log.error('Failed to get list of topologies: %s' % str(ex))
+    return False
+
+  return True
diff --git a/heron/commands/src/java/BUILD b/heron/commands/src/java/BUILD
new file mode 100644
index 0000000..dca9802
--- /dev/null
+++ b/heron/commands/src/java/BUILD
@@ -0,0 +1,41 @@
+package(default_visibility = ["//visibility:public"])
+
+load("/tools/rules/heron_deps", "heron_java_proto_files")
+
+common_deps_files = [  # general-purpose (non-SPI) dependencies
+   "//heron/common/src/java:common-java",
+   "//3rdparty/commons:commons-cli-java",
+   "//3rdparty/guava:guava-java",
+]
+
+spi_deps_files = [  # Heron SPI dependencies
+    "//heron/spi/src/java:common-spi-java",
+    "//heron/spi/src/java:statemgr-spi-java",
+    "//heron/spi/src/java:utils-spi-java",
+]
+
+commands_deps_files = \
+    common_deps_files + \
+    heron_java_proto_files() + \
+    spi_deps_files
+
+java_library(  # library target over every .java file below this package
+    name = 'commands-java',
+    srcs = glob(
+        ["**/*.java"],
+    ),
+    deps = commands_deps_files,
+)
+
+java_binary(  # same sources and deps; its _deploy.jar is consumed by the genrule below
+    name = 'commands-unshaded',
+    srcs = glob(["**/*.java"]),
+    deps = commands_deps_files,
+)
+
+genrule(  # copies the deploy jar to heron-commands.jar
+    name = "heron-commands",
+    srcs = [":commands-unshaded_deploy.jar"],
+    outs = ["heron-commands.jar"],
+    cmd  = "cp $< $@",
+)
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java
new file mode 100644
index 0000000..b646cdd
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java
@@ -0,0 +1,40 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+
+public abstract class CommandHandler {  // base type of the handlers created by CommandHandlerFactory
+
+  // static config read from the config files
+  protected Config config;
+
+  // runtime config gathered during execution
+  protected Config runtime;
+
+  /**
+   * Construct the command handler; both configs are stored as given in the fields above
+   */
+  CommandHandler(Config config, Config runtime) {
+    this.config = config;
+    this.runtime = runtime;
+  }
+
+  /**
+   * Hook run before execute(); CommandHandlerMain currently ignores the returned boolean
+   */
+  public abstract boolean beforeExecution() throws Exception;
+
+  /**
+   * Hook run after execute(); CommandHandlerMain skips it when an earlier step throws
+   */
+  public abstract boolean afterExecution() throws Exception;
+
+  /**
+   * Execute the command; called between beforeExecution() and afterExecution()
+   */
+  public abstract boolean execute() throws Exception;
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java
new file mode 100644
index 0000000..a940fa6
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java
@@ -0,0 +1,79 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.basics.FileUtils;
+import com.twitter.heron.proto.scheduler.Scheduler;
+
+import com.twitter.heron.spi.common.ClusterConfig;
+import com.twitter.heron.spi.common.ClusterDefaults;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.common.Keys;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Builds the static config used by the command handlers
+ */
+public class CommandHandlerConfig {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerConfig.class.getName());
+
+  /**
+   * Build the baseline config: the cluster defaults followed by the commands config
+   *
+   * @param heronHome, directory where heron is installed
+   * @param configPath, path of the config files
+   *
+   * @return config, the defaults config
+   */
+  protected static Config defaultConfigs(String heronHome, String configPath) {
+    return Config.newBuilder()
+        .putAll(ClusterDefaults.getDefaults())
+        .putAll(ClusterConfig.loadCommandsConfig(heronHome, configPath))
+        .build();
+  }
+
+  /**
+   * Wrap the cluster/role/environ triple given on the command line in a config
+   *
+   * @param cluster, name of the cluster
+   * @param role, user role
+   * @param environ, user provided environment/tag
+   *
+   * @return config, the command line config
+   */
+  protected static Config commandLineConfigs(String cluster, String role, String environ) {
+    return Config.newBuilder()
+        .put(Keys.cluster(), cluster)
+        .put(Keys.role(), role)
+        .put(Keys.environ(), environ)
+        .build();
+  }
+
+  /**
+   * Assemble the full static config for a parsed command line
+   *
+   * @param commandLine, the command line args provided
+   *
+   * @return config, the static config with all the variables expanded
+   */
+  protected static Config loadConfig(CommandLine commandLine) {
+    // the command line values are added after the defaults
+    Config defaults = defaultConfigs(
+        commandLine.getOptionValue("heron_home"),
+        commandLine.getOptionValue("config_path"));
+    Config fromCommandLine = commandLineConfigs(
+        commandLine.getOptionValue("cluster"),
+        commandLine.getOptionValue("role"),
+        commandLine.getOptionValue("environment"));
+
+    Config.Builder merged = Config.newBuilder()
+        .putAll(defaults)
+        .putAll(fromCommandLine);
+    return Config.expand(merged.build());
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java
new file mode 100644
index 0000000..c932861
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java
@@ -0,0 +1,36 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+
+/**
+ * Maps a command name to the handler that implements it
+ */
+public class CommandHandlerFactory {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerFactory.class.getName());
+
+  /**
+   * Create the handler for the given command
+   *
+   * @param command, name of the command, matched case-insensitively; may be null
+   * @param config, the static config handed to the handler
+   * @param runtime, the runtime config handed to the handler
+   *
+   * @return the handler, or null if the command is not recognized
+   */
+  public static CommandHandler makeCommand(String command, Config config, Config runtime) {
+    // constant-first comparison so that a null command is reported as invalid
+    // instead of raising a NullPointerException
+    if ("ps".equalsIgnoreCase(command)) {
+      return new ListTopologiesHandler(config, runtime);
+    }
+
+    // an unknown command is an error, so log it as one; callers must check for null
+    LOG.severe("Invalid command " + command);
+    return null;
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java
new file mode 100644
index 0000000..9bbd192
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java
@@ -0,0 +1,113 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.twitter.heron.spi.common.ClusterConfig;
+import com.twitter.heron.spi.common.ClusterDefaults;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.common.Keys;
+import com.twitter.heron.spi.statemgr.IStateManager;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.HelpFormatter;
+
+/**
+ * Entry point of the command handler: parses the command line, loads the config,
+ * creates the state manager and runs the requested command
+ */
+public class CommandHandlerMain {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerMain.class.getName());
+
+  /**
+   * Run a single command and exit; the exit status is 1 if the options cannot be
+   * parsed, the command is unknown or the handler throws, and 0 otherwise
+   */
+  public static void main(String[] args)
+      throws ClassNotFoundException, IllegalAccessException,
+      InstantiationException, IOException, ParseException {
+
+    // parse the help options first.
+    Options options = CommandHandlerOptions.constructOptions();
+    Options helpOptions = CommandHandlerOptions.constructHelpOptions();
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd = parser.parse(helpOptions, args, true);
+
+    if (cmd.hasOption("h")) {
+      CommandHandlerOptions.usage(options);
+      System.exit(0);
+    }
+
+    // Now parse the required options
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOG.severe("Error parsing command line options: " + e.getMessage());
+      CommandHandlerOptions.usage(options);
+      System.exit(1);
+    }
+
+    // get the command to be executed
+    String command = cmd.getOptionValue("command");
+
+    // load the static config
+    Config config = CommandHandlerConfig.loadConfig(cmd);
+    LOG.info("Static config loaded successfully ");
+    LOG.info(config.toString());
+
+    // create an instance of state manager
+    String statemgrClass = Context.stateManagerClass(config);
+    IStateManager statemgr = (IStateManager) Class.forName(statemgrClass).newInstance();
+
+    // System.exit() does not return, so calling it inside the try/catch would skip
+    // the finally clause; record the status and exit once the state manager is closed
+    int exitStatus = 0;
+
+    try {
+      // initialize the statemgr
+      statemgr.initialize(config);
+
+      // build the runtime config
+      Config runtime = Config.newBuilder()
+          .put(Keys.schedulerStateManagerAdaptor(), new SchedulerStateManagerAdaptor(statemgr))
+          .build();
+
+      // instantiate the command handler
+      CommandHandler commandHandler = CommandHandlerFactory.makeCommand(command, config, runtime);
+
+      if (commandHandler == null) {
+        // the factory returns null for a command it does not recognize
+        LOG.severe("Unable to execute unknown command " + command);
+        exitStatus = 1;
+      } else {
+        // execute preconditions for command execution
+        commandHandler.beforeExecution();
+
+        commandHandler.execute();
+
+        // execute post conditions for command execution
+        commandHandler.afterExecution();
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.severe("Unable to execute command " + command);
+      exitStatus = 1;
+
+    } finally {
+      // close the state manager
+      statemgr.close();
+    }
+
+    System.exit(exitStatus);
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java
new file mode 100644
index 0000000..81feabb
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java
@@ -0,0 +1,85 @@
+package com.twitter.heron.command;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.HelpFormatter;
+
+/**
+ * Command line options accepted by CommandHandlerMain
+ */
+public class CommandHandlerOptions {
+  private static final Logger LOG = Logger.getLogger(CommandHandlerOptions.class.getName());
+
+  // Print usage options
+  protected static void usage(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    // show the name of the class that is launched (the one with main()), not this helper
+    formatter.printHelp("CommandHandlerMain", options);
+  }
+
+  // Start an option that takes a value; the caller marks it required where needed
+  private static Option.Builder valueOption(
+      String shortName, String longName, String argName, String description) {
+    return Option.builder(shortName)
+        .desc(description)
+        .longOpt(longName)
+        .hasArgs()
+        .argName(argName);
+  }
+
+  // Construct all command line options; every one except config_overrides is required
+  protected static Options constructOptions() {
+    Options options = new Options();
+
+    Option cluster = valueOption("c", "cluster", "cluster",
+        "Cluster name in which the topology needs to run on").required().build();
+
+    Option role = valueOption("r", "role", "role",
+        "Role under which the topology needs to run").required().build();
+
+    Option environment = valueOption("e", "environment", "environment",
+        "Environment under which the topology needs to run").required().build();
+
+    Option heronHome = valueOption("d", "heron_home", "heron home dir",
+        "Directory where heron is installed").required().build();
+
+    Option configPath = valueOption("p", "config_path", "config path",
+        "Path of the config files").required().build();
+
+    // TODO: Need to figure out the exact format
+    Option configOverrides = valueOption("o", "config_overrides", "config overrides",
+        "Command line config overrides").build();
+
+    Option command = valueOption("m", "command", "command to run",
+        "Command to run").required().build();
+
+    options.addOption(cluster);
+    options.addOption(role);
+    options.addOption(environment);
+    options.addOption(configPath);
+    options.addOption(configOverrides);
+    options.addOption(command);
+    options.addOption(heronHome);
+
+    return options;
+  }
+
+  // construct command line help options
+  protected static Options constructHelpOptions() {
+    Options options = new Options();
+    Option help = Option.builder("h")
+        .desc("List all options and their description")
+        .longOpt("help")
+        .build();
+
+    options.addOption(help);
+    return options;
+  }
+}
diff --git a/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java
new file mode 100644
index 0000000..a42727e
--- /dev/null
+++ b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java
@@ -0,0 +1,99 @@
+package com.twitter.heron.command;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.util.concurrent.TimeUnit;
+import com.twitter.heron.spi.common.Config;
+import com.twitter.heron.spi.common.Context;
+import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
+import com.twitter.heron.spi.utils.Runtime;
+
+import com.twitter.heron.proto.system.ExecutionEnvironment;
+
+/**
+ * Handler of the ps command; prints one row per execution state returned by the state manager
+ */
+public class ListTopologiesHandler extends CommandHandler {
+  private static final Logger LOG = Logger.getLogger(ListTopologiesHandler.class.getName());
+
+  // layout shared by the header and the topology rows
+  private static final String ROW_FORMAT = "%-12.12s %-12.12s %-12.12s %-15.15s %-15.15s %s\n";
+
+  /**
+   * Construct the command handler with static and runtime config
+   */
+  ListTopologiesHandler(Config config, Config runtime) {
+    super(config, runtime);
+  }
+
+  /**
+   * Nothing to check before listing; always returns true
+   */
+  public boolean beforeExecution() {
+    return true;
+  }
+
+  /**
+   * Nothing to clean up after listing; always returns true
+   */
+  public boolean afterExecution() {
+    return true;
+  }
+
+  /**
+   * Format an elapsed time given in seconds as DDD:HH:MM:SS
+   */
+  private static String formatElapsed(long elapsedSeconds) {
+    long days = TimeUnit.SECONDS.toDays(elapsedSeconds);
+    long hours = TimeUnit.SECONDS.toHours(elapsedSeconds) % 24;
+    long minutes = TimeUnit.SECONDS.toMinutes(elapsedSeconds) % 60;
+    long seconds = elapsedSeconds % 60;
+    return String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds);
+  }
+
+  /**
+   * Execute the command: fetch the execution states of all topologies and print them
+   *
+   * @return true once the listing, or the "No topologies found" notice, is printed
+   * @throws Exception if the execution states cannot be obtained; the wait for them
+   * is limited to 5 seconds
+   */
+  public boolean execute() throws Exception {
+
+    // get the state manager instance
+    SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
+
+    // get the execution states of all topologies
+    List<ExecutionEnvironment.ExecutionState> executionStates =
+        stateManager.getAllExecutionStates().get(5, TimeUnit.SECONDS);
+
+    // if no topologies are found, return right away; a null result is handled the
+    // same way since a state manager may complete its future without a list
+    // (NOTE(review): NullStateManager returns its nullFuture, presumably holding null)
+    if (executionStates == null || executionStates.isEmpty()) {
+      System.out.println("No topologies found");
+      return true;
+    }
+
+    // print the header
+    System.out.format(ROW_FORMAT, "CLUSTER", "ROLE", "ENVIRON", "USER", "TIME", "NAME");
+
+    long now = System.currentTimeMillis() / 1000;
+    for (ExecutionEnvironment.ExecutionState es : executionStates) {
+      // seconds since submission; clamped at zero so that a submission time ahead
+      // of the local clock does not print negative fields
+      long elapsed = Math.max(0L, now - es.getSubmissionTime());
+
+      System.out.format(ROW_FORMAT,
+          es.getCluster(), es.getRole(), es.getEnviron(),
+          es.getSubmissionUser(), formatElapsed(elapsed), es.getTopologyName());
+    }
+
+    return true;
+  }
+}
diff --git a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
index b1beef5..3f9155b 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java
@@ -146,6 +146,15 @@
     return cb.build();
   }
 
+  public static Config loadCommandsConfig(String heronHome, String configPath) {
+    // the basic config also names the state manager config file that is loaded next
+    Config basic = loadBasicConfig(heronHome, configPath);
+    Config stateManager = loadStateManagerConfig(Context.stateManagerFile(basic));
+
+    return Config.newBuilder()
+        .putAll(basic).putAll(stateManager).build();
+  }
+
   public static Config loadSandboxConfig() {
     Config sandboxConfig = loadBasicSandboxConfig(); 
 
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
index 17ee0a6..ca8e71c 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java
@@ -1,5 +1,6 @@
 package com.twitter.heron.spi.statemgr;
 
+import java.util.List;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import com.twitter.heron.api.generated.TopologyAPI;
@@ -152,6 +153,13 @@
       WatchCallback watcher, String topologyName);
 
   /**
+   * Get the execution states of all topologies
+   *
+   * @return a future that completes with the list of execution states
+   */
+  ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates();
+
+  /**
    * Set the location of Tmaster.
    * @param location
    * @param topologyName
diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
index 6ef49ec..44b4007 100644
--- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
+++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java
@@ -1,5 +1,6 @@
 package com.twitter.heron.spi.statemgr;
 
+import java.util.List;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import com.twitter.heron.api.generated.TopologyAPI;
@@ -162,6 +163,15 @@
   }
 
   /**
+   * Get the execution states of all topologies from the wrapped state manager
+   *
+   * @return the future returned by the delegate, passed through unchanged
+   */
+  public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+    return delegate.getAllExecutionStates();
+  }
+
+  /**
    * Get the physical plan for the given topology
    *
    * @param watcher @see com.twitter.heron.spi.statemgr.WatchCallback
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
index fc44031..d76cfb6 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java
@@ -1,5 +1,6 @@
 package com.twitter.heron.statemgr;
 
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -109,6 +110,11 @@
   }
 
   @Override
+  public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+    return nullFuture;  // shared nullFuture field, also returned by getPhysicalPlan below; no lookup is done
+  }
+
+  @Override
   public ListenableFuture<PhysicalPlans.PhysicalPlan> getPhysicalPlan(WatchCallback watcher, String topologyName) {
     return nullFuture;
   }
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
index 432f6b8..07f5976 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java
@@ -1,6 +1,9 @@
 package com.twitter.heron.statemgr.localfs;
 
+import java.io.File;
+import java.util.List;
 import java.util.Map;
+import java.util.LinkedList;
 import java.util.logging.Logger;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -193,13 +196,36 @@
       executionState = ExecutionEnvironment.ExecutionState.parseFrom(data);
       future.set(executionState);
     } catch (InvalidProtocolBufferException e) {
-      future.setException(new RuntimeException("Could not parse SchedulerLocation", e));
+      future.setException(new RuntimeException("Could not parse ExecutionState", e));
     }
 
     return future;
   }
 
   @Override
+  public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+    SettableFuture<List<ExecutionEnvironment.ExecutionState>> future = SettableFuture.create();
+    List<ExecutionEnvironment.ExecutionState> executionStates = new LinkedList<ExecutionEnvironment.ExecutionState>();
+    // listFiles() returns null if the directory is missing or unreadable: report no topologies
+    File[] files = new File(getExecutionStateDir()).listFiles();
+    if (files == null) {
+      files = new File[0];
+    }
+
+    for (File file: files) {
+      byte[] data = FileUtils.readFromFile(file.getAbsolutePath());
+      try {
+        executionStates.add(ExecutionEnvironment.ExecutionState.parseFrom(data));
+      } catch (InvalidProtocolBufferException e) {
+        future.setException(new RuntimeException("Could not parse ExecutionState", e));
+        return future;
+      }
+    }
+    future.set(executionStates);
+    return future;
+  }
+
+  @Override
   public ListenableFuture<PhysicalPlans.PhysicalPlan> getPhysicalPlan(WatchCallback watcher, String topologyName) {
     SettableFuture<PhysicalPlans.PhysicalPlan> future = SettableFuture.create();
     String path = getPhysicalPlanPath(topologyName);
diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index 7cabfb9..e1176b8 100644
--- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -1,5 +1,7 @@
 package com.twitter.heron.statemgr.zookeeper.curator;
 
+import java.util.List;
+import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
@@ -266,6 +268,13 @@
   }
 
   @Override
+  public ListenableFuture<List<ExecutionEnvironment.ExecutionState>> getAllExecutionStates() {
+    final SettableFuture<List<ExecutionEnvironment.ExecutionState>> executionStateFuture = SettableFuture.create();
+    executionStateFuture.set(new LinkedList<ExecutionEnvironment.ExecutionState>());  // NOTE(review): nothing is read from ZooKeeper, the result is always an empty list - presumably a placeholder, confirm
+    return executionStateFuture;
+  }
+
+  @Override
   public ListenableFuture<ExecutionEnvironment.ExecutionState> getExecutionState(WatchCallback watcher, String topologyName) {
     final SettableFuture<ExecutionEnvironment.ExecutionState> executionStateFuture = SettableFuture.create();
     String path = getExecutionStatePath(topologyName);
diff --git a/scripts/centos5/BUILD b/scripts/centos5/BUILD
index e38fd99..0f2dc2a 100644
--- a/scripts/centos5/BUILD
+++ b/scripts/centos5/BUILD
@@ -83,6 +83,7 @@
         ":conf-local-uploader",
         ":hcli",
         ":hexamples",
+        ":hcommands",
         ":hscheduler",
         ":hlscheduler",
         ":hrrpacking",
@@ -112,6 +113,7 @@
         "--cp $(location conf-local-uploader)        conf/local/uploader.yaml",
         "--cp $(location heron-core)                 dist/heron-core.tar.gz",
         "--cp $(location hexamples)                  examples/heron-examples.jar",
+        "--cp $(location hcommands)                  lib/commands/heron-commands.jar",
         "--cp $(location hscheduler)                 lib/scheduler/heron-scheduler.jar",
         "--cp $(location hlscheduler)                lib/scheduler/heron-local-scheduler.jar",
         "--cp $(location hrrpacking)                 lib/packing/heron-roundrobin-packing.jar",
@@ -238,6 +240,11 @@
 )
 
 filegroup(
+    name = "hcommands",  # used by the ":hcommands" entry and the --cp step writing lib/commands/heron-commands.jar
+    srcs = ["//heron/commands/src/java:heron-commands"],
+)
+
+filegroup(
     name = "hscheduler",
     srcs = ["//heron/newscheduler/src/java:heron-scheduler"],
 )
diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD
index fb78c8b..596cc84 100644
--- a/scripts/packages/BUILD
+++ b/scripts/packages/BUILD
@@ -8,6 +8,7 @@
 load("/tools/rules/heron_client", "heron_client_bin_files")
 load("/tools/rules/heron_client", "heron_client_conf_files")
 load("/tools/rules/heron_client", "heron_client_lib_3rdparty_files")
+load("/tools/rules/heron_client", "heron_client_lib_commands_files")
 load("/tools/rules/heron_client", "heron_client_lib_scheduler_files")
 load("/tools/rules/heron_client", "heron_client_lib_packing_files")
 load("/tools/rules/heron_client", "heron_client_lib_statemgr_files")
@@ -133,6 +134,12 @@
 )
 
 pkg_tar(
+    name = "heron-client-lib-commands",
+    package_dir = "lib/commands",  # NOTE(review): presumably the directory jars.command_jars() globs - confirm
+    files = heron_client_lib_commands_files(),
+)
+
+pkg_tar(
     name = "heron-client-lib-scheduler",
     package_dir = "lib/scheduler",
     files = heron_client_lib_scheduler_files(),
@@ -207,8 +214,9 @@
         ":heron-client-dist",
         ":heron-client-examples",
         ":heron-client-lib-3rdparty",
-        ":heron-client-lib-scheduler",
+        ":heron-client-lib-commands",
         ":heron-client-lib-packing",
+        ":heron-client-lib-scheduler",
         ":heron-client-lib-statemgr",
         ":heron-client-lib-uploader",
     ],
diff --git a/tools/rules/heron_client.bzl b/tools/rules/heron_client.bzl
index 0b01978..9df787b 100644
--- a/tools/rules/heron_client.bzl
+++ b/tools/rules/heron_client.bzl
@@ -21,6 +21,11 @@
         "//heron/config/src/yaml:conf-aurora-yaml",
     ]
 
+def heron_client_lib_commands_files():
+    # label of the jar that scripts/packages/BUILD packages under lib/commands
+    commands_jar = "//heron/commands/src/java:heron-commands"
+    return [commands_jar]
+
 def heron_client_lib_scheduler_files():
     return [
         "//heron/newscheduler/src/java:heron-scheduler",